Documentation ¶
Index ¶
- Constants
- Variables
- func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, ...) error
- func GetCodec() codec.Codec
- func GetInstanceAddr(configAddr string, netInterfaces []string, logger log.Logger) (string, error)
- func GetInstancePort(configPort, listenPort int) int
- func HasInstanceDescsChanged(beforeByID, afterByID map[string]InstanceDesc, ...) bool
- func HasReplicationSetChanged(before, after ReplicationSet) bool
- func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool
- func HasTokensChanged(before, after InstanceDesc) bool
- func HasZoneChanged(before, after InstanceDesc) bool
- func MergeTokens(instances [][]uint32) []uint32
- func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32
- func ProtoDescFactory() proto.Message
- func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error
- func WaitRingStability(ctx context.Context, r *Ring, op Operation, ...) error
- func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, ...) error
- type AutoForgetDelegate
- func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
- func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (InstanceState, Tokens)
- func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
- type BasicLifecycler
- func (l *BasicLifecycler) ChangeState(ctx context.Context, state InstanceState) error
- func (l *BasicLifecycler) GetInstanceAddr() string
- func (l *BasicLifecycler) GetInstanceID() string
- func (l *BasicLifecycler) GetInstanceZone() string
- func (l *BasicLifecycler) GetRegisteredAt() time.Time
- func (l *BasicLifecycler) GetState() InstanceState
- func (l *BasicLifecycler) GetTokens() Tokens
- func (l *BasicLifecycler) IsRegistered() bool
- type BasicLifecyclerConfig
- type BasicLifecyclerDelegate
- type BasicLifecyclerMetrics
- type ByAddr
- type CompareResult
- type Config
- type Desc
- func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, ...) InstanceDesc
- func (d *Desc) ClaimTokens(from, to string) Tokens
- func (d *Desc) Clone() interface{}
- func (*Desc) Descriptor() ([]byte, []int)
- func (this *Desc) Equal(that interface{}) bool
- func (d *Desc) FindDifference(o codec.MultiKey) (interface{}, []string, error)
- func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc
- func (m *Desc) GetIngesters() map[string]InstanceDesc
- func (d *Desc) GetItemFactory() proto.Message
- func (d *Desc) GetTokens() []uint32
- func (this *Desc) GoString() string
- func (d *Desc) IsReady(storageLastUpdated time.Time, heartbeatTimeout time.Duration) error
- func (d *Desc) JoinIds(in map[string]interface{})
- func (m *Desc) Marshal() (dAtA []byte, err error)
- func (m *Desc) MarshalTo(dAtA []byte) (int, error)
- func (m *Desc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
- func (d *Desc) MergeContent() []string
- func (*Desc) ProtoMessage()
- func (d *Desc) RemoveIngester(id string)
- func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int)
- func (m *Desc) Reset()
- func (d *Desc) RingCompare(o *Desc) CompareResult
- func (m *Desc) Size() (n int)
- func (d *Desc) SplitByID() map[string]interface{}
- func (this *Desc) String() string
- func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens)
- func (m *Desc) Unmarshal(dAtA []byte) error
- func (m *Desc) XXX_DiscardUnknown()
- func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Desc) XXX_Merge(src proto.Message)
- func (m *Desc) XXX_Size() int
- func (m *Desc) XXX_Unmarshal(b []byte) error
- type FlushTransferer
- type InstanceDesc
- func (*InstanceDesc) Descriptor() ([]byte, []int)
- func (this *InstanceDesc) Equal(that interface{}) bool
- func (m *InstanceDesc) GetAddr() string
- func (i *InstanceDesc) GetRegisteredAt() time.Time
- func (m *InstanceDesc) GetRegisteredTimestamp() int64
- func (m *InstanceDesc) GetState() InstanceState
- func (m *InstanceDesc) GetTimestamp() int64
- func (m *InstanceDesc) GetTokens() []uint32
- func (m *InstanceDesc) GetZone() string
- func (this *InstanceDesc) GoString() string
- func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, storageLastUpdated time.Time) bool
- func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, storageLastUpdated time.Time) bool
- func (i *InstanceDesc) IsReady(storageLastUpdated time.Time, heartbeatTimeout time.Duration) error
- func (m *InstanceDesc) Marshal() (dAtA []byte, err error)
- func (m *InstanceDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*InstanceDesc) ProtoMessage()
- func (m *InstanceDesc) Reset()
- func (m *InstanceDesc) Size() (n int)
- func (this *InstanceDesc) String() string
- func (m *InstanceDesc) Unmarshal(dAtA []byte) error
- func (m *InstanceDesc) XXX_DiscardUnknown()
- func (m *InstanceDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *InstanceDesc) XXX_Merge(src proto.Message)
- func (m *InstanceDesc) XXX_Size() int
- func (m *InstanceDesc) XXX_Unmarshal(b []byte) error
- type InstanceState
- type LeaveOnStoppingDelegate
- func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (InstanceState, Tokens)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
- type Lifecycler
- func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
- func (i *Lifecycler) CheckReady(ctx context.Context) error
- func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
- func (i *Lifecycler) FlushOnShutdown() bool
- func (i *Lifecycler) GetState() InstanceState
- func (i *Lifecycler) HealthyInstancesCount() int
- func (i *Lifecycler) Join()
- func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context)
- func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
- func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool)
- func (i *Lifecycler) ShouldUnregisterOnShutdown() bool
- func (i *Lifecycler) Zones() []string
- func (i *Lifecycler) ZonesCount() int
- type LifecyclerConfig
- type LifecyclerMetrics
- type MinimizeSpreadTokenGenerator
- type NoopFlushTransferer
- type Operation
- type RandomTokenGenerator
- type ReadRing
- type ReplicationSet
- func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, ...) ([]interface{}, error)
- func (r ReplicationSet) GetAddresses() []string
- func (r ReplicationSet) GetAddressesWithout(exclude string) []string
- func (r ReplicationSet) GetNumOfZones() int
- func (r ReplicationSet) Includes(addr string) bool
- type ReplicationStrategy
- type Ring
- func (r *Ring) CleanupShuffleShardCache(identifier string)
- func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, ...) (ReplicationSet, error)
- func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error)
- func (r *Ring) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error)
- func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error)
- func (r *Ring) GetInstanceIdByAddr(addr string) (string, error)
- func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error)
- func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
- func (r *Ring) HasInstance(instanceID string) bool
- func (r *Ring) InstancesCount() int
- func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, storageLastUpdate time.Time) bool
- func (r *Ring) ReplicationFactor() int
- func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (r *Ring) ShuffleShard(identifier string, size int) ReadRing
- func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
- func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing
- type TokenGenerator
- type Tokens
- type TokensHeap
- type TokensPersistencyDelegate
- func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
- func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (InstanceState, Tokens)
- func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
Constants ¶
const ( // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on // a typical replication factor 3, plus extra room for a JOINING + LEAVING instance. GetBufferSize = 5 // GetZoneSize is the suggested size of zone map passed to Ring.Get(). It's based on // a typical replication factor 3. GetZoneSize = 3 )
Variables ¶
var ( // Write operation that also extends replica set, if instance state is not ACTIVE. Write = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool { return s != ACTIVE }) // WriteNoExtend is like Write, but with no replicaset extension. WriteNoExtend = NewOp([]InstanceState{ACTIVE}, nil) // Read operation that extends the replica set if an instance is not ACTIVE, LEAVING OR JOINING Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool { return s != ACTIVE && s != LEAVING && s != JOINING }) // Reporting is a special value for inquiring about health. Reporting = allStatesRingOperation )
var ( // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. ErrEmptyRing = errors.New("empty ring") // ErrInstanceNotFound is the error returned when trying to get information for an instance // not registered within the ring. ErrInstanceNotFound = errors.New("instance not found in the ring") // ErrTooManyUnhealthyInstances is the error returned when there are too many failed instances for a // specific operation. ErrTooManyUnhealthyInstances = errors.New("too many unhealthy instances in the ring") // ErrInconsistentTokensInfo is the error returned if, due to an internal bug, the mapping between // a token and its own instance is missing or unknown. ErrInconsistentTokensInfo = errors.New("inconsistent ring tokens information") )
var ( ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowRing = fmt.Errorf("proto: integer overflow") )
var InstanceState_name = map[int32]string{
0: "ACTIVE",
1: "LEAVING",
2: "PENDING",
3: "JOINING",
4: "LEFT",
}
var InstanceState_value = map[string]int32{
"ACTIVE": 0,
"LEAVING": 1,
"PENDING": 2,
"JOINING": 3,
"LEFT": 4,
}
Functions ¶
func DoBatch ¶
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error
DoBatch request against a set of keys in the ring, handling replication and failures. For example if we want to write N items where they may all hit different instances, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.
Callback is passed the instance to target, and the indexes of the keys to send to that instance.
cleanup() is always called, either on an error before starting the batches or after they all finish.
Not implemented as a method on Ring so we can test separately.
func GetInstanceAddr ¶ added in v1.1.0
GetInstanceAddr returns the address to use to register the instance in the ring.
func GetInstancePort ¶ added in v1.1.0
GetInstancePort returns the port to use to register the instance in the ring.
func HasInstanceDescsChanged ¶ added in v1.16.0
func HasInstanceDescsChanged(beforeByID, afterByID map[string]InstanceDesc, hasChanged func(b, a InstanceDesc) bool) bool
func HasReplicationSetChanged ¶ added in v1.5.0
func HasReplicationSetChanged(before, after ReplicationSet) bool
HasReplicationSetChanged returns false if two replications sets are the same (with possibly different timestamps), true if they differ in any way (number of instances, instance states, tokens, zones, ...).
func HasReplicationSetChangedWithoutState ¶ added in v1.13.0
func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool
HasReplicationSetChangedWithoutState returns false if two replications sets are the same (with possibly different timestamps and instance states), true if they differ in any other way (number of instances, tokens, zones, ...).
func HasTokensChanged ¶ added in v1.16.0
func HasTokensChanged(before, after InstanceDesc) bool
func HasZoneChanged ¶ added in v1.16.0
func HasZoneChanged(before, after InstanceDesc) bool
func MergeTokens ¶ added in v1.7.0
MergeTokens takes in input multiple lists of tokens and returns a single list containing all tokens merged and sorted. Each input single list is required to have tokens already sorted.
func MergeTokensByZone ¶ added in v1.7.0
MergeTokensByZone is like MergeTokens but does it for each input zone.
func WaitInstanceState ¶ added in v1.1.0
func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error
WaitInstanceState waits until the input instanceID is registered within the ring matching the provided state. A timeout should be provided within the context.
func WaitRingStability ¶ added in v1.6.0
func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error
WaitRingStability monitors the ring topology for the provided operation and waits until it keeps stable for at least minStability.
func WaitRingTokensStability ¶ added in v1.13.0
func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error
WaitRingTokensStability waits for the Ring to be unchanged at least for minStability time period, excluding transitioning between allowed states (e.g. JOINING->ACTIVE if allowed by op). This can be used to avoid wasting resources on moving data around due to multiple changes in the Ring.
Types ¶
type AutoForgetDelegate ¶ added in v1.1.0
type AutoForgetDelegate struct {
// contains filtered or unexported fields
}
AutoForgetDelegate automatically remove an instance from the ring if the last heartbeat is older than a configured period.
func NewAutoForgetDelegate ¶ added in v1.1.0
func NewAutoForgetDelegate(forgetPeriod time.Duration, next BasicLifecyclerDelegate, logger log.Logger) *AutoForgetDelegate
func (*AutoForgetDelegate) OnRingInstanceHeartbeat ¶ added in v1.1.0
func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
func (*AutoForgetDelegate) OnRingInstanceRegister ¶ added in v1.1.0
func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)
func (*AutoForgetDelegate) OnRingInstanceStopping ¶ added in v1.1.0
func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*AutoForgetDelegate) OnRingInstanceTokens ¶ added in v1.1.0
func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
type BasicLifecycler ¶ added in v1.1.0
type BasicLifecycler struct { *services.BasicService TokenGenerator // contains filtered or unexported fields }
BasicLifecycler is a basic ring lifecycler which allows to hook custom logic at different stages of the lifecycle. This lifecycler should be used to build higher level lifecyclers.
This lifecycler never change the instance state. It's the delegate responsibility to ChangeState().
func NewBasicLifecycler ¶ added in v1.1.0
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error)
NewBasicLifecycler makes a new BasicLifecycler.
func (*BasicLifecycler) ChangeState ¶ added in v1.1.0
func (l *BasicLifecycler) ChangeState(ctx context.Context, state InstanceState) error
func (*BasicLifecycler) GetInstanceAddr ¶ added in v1.1.0
func (l *BasicLifecycler) GetInstanceAddr() string
func (*BasicLifecycler) GetInstanceID ¶ added in v1.1.0
func (l *BasicLifecycler) GetInstanceID() string
func (*BasicLifecycler) GetInstanceZone ¶ added in v1.1.0
func (l *BasicLifecycler) GetInstanceZone() string
func (*BasicLifecycler) GetRegisteredAt ¶ added in v1.5.0
func (l *BasicLifecycler) GetRegisteredAt() time.Time
GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if the lifecycler hasn't been started yet or was already registered and its timestamp is unknown.
func (*BasicLifecycler) GetState ¶ added in v1.1.0
func (l *BasicLifecycler) GetState() InstanceState
func (*BasicLifecycler) GetTokens ¶ added in v1.1.0
func (l *BasicLifecycler) GetTokens() Tokens
func (*BasicLifecycler) IsRegistered ¶ added in v1.1.0
func (l *BasicLifecycler) IsRegistered() bool
IsRegistered returns whether the instance is currently registered within the ring.
type BasicLifecyclerConfig ¶ added in v1.1.0
type BasicLifecyclerConfig struct { // ID is the instance unique ID. ID string // Addr is the instance address, in the form "address:port". Addr string // Zone is the instance availability zone. Can be an empty string // if zone awareness is unused. Zone string HeartbeatPeriod time.Duration TokensObservePeriod time.Duration NumTokens int TokensGeneratorStrategy string // If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false, // which means unregistering. KeepInstanceInTheRingOnShutdown bool FinalSleep time.Duration }
type BasicLifecyclerDelegate ¶ added in v1.1.0
type BasicLifecyclerDelegate interface { // OnRingInstanceRegister is called while the lifecycler is registering the // instance within the ring and should return the state and set of tokens to // use for the instance itself. OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) // OnRingInstanceTokens is called once the instance tokens are set and are // stable within the ring (honoring the observe period, if set). OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) // OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler // will continue to hearbeat the ring the this function is executing and will proceed // to unregister the instance from the ring only after this function has returned. OnRingInstanceStopping(lifecycler *BasicLifecycler) // OnRingInstanceHeartbeat is called while the instance is updating its heartbeat // in the ring. OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) }
type BasicLifecyclerMetrics ¶ added in v1.1.0
type BasicLifecyclerMetrics struct {
// contains filtered or unexported fields
}
func NewBasicLifecyclerMetrics ¶ added in v1.1.0
func NewBasicLifecyclerMetrics(ringName string, reg prometheus.Registerer) *BasicLifecyclerMetrics
type CompareResult ¶ added in v1.5.0
type CompareResult int
const ( Equal CompareResult = iota // Both rings contain same exact instances. EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ). Different // Rings have different set of instances, or their information don't match. )
CompareResult responses
type Config ¶
type Config struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` ReplicationFactor int `yaml:"replication_factor"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones"` DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"` // Whether the shuffle-sharding subring cache is disabled. This option is set // internally and never exposed to the user. SubringCacheDisabled bool `yaml:"-"` }
Config for a Ring
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
type Desc ¶
type Desc struct {
Ingesters map[string]InstanceDesc `` /* 149-byte string literal not displayed */
}
func GetOrCreateRingDesc ¶ added in v1.1.0
func GetOrCreateRingDesc(d interface{}) *Desc
func (*Desc) AddIngester ¶
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc
AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, any other tokens are removed.
func (*Desc) ClaimTokens ¶
ClaimTokens transfers all the tokens from one ingester to another, returning the claimed token. This method assumes that Ring is in the correct state, 'to' ingester has no tokens anywhere. Tokens list must be sorted properly. If all of this is true, everything will be fine.
func (*Desc) Clone ¶ added in v1.11.0
func (d *Desc) Clone() interface{}
Clone returns a deep copy of the ring state.
func (*Desc) Descriptor ¶
func (*Desc) FindDifference ¶ added in v1.15.0
func (*Desc) FindIngestersByState ¶
func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc
FindIngestersByState returns the list of ingesters in the given state
func (*Desc) GetIngesters ¶
func (m *Desc) GetIngesters() map[string]InstanceDesc
func (*Desc) GetItemFactory ¶ added in v1.15.0
func (*Desc) GetTokens ¶
GetTokens returns sorted list of tokens owned by all instances within the ring.
func (*Desc) IsReady ¶ added in v1.13.0
IsReady returns no error when all instance are ACTIVE and healthy, and the ring has some tokens.
func (*Desc) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Desc) Merge ¶ added in v0.4.0
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
Merge merges other ring into this one. Returns sub-ring that represents the change, and can be sent out to other clients.
This merge function depends on the timestamp of the ingester. For each ingester, it will choose more recent state from the two rings, and put that into this ring. There is one exception: we accept LEFT state even if Timestamp hasn't changed.
localCAS flag tells the merge that it can use incoming ring as a full state, and detect missing ingesters based on it. Ingesters from incoming ring will cause ingester to be marked as LEFT and gossiped about.
If multiple ingesters end up owning the same tokens, Merge will do token conflict resolution (see resolveConflicts).
This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
The receiver must be normalised, that is, the token lists must sorted and not contain duplicates. The function guarantees that the receiver will be left in this normalised state, so multiple subsequent Merge calls are valid usage.
The Mergeable passed as the parameter does not need to be normalised.
Note: This method modifies d and mergeable to reduce allocations and copies.
func (*Desc) MergeContent ¶ added in v0.4.0
MergeContent describes content of this Mergeable. Ring simply returns list of ingesters that it includes.
func (*Desc) ProtoMessage ¶
func (*Desc) ProtoMessage()
func (*Desc) RemoveIngester ¶
RemoveIngester removes the given ingester and all its tokens.
func (*Desc) RemoveTombstones ¶ added in v0.4.0
RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.
func (*Desc) RingCompare ¶ added in v1.5.0
func (d *Desc) RingCompare(o *Desc) CompareResult
RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different.
func (*Desc) TokensFor ¶
TokensFor return all ring tokens and tokens for the input provided ID. Returned tokens are guaranteed to be sorted.
func (*Desc) XXX_DiscardUnknown ¶
func (m *Desc) XXX_DiscardUnknown()
func (*Desc) XXX_Unmarshal ¶
type FlushTransferer ¶
type FlushTransferer interface {
Flush()
}
FlushTransferer controls the shutdown of an instance in the ring. Methods on this interface are called when lifecycler is stopping. At that point, it no longer runs the "actor loop", but it keeps updating heartbeat in the ring. Ring entry is in LEAVING state.
type InstanceDesc ¶ added in v1.8.0
type InstanceDesc struct { Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"` // Unix timestamp (with seconds precision) of the last heartbeat sent // by this instance. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State InstanceState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.InstanceState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` Zone string `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"` // Unix timestamp (with seconds precision) of when the instance has been registered // to the ring. This field has not been called "joined_timestamp" intentionally, in order // to not introduce any misunderstanding with the instance's "joining" state. // // This field is used to find out subset of instances that could have possibly owned a // specific token in the past. Because of this, it's important that either this timestamp // is set to the real time the instance has been registered to the ring or it's left // 0 (which means unknown). // // When an instance is already registered in the ring with a value of 0 it's NOT safe to // update the timestamp to "now" because it would break the contract, given the instance // was already registered before "now". If unknown (0), it should be left as is, and the // code will properly deal with that. RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"` }
func MakeBuffersForGet ¶ added in v1.7.0
func MakeBuffersForGet() (bufDescs []InstanceDesc, bufHosts []string, bufZones map[string]int)
MakeBuffersForGet returns buffers to use with Ring.Get().
func (*InstanceDesc) Descriptor ¶ added in v1.8.0
func (*InstanceDesc) Descriptor() ([]byte, []int)
func (*InstanceDesc) Equal ¶ added in v1.8.0
func (this *InstanceDesc) Equal(that interface{}) bool
func (*InstanceDesc) GetAddr ¶ added in v1.8.0
func (m *InstanceDesc) GetAddr() string
func (*InstanceDesc) GetRegisteredAt ¶ added in v1.8.0
func (i *InstanceDesc) GetRegisteredAt() time.Time
GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if unknown.
func (*InstanceDesc) GetRegisteredTimestamp ¶ added in v1.8.0
func (m *InstanceDesc) GetRegisteredTimestamp() int64
func (*InstanceDesc) GetState ¶ added in v1.8.0
func (m *InstanceDesc) GetState() InstanceState
func (*InstanceDesc) GetTimestamp ¶ added in v1.8.0
func (m *InstanceDesc) GetTimestamp() int64
func (*InstanceDesc) GetTokens ¶ added in v1.8.0
func (m *InstanceDesc) GetTokens() []uint32
func (*InstanceDesc) GetZone ¶ added in v1.8.0
func (m *InstanceDesc) GetZone() string
func (*InstanceDesc) GoString ¶ added in v1.8.0
func (this *InstanceDesc) GoString() string
func (*InstanceDesc) IsHeartbeatHealthy ¶ added in v1.11.0
func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, storageLastUpdated time.Time) bool
IsHeartbeatHealthy returns whether the heartbeat timestamp for the ingester is within the specified timeout period. A timeout of zero disables the timeout; the heartbeat is ignored.
func (*InstanceDesc) IsReady ¶ added in v1.13.0
IsReady returns no error if the instance is ACTIVE and healthy.
func (*InstanceDesc) Marshal ¶ added in v1.8.0
func (m *InstanceDesc) Marshal() (dAtA []byte, err error)
func (*InstanceDesc) MarshalTo ¶ added in v1.8.0
func (m *InstanceDesc) MarshalTo(dAtA []byte) (int, error)
func (*InstanceDesc) MarshalToSizedBuffer ¶ added in v1.8.0
func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*InstanceDesc) ProtoMessage ¶ added in v1.8.0
func (*InstanceDesc) ProtoMessage()
func (*InstanceDesc) Reset ¶ added in v1.8.0
func (m *InstanceDesc) Reset()
func (*InstanceDesc) Size ¶ added in v1.8.0
func (m *InstanceDesc) Size() (n int)
func (*InstanceDesc) String ¶ added in v1.8.0
func (this *InstanceDesc) String() string
func (*InstanceDesc) Unmarshal ¶ added in v1.8.0
func (m *InstanceDesc) Unmarshal(dAtA []byte) error
func (*InstanceDesc) XXX_DiscardUnknown ¶ added in v1.8.0
func (m *InstanceDesc) XXX_DiscardUnknown()
func (*InstanceDesc) XXX_Marshal ¶ added in v1.8.0
func (m *InstanceDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*InstanceDesc) XXX_Merge ¶ added in v1.8.0
func (m *InstanceDesc) XXX_Merge(src proto.Message)
func (*InstanceDesc) XXX_Size ¶ added in v1.8.0
func (m *InstanceDesc) XXX_Size() int
func (*InstanceDesc) XXX_Unmarshal ¶ added in v1.8.0
func (m *InstanceDesc) XXX_Unmarshal(b []byte) error
type InstanceState ¶ added in v1.9.0
type InstanceState int32
const ( ACTIVE InstanceState = 0 LEAVING InstanceState = 1 PENDING InstanceState = 2 JOINING InstanceState = 3 // This state is only used by gossiping code to distribute information about // instances that have been removed from the ring. Ring users should not use it directly. LEFT InstanceState = 4 )
func (InstanceState) EnumDescriptor ¶ added in v1.9.0
func (InstanceState) EnumDescriptor() ([]byte, []int)
func (InstanceState) String ¶ added in v1.9.0
func (x InstanceState) String() string
type LeaveOnStoppingDelegate ¶ added in v1.1.0
type LeaveOnStoppingDelegate struct {
// contains filtered or unexported fields
}
func NewLeaveOnStoppingDelegate ¶ added in v1.1.0
func NewLeaveOnStoppingDelegate(next BasicLifecyclerDelegate, logger log.Logger) *LeaveOnStoppingDelegate
func (*LeaveOnStoppingDelegate) OnRingInstanceHeartbeat ¶ added in v1.1.0
func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
func (*LeaveOnStoppingDelegate) OnRingInstanceRegister ¶ added in v1.1.0
func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)
func (*LeaveOnStoppingDelegate) OnRingInstanceStopping ¶ added in v1.1.0
func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*LeaveOnStoppingDelegate) OnRingInstanceTokens ¶ added in v1.1.0
func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
type Lifecycler ¶
type Lifecycler struct { *services.BasicService KVStore kv.Client // These values are initialised at startup, and never change ID string Addr string RingName string RingKey string Zone string // contains filtered or unexported fields }
Lifecycler is responsible for managing the lifecycle of entries in the ring.
func NewLifecycler ¶
func NewLifecycler( cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, autoJoinOnStartup, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer, ) (*Lifecycler, error)
NewLifecycler creates new Lifecycler. It must be started via StartAsync.
func (*Lifecycler) ChangeState ¶
func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
ChangeState of the ingester, for use off of the loop() goroutine.
func (*Lifecycler) CheckReady ¶
func (i *Lifecycler) CheckReady(ctx context.Context) error
CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by only returning true if all ingesters are active. The state latches: once we have gone ready we don't go un-ready
func (*Lifecycler) ClaimTokensFor ¶
func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and assign token to the wrong ingester. While we could check for that state here, when this method is called, transfers have already finished -- it's better to check for this *before* transfers start.
func (*Lifecycler) FlushOnShutdown ¶ added in v0.6.0
func (i *Lifecycler) FlushOnShutdown() bool
FlushOnShutdown returns if flushing is enabled if transfer fails on a shutdown.
func (*Lifecycler) GetState ¶
func (i *Lifecycler) GetState() InstanceState
GetState returns the state of this ingester.
func (*Lifecycler) HealthyInstancesCount ¶ added in v0.4.0
func (i *Lifecycler) HealthyInstancesCount() int
HealthyInstancesCount returns the number of healthy instances for the Write operation in the ring, updated during the last heartbeat period.
func (*Lifecycler) Join ¶ added in v1.14.0
func (i *Lifecycler) Join()
Join trigger the instance to join the ring, if autoJoinOnStartup is set to false.
func (*Lifecycler) RenewTokens ¶ added in v1.18.0
func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context)
func (*Lifecycler) SetFlushOnShutdown ¶ added in v0.6.0
func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. Passing 'true' enables it, and 'false' disabled it.
func (*Lifecycler) SetUnregisterOnShutdown ¶ added in v1.6.0
func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool)
SetUnregisterOnShutdown enables/disables unregistering on shutdown.
func (*Lifecycler) ShouldUnregisterOnShutdown ¶ added in v1.6.0
func (i *Lifecycler) ShouldUnregisterOnShutdown() bool
ShouldUnregisterOnShutdown returns if unregistering should be skipped on shutdown.
func (*Lifecycler) Zones ¶ added in v1.18.0
func (i *Lifecycler) Zones() []string
Zones returns the zones for which there's at least 1 instance registered in the ring.
func (*Lifecycler) ZonesCount ¶ added in v1.5.0
func (i *Lifecycler) ZonesCount() int
ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
type LifecyclerConfig ¶
type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control NumTokens int `yaml:"num_tokens"` TokensGeneratorStrategy string `yaml:"tokens_generator_strategy"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` ObservePeriod time.Duration `yaml:"observe_period"` JoinAfter time.Duration `yaml:"join_after"` MinReadyDuration time.Duration `yaml:"min_ready_duration"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` Port int `doc:"hidden"` ID string `doc:"hidden"` // Injected internally ListenPort int `yaml:"-"` }
LifecyclerConfig is the config to build a Lifecycler.
func (*LifecyclerConfig) RegisterFlags ¶
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*LifecyclerConfig) RegisterFlagsWithPrefix ¶
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.
func (*LifecyclerConfig) Validate ¶ added in v1.17.0
func (cfg *LifecyclerConfig) Validate() error
type LifecyclerMetrics ¶ added in v1.13.0
type LifecyclerMetrics struct {
// contains filtered or unexported fields
}
func NewLifecyclerMetrics ¶ added in v1.13.0
func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics
type MinimizeSpreadTokenGenerator ¶ added in v1.17.0
type MinimizeSpreadTokenGenerator struct {
// contains filtered or unexported fields
}
func (*MinimizeSpreadTokenGenerator) GenerateTokens ¶ added in v1.17.0
func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone string, numTokens int, force bool) []uint32
GenerateTokens try to place nearly generated tokens on the optimal position given the existing ingesters in the ring. In order to do so, order all the existing ingester on the ring based on its ownership (by az), and start to create new tokens in order to balance out the ownership amongst all ingesters.
type NoopFlushTransferer ¶ added in v0.6.0
type NoopFlushTransferer struct{}
NoopFlushTransferer is a FlushTransferer which does nothing and can be used in cases we don't need one
func NewNoopFlushTransferer ¶ added in v0.6.0
func NewNoopFlushTransferer() *NoopFlushTransferer
NewNoopFlushTransferer makes a new NoopFlushTransferer
func (*NoopFlushTransferer) Flush ¶ added in v0.6.0
func (t *NoopFlushTransferer) Flush()
Flush is a noop
type Operation ¶
type Operation uint32
Operation describes which instances can be included in the replica set, based on their state.
Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
func NewOp ¶ added in v1.7.0
func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s InstanceState) bool) Operation
NewOp constructs new Operation with given "healthy" states for operation, and optional function to extend replica set. Result of calling shouldExtendReplicaSet is cached.
func (Operation) IsInstanceInStateHealthy ¶ added in v1.7.0
func (op Operation) IsInstanceInStateHealthy(s InstanceState) bool
IsInstanceInStateHealthy is used during "filtering" phase to remove undesired instances based on their state.
func (Operation) ShouldExtendReplicaSetOnState ¶ added in v1.7.0
func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool
ShouldExtendReplicaSetOnState returns true if given a state of instance that's going to be added to the replica set, the replica set size should be extended by 1 more instance for the given operation.
type RandomTokenGenerator ¶ added in v1.17.0
type RandomTokenGenerator struct{}
func (*RandomTokenGenerator) GenerateTokens ¶ added in v1.17.0
type ReadRing ¶
type ReadRing interface { // Get returns n (or more) instances which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones map[string]int) (ReplicationSet, error) // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetAllInstanceDescs returns a slice of healthy and unhealthy InstanceDesc. GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) // GetInstanceDescsForOperation returns map of InstanceDesc with instance ID as the keys. GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. // The resulting ReplicationSet doesn't necessarily contains all healthy instances // in the ring, but could contain the minimum set of instances required to execute // the input operation. GetReplicationSetForOperation(op Operation) (ReplicationSet, error) ReplicationFactor() int // InstancesCount returns the number of instances in the ring. InstancesCount() int // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing // ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm. // It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one // shard size at a time, at most 1 instance can be changed. // It is only used in Store Gateway for now. ShuffleShardWithZoneStability(identifier string, size int) ReadRing // GetInstanceState returns the current state of an instance or an error if the // instance does not exist in the ring. GetInstanceState(instanceID string) (InstanceState, error) // GetInstanceIdByAddr returns the instance id from its address or an error if the // // instance does not exist in the ring. GetInstanceIdByAddr(addr string) (string, error) // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes // all instances that have been part of the identifier's shard since "now - lookbackPeriod". ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing // HasInstance returns whether the ring contains an instance matching the provided instanceID. HasInstance(instanceID string) bool // CleanupShuffleShardCache should delete cached shuffle-shard subrings for given identifier. CleanupShuffleShardCache(identifier string) }
ReadRing represents the read interface to the ring.
type ReplicationSet ¶
type ReplicationSet struct { Instances []InstanceDesc // Maximum number of tolerated failing instances. Max errors and max unavailable zones are // mutually exclusive. MaxErrors int // max errors are mutually exclusive. MaxUnavailableZones int }
ReplicationSet describes the instances to talk to for a given key, and how many errors to tolerate.
func (ReplicationSet) Do ¶
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error)
Do function f in parallel for all replicas in the set, erroring is we exceed MaxErrors and returning early otherwise. zoneResultsQuorum allows only include results from zones that already reach quorum to improve performance.
func (ReplicationSet) GetAddresses ¶ added in v1.5.0
func (r ReplicationSet) GetAddresses() []string
GetAddresses returns the addresses of all instances within the replication set. Returned slice order is not guaranteed.
func (ReplicationSet) GetAddressesWithout ¶ added in v1.9.0
func (r ReplicationSet) GetAddressesWithout(exclude string) []string
GetAddressesWithout returns the addresses of all instances within the replication set while excluding the specified address. Returned slice order is not guaranteed.
func (ReplicationSet) GetNumOfZones ¶ added in v1.16.0
func (r ReplicationSet) GetNumOfZones() int
GetNumOfZones returns number of distinct zones.
func (ReplicationSet) Includes ¶ added in v1.1.0
func (r ReplicationSet) Includes(addr string) bool
Includes returns whether the replication set includes the replica with the provided addr.
type ReplicationStrategy ¶ added in v1.1.0
type ReplicationStrategy interface { // Filter out unhealthy instances and checks if there're enough instances // for an operation to succeed. Returns an error if there are not enough // instances. Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool, storageLastUpdate time.Time) (healthy []InstanceDesc, maxFailures int, err error) }
func NewDefaultReplicationStrategy ¶ added in v1.6.0
func NewDefaultReplicationStrategy() ReplicationStrategy
func NewIgnoreUnhealthyInstancesReplicationStrategy ¶ added in v1.7.0
func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy
type Ring ¶
Ring holds the information about the members of the consistent hash ring.
func New ¶
func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registerer) (*Ring, error)
New creates a new Ring. Being a service, Ring needs to be started to do anything.
func NewWithStoreClientAndStrategy ¶ added in v1.1.0
func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error)
func (*Ring) CleanupShuffleShardCache ¶ added in v1.8.0
func (*Ring) Get ¶
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones map[string]int) (ReplicationSet, error)
Get returns n (or more) instances which form the replicas for the given key. This implementation guarantees: - Stability: given the same ring, two invocations returns the same set for same operation. - Consistency: adding/removing 1 instance from the ring returns set with no more than 1 difference for same operation.
func (*Ring) GetAllHealthy ¶ added in v1.6.0
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error)
GetAllHealthy implements ReadRing.
func (*Ring) GetAllInstanceDescs ¶ added in v1.17.0
func (r *Ring) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error)
GetAllInstanceDescs implements ReadRing.
func (*Ring) GetInstanceDescsForOperation ¶ added in v1.16.0
func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error)
GetInstanceDescsForOperation implements ReadRing.
func (*Ring) GetInstanceIdByAddr ¶ added in v1.18.0
GetInstanceIdByAddr implements ReadRing.
func (*Ring) GetInstanceState ¶ added in v1.1.0
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error)
GetInstanceState returns the current state of an instance or an error if the instance does not exist in the ring.
func (*Ring) GetReplicationSetForOperation ¶ added in v1.6.0
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
GetReplicationSetForOperation implements ReadRing.
func (*Ring) HasInstance ¶ added in v1.4.0
HasInstance returns whether the ring contains an instance matching the provided instanceID.
func (*Ring) InstancesCount ¶ added in v1.8.0
InstancesCount returns the number of instances in the ring.
func (*Ring) ReplicationFactor ¶
ReplicationFactor of the ring.
func (*Ring) ShuffleShard ¶ added in v1.5.0
ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) and size (number of instances). The size is expected to be a multiple of the number of zones and the returned subring will contain the same number of instances per zone as far as there are enough registered instances in the ring.
The algorithm used to build the subring is a shuffle sharder based on probabilistic hashing. We treat each zone as a separate ring and pick N unique replicas from each zone, walking the ring starting from random but predictable numbers. The random generator is initialised with a seed based on the provided identifier.
This implementation guarantees:
- Stability: given the same ring, two invocations returns the same result.
- Consistency: adding/removing 1 instance from the ring generates a resulting subring with no more than 1 difference.
- Shuffling: probabilistically, for a large enough cluster each identifier gets a different set of instances, with a reduced number of overlapping instances between two identifiers.
func (*Ring) ShuffleShardWithLookback ¶ added in v1.5.0
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances that have been part of the identifier's shard since "now - lookbackPeriod".
The returned subring may be unbalanced with regard to zones and should never be used for write operations (read only).
This function doesn't support caching.
type TokenGenerator ¶ added in v1.17.0
type TokenGenerator interface { // GenerateTokens make numTokens unique random tokens, none of which clash // with takenTokens. Generated tokens are sorted. // GenerateTokens can return any number of token between 0 and numTokens if force is set to false. // If force is set to true, all tokens needs to be generated GenerateTokens(ring *Desc, id, zone string, numTokens int, force bool) []uint32 }
func NewMinimizeSpreadTokenGenerator ¶ added in v1.17.0
func NewMinimizeSpreadTokenGenerator() TokenGenerator
func NewRandomTokenGenerator ¶ added in v1.17.0
func NewRandomTokenGenerator() TokenGenerator
type Tokens ¶ added in v0.6.0
type Tokens []uint32
Tokens is a simple list of tokens.
func LoadTokensFromFile ¶ added in v0.6.0
LoadTokensFromFile loads tokens from given file path.
func (Tokens) Equals ¶ added in v1.1.0
Equals returns whether the tokens are equal to the input ones.
func (Tokens) StoreToFile ¶ added in v0.6.0
StoreToFile stores the tokens in the given directory.
type TokensHeap ¶ added in v1.7.0
type TokensHeap [][]uint32
TokensHeap is an heap data structure used to merge multiple lists of sorted tokens into a single one.
func (TokensHeap) Len ¶ added in v1.7.0
func (h TokensHeap) Len() int
func (TokensHeap) Less ¶ added in v1.7.0
func (h TokensHeap) Less(i, j int) bool
func (*TokensHeap) Pop ¶ added in v1.7.0
func (h *TokensHeap) Pop() interface{}
func (*TokensHeap) Push ¶ added in v1.7.0
func (h *TokensHeap) Push(x interface{})
func (TokensHeap) Swap ¶ added in v1.7.0
func (h TokensHeap) Swap(i, j int)
type TokensPersistencyDelegate ¶ added in v1.1.0
type TokensPersistencyDelegate struct {
// contains filtered or unexported fields
}
func NewTokensPersistencyDelegate ¶ added in v1.1.0
func NewTokensPersistencyDelegate(path string, state InstanceState, next BasicLifecyclerDelegate, logger log.Logger) *TokensPersistencyDelegate
func (*TokensPersistencyDelegate) OnRingInstanceHeartbeat ¶ added in v1.1.0
func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
func (*TokensPersistencyDelegate) OnRingInstanceRegister ¶ added in v1.1.0
func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)
func (*TokensPersistencyDelegate) OnRingInstanceStopping ¶ added in v1.1.0
func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*TokensPersistencyDelegate) OnRingInstanceTokens ¶ added in v1.1.0
func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)