360 lines
10 KiB
Go
Executable File
360 lines
10 KiB
Go
Executable File
// Copyright (c) 2018 Burak Sezer
|
|
// All rights reserved.
|
|
//
|
|
// This code is licensed under the MIT License.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
// of this software and associated documentation files(the "Software"), to deal
|
|
// in the Software without restriction, including without limitation the rights
|
|
// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
|
// copies of the Software, and to permit persons to whom the Software is
|
|
// furnished to do so, subject to the following conditions :
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
// THE SOFTWARE.
|
|
|
|
// Package consistent provides a consistent hashing function with bounded loads.
// For more information about the underlying algorithm, please take a look at
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
//
// Example Use:
//
//	cfg := consistent.Config{
//		PartitionCount:    71,
//		ReplicationFactor: 20,
//		Load:              1.25,
//		Hasher:            hasher{},
//	}
//
//	// Create a new consistent object.
//	// You may call this with a list of members
//	// instead of adding them one by one.
//	c := consistent.New(members, cfg)
//
//	// myMember struct just needs to implement a String method.
//	// New/Add/Remove distribute partitions among members using the algorithm
//	// defined on the Google Research Blog.
//	c.Add(myMember)
//
//	key := []byte("my-key")
//	// LocateKey hashes the key and calculates the partition ID with
//	// this modulo operation: MOD(hash result, partition count)
//	// The owner of the partition is already calculated by New/Add/Remove.
//	// LocateKey just returns the member which is responsible for the key.
//	member := c.LocateKey(key)
//
|
|
package consistent
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
)
|
|
|
|
// Sentinel errors exposed by the package.
var (
	// ErrInsufficientMemberCount reports that the ring does not hold enough
	// members to complete the requested task.
	ErrInsufficientMemberCount = errors.New("insufficient member count")

	// ErrMemberNotFound reports that the requested member could not be found
	// in the consistent hash ring.
	ErrMemberNotFound = errors.New("member could not be found in ring")
)
|
|
|
|
// Hasher generates an unsigned, 64-bit hash of the provided byte slice.
//
// An implementation should minimize collisions (the same hash for different
// byte slices) and should be fast; the FarmHash family is one possible
// choice.
type Hasher interface {
	// Sum64 returns the 64-bit hash of data.
	Sum64(data []byte) uint64
}
|
|
|
|
// Member is a single participant of the consistent hash ring.
type Member interface {
	// String returns the member's name. The package uses it as the key
	// under which the member is stored and hashed.
	String() string
}
|
|
|
|
// Config represents a structure to control consistent package.
|
|
type Config struct {
|
|
// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
|
|
Hasher Hasher
|
|
|
|
// Keys are distributed among partitions. Prime numbers are good to
|
|
// distribute keys uniformly. Select a big PartitionCount if you have
|
|
// too many keys.
|
|
PartitionCount int
|
|
|
|
// Members are replicated on consistent hash ring. This number means that a member
|
|
// how many times replicated on the ring.
|
|
ReplicationFactor int
|
|
|
|
// Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
|
|
Load float64
|
|
}
|
|
|
|
// Consistent holds the information about the members of the consistent hash circle.
|
|
type Consistent struct {
|
|
mu sync.RWMutex
|
|
|
|
config Config
|
|
hasher Hasher
|
|
sortedSet []uint64
|
|
partitionCount uint64
|
|
loads map[string]float64
|
|
members map[string]*Member
|
|
partitions map[int]*Member
|
|
ring map[uint64]*Member
|
|
}
|
|
|
|
// New creates and returns a new Consistent object.
|
|
func New(members []Member, config Config) *Consistent {
|
|
c := &Consistent{
|
|
config: config,
|
|
members: make(map[string]*Member),
|
|
partitionCount: uint64(config.PartitionCount),
|
|
ring: make(map[uint64]*Member),
|
|
}
|
|
if config.Hasher == nil {
|
|
panic("Hasher cannot be nil")
|
|
}
|
|
// TODO: Check configuration here
|
|
c.hasher = config.Hasher
|
|
for _, member := range members {
|
|
c.add(member)
|
|
}
|
|
if members != nil {
|
|
c.distributePartitions()
|
|
}
|
|
return c
|
|
}
|
|
|
|
// GetMembers returns a thread-safe copy of members.
|
|
func (c *Consistent) GetMembers() []Member {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Create a thread-safe copy of member list.
|
|
members := []Member{}
|
|
for _, member := range c.members {
|
|
members = append(members, *member)
|
|
}
|
|
return members
|
|
}
|
|
|
|
// AverageLoad exposes the current average load.
|
|
func (c *Consistent) AverageLoad() float64 {
|
|
avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
|
|
return math.Ceil(avgLoad)
|
|
}
|
|
|
|
func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
|
|
avgLoad := c.AverageLoad()
|
|
var count int
|
|
for {
|
|
count++
|
|
if count >= len(c.sortedSet) {
|
|
// User needs to decrease partition count, increase member count or increase load factor.
|
|
panic("not enough room to distribute partitions")
|
|
}
|
|
i := c.sortedSet[idx]
|
|
tmp := c.ring[i]
|
|
member := *tmp
|
|
load := loads[member.String()]
|
|
if load+1 <= avgLoad {
|
|
partitions[partID] = &member
|
|
loads[member.String()]++
|
|
return
|
|
}
|
|
idx++
|
|
if idx >= len(c.sortedSet) {
|
|
idx = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Consistent) distributePartitions() {
|
|
loads := make(map[string]float64)
|
|
partitions := make(map[int]*Member)
|
|
|
|
bs := make([]byte, 8)
|
|
for partID := uint64(0); partID < c.partitionCount; partID++ {
|
|
binary.LittleEndian.PutUint64(bs, partID)
|
|
key := c.hasher.Sum64(bs)
|
|
idx := sort.Search(len(c.sortedSet), func(i int) bool {
|
|
return c.sortedSet[i] >= key
|
|
})
|
|
if idx >= len(c.sortedSet) {
|
|
idx = 0
|
|
}
|
|
c.distributeWithLoad(int(partID), idx, partitions, loads)
|
|
}
|
|
c.partitions = partitions
|
|
c.loads = loads
|
|
}
|
|
|
|
func (c *Consistent) add(member Member) {
|
|
for i := 0; i < c.config.ReplicationFactor; i++ {
|
|
key := []byte(fmt.Sprintf("%s%d", member.String(), i))
|
|
h := c.hasher.Sum64(key)
|
|
c.ring[h] = &member
|
|
c.sortedSet = append(c.sortedSet, h)
|
|
}
|
|
// sort hashes ascendingly
|
|
sort.Slice(c.sortedSet, func(i int, j int) bool {
|
|
return c.sortedSet[i] < c.sortedSet[j]
|
|
})
|
|
// Storing member at this map is useful to find backup members of a partition.
|
|
c.members[member.String()] = &member
|
|
}
|
|
|
|
// Add adds a new member to the consistent hash circle.
|
|
func (c *Consistent) Add(member Member) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if _, ok := c.members[member.String()]; ok {
|
|
// We have already have this. Quit immediately.
|
|
return
|
|
}
|
|
c.add(member)
|
|
c.distributePartitions()
|
|
}
|
|
|
|
func (c *Consistent) delSlice(val uint64) {
|
|
for i := 0; i < len(c.sortedSet); i++ {
|
|
if c.sortedSet[i] == val {
|
|
c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove removes a member from the consistent hash circle.
|
|
func (c *Consistent) Remove(name string) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if _, ok := c.members[name]; !ok {
|
|
// There is no member with that name. Quit immediately.
|
|
return
|
|
}
|
|
|
|
for i := 0; i < c.config.ReplicationFactor; i++ {
|
|
key := []byte(fmt.Sprintf("%s%d", name, i))
|
|
h := c.hasher.Sum64(key)
|
|
delete(c.ring, h)
|
|
c.delSlice(h)
|
|
}
|
|
delete(c.members, name)
|
|
if len(c.members) == 0 {
|
|
// consistent hash ring is empty now. Reset the partition table.
|
|
c.partitions = make(map[int]*Member)
|
|
return
|
|
}
|
|
c.distributePartitions()
|
|
}
|
|
|
|
// LoadDistribution exposes load distribution of members.
|
|
func (c *Consistent) LoadDistribution() map[string]float64 {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Create a thread-safe copy
|
|
res := make(map[string]float64)
|
|
for member, load := range c.loads {
|
|
res[member] = load
|
|
}
|
|
return res
|
|
}
|
|
|
|
// FindPartitionID returns partition id for given key.
|
|
func (c *Consistent) FindPartitionID(key []byte) int {
|
|
hkey := c.hasher.Sum64(key)
|
|
return int(hkey % c.partitionCount)
|
|
}
|
|
|
|
// GetPartitionOwner returns the owner of the given partition.
|
|
func (c *Consistent) GetPartitionOwner(partID int) Member {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
member, ok := c.partitions[partID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
// Create a thread-safe copy of member and return it.
|
|
return *member
|
|
}
|
|
|
|
// LocateKey finds a home for given key
|
|
func (c *Consistent) LocateKey(key []byte) Member {
|
|
partID := c.FindPartitionID(key)
|
|
return c.GetPartitionOwner(partID)
|
|
}
|
|
|
|
func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
res := []Member{}
|
|
if count > len(c.members)-1 {
|
|
return res, ErrInsufficientMemberCount
|
|
}
|
|
|
|
var ownerKey uint64
|
|
owner := c.GetPartitionOwner(partID)
|
|
// Hash and sort all the names.
|
|
keys := []uint64{}
|
|
kmems := make(map[uint64]*Member)
|
|
for name, member := range c.members {
|
|
key := c.hasher.Sum64([]byte(name))
|
|
if name == owner.String() {
|
|
ownerKey = key
|
|
}
|
|
keys = append(keys, key)
|
|
kmems[key] = member
|
|
}
|
|
sort.Slice(keys, func(i, j int) bool {
|
|
return keys[i] < keys[j]
|
|
})
|
|
|
|
// Find the member
|
|
idx := 0
|
|
for idx < len(keys) {
|
|
if keys[idx] == ownerKey {
|
|
break
|
|
}
|
|
idx++
|
|
}
|
|
|
|
// Find the closest members.
|
|
for len(res) < count {
|
|
idx++
|
|
if idx >= len(keys) {
|
|
idx = 0
|
|
}
|
|
key := keys[idx]
|
|
res = append(res, *kmems[key])
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// GetClosestN returns the closest N member to a key in the hash ring. This may be useful to find members for replication.
|
|
func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
|
|
partID := c.FindPartitionID(key)
|
|
return c.getClosestN(partID, count)
|
|
}
|
|
|
|
// GetClosestNForPartition returns the closest N member for given partition. This may be useful to find members for replication.
|
|
func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
|
|
return c.getClosestN(partID, count)
|
|
}
|