Documentation ¶
Index ¶
- Constants
- Variables
- func DoBatch(ctx context.Context, r ReadRing, keys []uint32, ...) error
- func GenerateTokens(numTokens int, takenTokens []uint32) []uint32
- func GetCodec() codec.Codec
- func GetInstanceAddr(configAddr string, netInterfaces []string) (string, error)
- func GetInstancePort(configPort, listenPort int) int
- func HasReplicationSetChanged(before, after ReplicationSet) bool
- func ProtoDescFactory() proto.Message
- func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state IngesterState) error
- type AutoForgetDelegate
- func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
- func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (IngesterState, Tokens)
- func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
- type BasicLifecycler
- func (l *BasicLifecycler) ChangeState(ctx context.Context, state IngesterState) error
- func (l *BasicLifecycler) GetInstanceAddr() string
- func (l *BasicLifecycler) GetInstanceID() string
- func (l *BasicLifecycler) GetInstanceZone() string
- func (l *BasicLifecycler) GetRegisteredAt() time.Time
- func (l *BasicLifecycler) GetState() IngesterState
- func (l *BasicLifecycler) GetTokens() Tokens
- func (l *BasicLifecycler) IsRegistered() bool
- type BasicLifecyclerConfig
- type BasicLifecyclerDelegate
- type BasicLifecyclerMetrics
- type ByAddr
- type ByToken
- type CompareResult
- type Config
- type DefaultReplicationStrategy
- type Desc
- func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState, ...) IngesterDesc
- func (d *Desc) ClaimTokens(from, to string) Tokens
- func (*Desc) Descriptor() ([]byte, []int)
- func (this *Desc) Equal(that interface{}) bool
- func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
- func (m *Desc) GetIngesters() map[string]IngesterDesc
- func (this *Desc) GoString() string
- func (m *Desc) Marshal() (dAtA []byte, err error)
- func (m *Desc) MarshalTo(dAtA []byte) (int, error)
- func (m *Desc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
- func (d *Desc) MergeContent() []string
- func (*Desc) ProtoMessage()
- func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error
- func (d *Desc) RemoveIngester(id string)
- func (d *Desc) RemoveTombstones(limit time.Time)
- func (m *Desc) Reset()
- func (d *Desc) RingCompare(o *Desc) CompareResult
- func (m *Desc) Size() (n int)
- func (this *Desc) String() string
- func (d *Desc) TokensFor(id string) (tokens, other Tokens)
- func (m *Desc) Unmarshal(dAtA []byte) error
- func (m *Desc) XXX_DiscardUnknown()
- func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Desc) XXX_Merge(src proto.Message)
- func (m *Desc) XXX_Size() int
- func (m *Desc) XXX_Unmarshal(b []byte) error
- type FlushTransferer
- type IngesterDesc
- func (*IngesterDesc) Descriptor() ([]byte, []int)
- func (this *IngesterDesc) Equal(that interface{}) bool
- func (m *IngesterDesc) GetAddr() string
- func (i *IngesterDesc) GetRegisteredAt() time.Time
- func (m *IngesterDesc) GetRegisteredTimestamp() int64
- func (m *IngesterDesc) GetState() IngesterState
- func (m *IngesterDesc) GetTimestamp() int64
- func (m *IngesterDesc) GetTokens() []uint32
- func (m *IngesterDesc) GetZone() string
- func (this *IngesterDesc) GoString() string
- func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool
- func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
- func (m *IngesterDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*IngesterDesc) ProtoMessage()
- func (m *IngesterDesc) Reset()
- func (m *IngesterDesc) Size() (n int)
- func (this *IngesterDesc) String() string
- func (m *IngesterDesc) Unmarshal(dAtA []byte) error
- func (m *IngesterDesc) XXX_DiscardUnknown()
- func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *IngesterDesc) XXX_Merge(src proto.Message)
- func (m *IngesterDesc) XXX_Size() int
- func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
- type IngesterState
- type LeaveOnStoppingDelegate
- func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (IngesterState, Tokens)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
- type Lifecycler
- func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
- func (i *Lifecycler) CheckReady(ctx context.Context) error
- func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
- func (i *Lifecycler) FlushOnShutdown() bool
- func (i *Lifecycler) GetState() IngesterState
- func (i *Lifecycler) HealthyInstancesCount() int
- func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
- func (i *Lifecycler) ZonesCount() int
- type LifecyclerConfig
- type NoopFlushTransferer
- type Operation
- type ReadRing
- type ReplicationSet
- type ReplicationStrategy
- type Ring
- func (r *Ring) Collect(ch chan<- prometheus.Metric)
- func (r *Ring) Describe(ch chan<- *prometheus.Desc)
- func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
- func (r *Ring) GetAll(op Operation) (ReplicationSet, error)
- func (r *Ring) GetInstanceState(instanceID string) (IngesterState, error)
- func (r *Ring) HasInstance(instanceID string) bool
- func (r *Ring) IngesterCount() int
- func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
- func (r *Ring) ReplicationFactor() int
- func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (r *Ring) ShuffleShard(identifier string, size int) ReadRing
- func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
- type TokenDesc
- type Tokens
- type TokensPersistencyDelegate
- func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
- func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, ...) (IngesterState, Tokens)
- func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
- func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
Constants ¶
const ( // IngesterRingKey is the key under which we store the ingesters ring in the KVStore. IngesterRingKey = "ring" // RulerRingKey is the key under which we store the rulers ring in the KVStore. RulerRingKey = "ring" // DistributorRingKey is the key under which we store the distributors ring in the KVStore. DistributorRingKey = "distributor" // CompactorRingKey is the key under which we store the compactors ring in the KVStore. CompactorRingKey = "compactor" )
Variables ¶
var ( // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. ErrEmptyRing = errors.New("empty ring") // ErrInstanceNotFound is the error returned when trying to get information for an instance // not registered within the ring. ErrInstanceNotFound = errors.New("instance not found in the ring") )
var ( ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowRing = fmt.Errorf("proto: integer overflow") )
var ErrTransferDisabled = errors.New("transfers disabled")
ErrTransferDisabled is the error returned by TransferOut when the transfers are disabled.
var IngesterState_name = map[int32]string{
0: "ACTIVE",
1: "LEAVING",
2: "PENDING",
3: "JOINING",
4: "LEFT",
}
var IngesterState_value = map[string]int32{
"ACTIVE": 0,
"LEAVING": 1,
"PENDING": 2,
"JOINING": 3,
"LEFT": 4,
}
Functions ¶
func DoBatch ¶
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error
DoBatch request against a set of keys in the ring, handling replication and failures. For example if we want to write N items where they may all hit different ingesters, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.
Callback is passed the ingester to target, and the indexes of the keys to send to that ingester.
Not implemented as a method on Ring so we can test separately.
func GenerateTokens ¶
GenerateTokens makes numTokens unique random tokens, none of which clash with takenTokens.
func GetInstanceAddr ¶
GetInstanceAddr returns the address to use to register the instance in the ring.
func GetInstancePort ¶
GetInstancePort returns the port to use to register the instance in the ring.
func HasReplicationSetChanged ¶
func HasReplicationSetChanged(before, after ReplicationSet) bool
HasReplicationSetChanged returns false if two replication sets are the same (with possibly different timestamps), true if they differ in any way (number of instances, instance states, tokens, zones, ...).
func WaitInstanceState ¶
WaitInstanceState waits until the input instanceID is registered within the ring matching the provided state. A timeout should be provided within the context.
Types ¶
type AutoForgetDelegate ¶
type AutoForgetDelegate struct {
// contains filtered or unexported fields
}
AutoForgetDelegate automatically removes an instance from the ring if the last heartbeat is older than a configured period.
func NewAutoForgetDelegate ¶
func NewAutoForgetDelegate(forgetPeriod time.Duration, next BasicLifecyclerDelegate, logger log.Logger) *AutoForgetDelegate
func (*AutoForgetDelegate) OnRingInstanceHeartbeat ¶
func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
func (*AutoForgetDelegate) OnRingInstanceRegister ¶
func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc IngesterDesc) (IngesterState, Tokens)
func (*AutoForgetDelegate) OnRingInstanceStopping ¶
func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*AutoForgetDelegate) OnRingInstanceTokens ¶
func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
type BasicLifecycler ¶
type BasicLifecycler struct { *services.BasicService // contains filtered or unexported fields }
BasicLifecycler is a basic ring lifecycler which allows to hook custom logic at different stages of the lifecycle. This lifecycler should be used to build higher level lifecyclers.
This lifecycler never changes the instance state. It's the delegate's responsibility to call ChangeState().
func NewBasicLifecycler ¶
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error)
NewBasicLifecycler makes a new BasicLifecycler.
func (*BasicLifecycler) ChangeState ¶
func (l *BasicLifecycler) ChangeState(ctx context.Context, state IngesterState) error
func (*BasicLifecycler) GetInstanceAddr ¶
func (l *BasicLifecycler) GetInstanceAddr() string
func (*BasicLifecycler) GetInstanceID ¶
func (l *BasicLifecycler) GetInstanceID() string
func (*BasicLifecycler) GetInstanceZone ¶
func (l *BasicLifecycler) GetInstanceZone() string
func (*BasicLifecycler) GetRegisteredAt ¶
func (l *BasicLifecycler) GetRegisteredAt() time.Time
GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if the lifecycler hasn't been started yet or was already registered and its timestamp is unknown.
func (*BasicLifecycler) GetState ¶
func (l *BasicLifecycler) GetState() IngesterState
func (*BasicLifecycler) GetTokens ¶
func (l *BasicLifecycler) GetTokens() Tokens
func (*BasicLifecycler) IsRegistered ¶
func (l *BasicLifecycler) IsRegistered() bool
IsRegistered returns whether the instance is currently registered within the ring.
type BasicLifecyclerConfig ¶
type BasicLifecyclerConfig struct { // ID is the instance unique ID. ID string // Addr is the instance address, in the form "address:port". Addr string // Zone is the instance availability zone. Can be an empty string // if zone awareness is unused. Zone string HeartbeatPeriod time.Duration TokensObservePeriod time.Duration NumTokens int }
type BasicLifecyclerDelegate ¶
type BasicLifecyclerDelegate interface { // OnRingInstanceRegister is called while the lifecycler is registering the // instance within the ring and should return the state and set of tokens to // use for the instance itself. OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc IngesterDesc) (IngesterState, Tokens) // OnRingInstanceTokens is called once the instance tokens are set and are // stable within the ring (honoring the observe period, if set). OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) // OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler // will continue to heartbeat the ring while this function is executing and will proceed // to unregister the instance from the ring only after this function has returned. OnRingInstanceStopping(lifecycler *BasicLifecycler) // OnRingInstanceHeartbeat is called while the instance is updating its heartbeat // in the ring. OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc) }
type BasicLifecyclerMetrics ¶
type BasicLifecyclerMetrics struct {
// contains filtered or unexported fields
}
func NewBasicLifecyclerMetrics ¶
func NewBasicLifecyclerMetrics(ringName string, reg prometheus.Registerer) *BasicLifecyclerMetrics
type CompareResult ¶
type CompareResult int
const ( Equal CompareResult = iota // Both rings contain same exact instances. EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ). Different // Rings have different set of instances, or their information don't match. )
type Config ¶
type Config struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` ReplicationFactor int `yaml:"replication_factor"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` }
Config for a Ring
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
type DefaultReplicationStrategy ¶
type DefaultReplicationStrategy struct{}
func (*DefaultReplicationStrategy) Filter ¶
func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) ([]IngesterDesc, int, error)
Filter decides, given the set of ingesters eligible for a key, which ingesters you will try and write to and how many failures you will tolerate. - Filters out dead ingesters so that one doesn't even try to write to them. - Checks there are enough ingesters for an operation to succeed. The ingesters argument may be overwritten.
func (*DefaultReplicationStrategy) ShouldExtendReplicaSet ¶
func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool
type Desc ¶
type Desc struct {
Ingesters map[string]IngesterDesc `` /* 149-byte string literal not displayed */
}
func GetOrCreateRingDesc ¶
func GetOrCreateRingDesc(d interface{}) *Desc
func (*Desc) AddIngester ¶
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState, registeredAt time.Time) IngesterDesc
AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, any other tokens are removed.
func (*Desc) ClaimTokens ¶
ClaimTokens transfers all the tokens from one ingester to another, returning the claimed tokens. This method assumes that Ring is in the correct state, 'to' ingester has no tokens anywhere. Tokens list must be sorted properly. If all of this is true, everything will be fine.
func (*Desc) Descriptor ¶
func (*Desc) FindIngestersByState ¶
func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
FindIngestersByState returns the list of ingesters in the given state
func (*Desc) GetIngesters ¶
func (m *Desc) GetIngesters() map[string]IngesterDesc
func (*Desc) Merge ¶
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
Merge merges other ring into this one. Returns sub-ring that represents the change, and can be sent out to other clients.
This merge function depends on the timestamp of the ingester. For each ingester, it will choose more recent state from the two rings, and put that into this ring. There is one exception: we accept LEFT state even if Timestamp hasn't changed.
localCAS flag tells the merge that it can use incoming ring as a full state, and detect missing ingesters based on it. Ingesters missing from the incoming ring will be marked as LEFT and gossiped about.
If multiple ingesters end up owning the same tokens, Merge will do token conflict resolution (see resolveConflicts).
This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
func (*Desc) MergeContent ¶
MergeContent describes content of this Mergeable. Ring simply returns list of ingesters that it includes.
func (*Desc) ProtoMessage ¶
func (*Desc) ProtoMessage()
func (*Desc) RemoveIngester ¶
RemoveIngester removes the given ingester and all its tokens.
func (*Desc) RemoveTombstones ¶
RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.
func (*Desc) RingCompare ¶
func (d *Desc) RingCompare(o *Desc) CompareResult
RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different.
func (*Desc) TokensFor ¶
TokensFor partitions the tokens into those for the given ID, and those for others.
func (*Desc) XXX_DiscardUnknown ¶
func (m *Desc) XXX_DiscardUnknown()
func (*Desc) XXX_Unmarshal ¶
type FlushTransferer ¶
FlushTransferer controls the shutdown of an instance in the ring. Methods on this interface are called when lifecycler is stopping. At that point, it no longer runs the "actor loop", but it keeps updating heartbeat in the ring. Ring entry is in LEAVING state. After calling TransferOut and then Flush, lifecycler stops.
type IngesterDesc ¶
type IngesterDesc struct { Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"` // Unix timestamp (with seconds precision) of the last heartbeat sent // by this instance. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` Zone string `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"` // Unix timestamp (with seconds precision) of when the instance has been registered // to the ring. This field has not been called "joined_timestamp" intentionally, in order // to not introduce any misunderstanding with the instance's "joining" state. // // This field is used to find out subset of instances that could have possibly owned a // specific token in the past. Because of this, it's important that either this timestamp // is set to the real time the instance has been registered to the ring or it's left // 0 (which means unknown). // // When an instance is already registered in the ring with a value of 0 it's NOT safe to // update the timestamp to "now" because it would break the contract, given the instance // was already registered before "now". If unknown (0), it should be left as is, and the // Cortex code will properly deal with that. RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"` }
func (*IngesterDesc) Descriptor ¶
func (*IngesterDesc) Descriptor() ([]byte, []int)
func (*IngesterDesc) Equal ¶
func (this *IngesterDesc) Equal(that interface{}) bool
func (*IngesterDesc) GetAddr ¶
func (m *IngesterDesc) GetAddr() string
func (*IngesterDesc) GetRegisteredAt ¶
func (i *IngesterDesc) GetRegisteredAt() time.Time
GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if unknown.
func (*IngesterDesc) GetRegisteredTimestamp ¶
func (m *IngesterDesc) GetRegisteredTimestamp() int64
func (*IngesterDesc) GetState ¶
func (m *IngesterDesc) GetState() IngesterState
func (*IngesterDesc) GetTimestamp ¶
func (m *IngesterDesc) GetTimestamp() int64
func (*IngesterDesc) GetTokens ¶
func (m *IngesterDesc) GetTokens() []uint32
func (*IngesterDesc) GetZone ¶
func (m *IngesterDesc) GetZone() string
func (*IngesterDesc) GoString ¶
func (this *IngesterDesc) GoString() string
func (*IngesterDesc) IsHealthy ¶
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool
IsHealthy checks whether the ingester appears to be alive and heartbeating
func (*IngesterDesc) Marshal ¶
func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
func (*IngesterDesc) MarshalToSizedBuffer ¶
func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*IngesterDesc) ProtoMessage ¶
func (*IngesterDesc) ProtoMessage()
func (*IngesterDesc) Reset ¶
func (m *IngesterDesc) Reset()
func (*IngesterDesc) Size ¶
func (m *IngesterDesc) Size() (n int)
func (*IngesterDesc) String ¶
func (this *IngesterDesc) String() string
func (*IngesterDesc) Unmarshal ¶
func (m *IngesterDesc) Unmarshal(dAtA []byte) error
func (*IngesterDesc) XXX_DiscardUnknown ¶
func (m *IngesterDesc) XXX_DiscardUnknown()
func (*IngesterDesc) XXX_Marshal ¶
func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*IngesterDesc) XXX_Merge ¶
func (m *IngesterDesc) XXX_Merge(src proto.Message)
func (*IngesterDesc) XXX_Size ¶
func (m *IngesterDesc) XXX_Size() int
func (*IngesterDesc) XXX_Unmarshal ¶
func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
type IngesterState ¶
type IngesterState int32
const ( ACTIVE IngesterState = 0 LEAVING IngesterState = 1 PENDING IngesterState = 2 JOINING IngesterState = 3 // This state is only used by gossiping code to distribute information about // ingesters that have been removed from the ring. Ring users should not use it directly. LEFT IngesterState = 4 )
func (IngesterState) EnumDescriptor ¶
func (IngesterState) EnumDescriptor() ([]byte, []int)
func (IngesterState) String ¶
func (x IngesterState) String() string
type LeaveOnStoppingDelegate ¶
type LeaveOnStoppingDelegate struct {
// contains filtered or unexported fields
}
func NewLeaveOnStoppingDelegate ¶
func NewLeaveOnStoppingDelegate(next BasicLifecyclerDelegate, logger log.Logger) *LeaveOnStoppingDelegate
func (*LeaveOnStoppingDelegate) OnRingInstanceHeartbeat ¶
func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
func (*LeaveOnStoppingDelegate) OnRingInstanceRegister ¶
func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc IngesterDesc) (IngesterState, Tokens)
func (*LeaveOnStoppingDelegate) OnRingInstanceStopping ¶
func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*LeaveOnStoppingDelegate) OnRingInstanceTokens ¶
func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
type Lifecycler ¶
type Lifecycler struct { *services.BasicService KVStore kv.Client // These values are initialised at startup, and never change ID string Addr string RingName string RingKey string Zone string // contains filtered or unexported fields }
Lifecycler is responsible for managing the lifecycle of entries in the ring.
func NewLifecycler ¶
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, reg prometheus.Registerer) (*Lifecycler, error)
NewLifecycler creates new Lifecycler. It must be started via StartAsync.
func (*Lifecycler) ChangeState ¶
func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
ChangeState of the ingester, for use off of the loop() goroutine.
func (*Lifecycler) CheckReady ¶
func (i *Lifecycler) CheckReady(ctx context.Context) error
CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by only reporting ready (a nil error) if all ingesters are active. The state latches: once we have gone ready we don't go un-ready.
func (*Lifecycler) ClaimTokensFor ¶
func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and assign token to the wrong ingester. While we could check for that state here, when this method is called, transfers have already finished -- it's better to check for this *before* transfers start.
func (*Lifecycler) FlushOnShutdown ¶
func (i *Lifecycler) FlushOnShutdown() bool
FlushOnShutdown returns whether flushing is enabled when a transfer fails on shutdown.
func (*Lifecycler) GetState ¶
func (i *Lifecycler) GetState() IngesterState
GetState returns the state of this ingester.
func (*Lifecycler) HealthyInstancesCount ¶
func (i *Lifecycler) HealthyInstancesCount() int
HealthyInstancesCount returns the number of healthy instances for the Write operation in the ring, updated during the last heartbeat period.
func (*Lifecycler) SetFlushOnShutdown ¶
func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. Passing 'true' enables it, and 'false' disables it.
func (*Lifecycler) ZonesCount ¶
func (i *Lifecycler) ZonesCount() int
ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
type LifecyclerConfig ¶
type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control NumTokens int `yaml:"num_tokens"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` ObservePeriod time.Duration `yaml:"observe_period"` JoinAfter time.Duration `yaml:"join_after"` MinReadyDuration time.Duration `yaml:"min_ready_duration"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` Port int `doc:"hidden"` ID string `doc:"hidden"` SkipUnregister bool `yaml:"-"` // Injected internally ListenPort int `yaml:"-"` }
LifecyclerConfig is the config to build a Lifecycler.
func (*LifecyclerConfig) RegisterFlags ¶
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*LifecyclerConfig) RegisterFlagsWithPrefix ¶
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.
type NoopFlushTransferer ¶
type NoopFlushTransferer struct{}
NoopFlushTransferer is a FlushTransferer which does nothing and can be used in cases we don't need one
func NewNoopFlushTransferer ¶
func NewNoopFlushTransferer() *NoopFlushTransferer
NewNoopFlushTransferer makes a new NoopFlushTransferer
func (*NoopFlushTransferer) TransferOut ¶
func (t *NoopFlushTransferer) TransferOut(ctx context.Context) error
TransferOut is a noop
type Operation ¶
type Operation int
Operation can be Read or Write
const ( Read Operation = iota Write Reporting // Special value for inquiring about health // BlocksSync is the operation run by the store-gateway to sync blocks. BlocksSync // BlocksRead is the operation run by the querier to query blocks via the store-gateway. BlocksRead // Ruler is the operation used for distributing rule groups between rulers. Ruler // Compactor is the operation used for distributing tenants/blocks across compactors. Compactor )
Values for Operation
type ReadRing ¶
type ReadRing interface { prometheus.Collector // Get returns n (or more) ingesters which form the replicas for the given key. // buf is a slice to be overwritten for the return value // to avoid memory allocation; can be nil. Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) GetAll(op Operation) (ReplicationSet, error) ReplicationFactor() int IngesterCount() int // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes // all instances that have been part of the identifier's shard since "now - lookbackPeriod". ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing // HasInstance returns whether the ring contains an instance matching the provided instanceID. HasInstance(instanceID string) bool }
ReadRing represents the read interface to the ring.
type ReplicationSet ¶
type ReplicationSet struct { Ingesters []IngesterDesc MaxErrors int }
ReplicationSet describes the ingesters to talk to for a given key, and how many errors to tolerate.
func (ReplicationSet) Do ¶
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error)
Do function f in parallel for all replicas in the set, erroring if we exceed MaxErrors and returning early otherwise.
func (ReplicationSet) GetAddresses ¶
func (r ReplicationSet) GetAddresses() []string
GetAddresses returns the addresses of all instances within the replication set. Returned slice order is not guaranteed.
func (ReplicationSet) Includes ¶
func (r ReplicationSet) Includes(addr string) bool
Includes returns whether the replication set includes the replica with the provided addr.
type ReplicationStrategy ¶
type ReplicationStrategy interface { // Filter out unhealthy instances and checks if there're enough instances // for an operation to succeed. Returns an error if there are not enough // instances. Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []IngesterDesc, maxFailures int, err error) // ShouldExtendReplicaSet returns true if given an instance that's going to be // added to the replica set, the replica set size should be extended by 1 // more instance for the given operation. ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool }
type Ring ¶
Ring holds the information about the members of the consistent hash ring.
func New ¶
func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
New creates a new Ring. Being a service, Ring needs to be started to do anything.
func (*Ring) Collect ¶
func (r *Ring) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*Ring) Describe ¶
func (r *Ring) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
func (*Ring) Get ¶
func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
Get returns n (or more) ingesters which form the replicas for the given key.
func (*Ring) GetAll ¶
func (r *Ring) GetAll(op Operation) (ReplicationSet, error)
GetAll returns all available ingesters in the ring.
func (*Ring) GetInstanceState ¶
func (r *Ring) GetInstanceState(instanceID string) (IngesterState, error)
GetInstanceState returns the current state of an instance or an error if the instance does not exist in the ring.
func (*Ring) HasInstance ¶
HasInstance returns whether the ring contains an instance matching the provided instanceID.
func (*Ring) IngesterCount ¶
IngesterCount is number of ingesters in the ring
func (*Ring) IsHealthy ¶
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
IsHealthy checks whether an ingester appears to be alive and heartbeating
func (*Ring) ReplicationFactor ¶
ReplicationFactor of the ring.
func (*Ring) ShuffleShard ¶
ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) and size (number of instances). The size is expected to be a multiple of the number of zones and the returned subring will contain the same number of instances per zone as far as there are enough registered instances in the ring.
The algorithm used to build the subring is a shuffle sharder based on probabilistic hashing. We treat each zone as a separate ring and pick N unique replicas from each zone, walking the ring starting from random but predictable numbers. The random generator is initialised with a seed based on the provided identifier.
This implementation guarantees:
- Stability: given the same ring, two invocations return the same result.
- Consistency: adding/removing 1 instance from the ring generates a resulting subring with no more than 1 difference.
- Shuffling: probabilistically, for a large enough cluster each identifier gets a different set of instances, with a reduced number of overlapping instances between two identifiers.
func (*Ring) ShuffleShardWithLookback ¶
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances that have been part of the identifier's shard since "now - lookbackPeriod".
The returned subring may be unbalanced with regard to zones and should never be used for write operations (read only).
This function doesn't support caching.
type Tokens ¶
type Tokens []uint32
Tokens is a simple list of tokens.
func LoadTokensFromFile ¶
LoadTokensFromFile loads tokens from given file path.
func (Tokens) StoreToFile ¶
StoreToFile stores the tokens in the given directory.
type TokensPersistencyDelegate ¶
type TokensPersistencyDelegate struct {
// contains filtered or unexported fields
}
func NewTokensPersistencyDelegate ¶
func NewTokensPersistencyDelegate(path string, state IngesterState, next BasicLifecyclerDelegate, logger log.Logger) *TokensPersistencyDelegate
func (*TokensPersistencyDelegate) OnRingInstanceHeartbeat ¶
func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
func (*TokensPersistencyDelegate) OnRingInstanceRegister ¶
func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc IngesterDesc) (IngesterState, Tokens)
func (*TokensPersistencyDelegate) OnRingInstanceStopping ¶
func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
func (*TokensPersistencyDelegate) OnRingInstanceTokens ¶
func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)