Documentation ¶
Index ¶
- Constants
- Variables
- func DoBatch(ctx context.Context, r ReadRing, keys []uint32, ...) error
- func GenerateTokens(numTokens int, takenTokens []uint32) []uint32
- func GetCodec() codec.Codec
- func ProtoDescFactory() proto.Message
- type ByToken
- type Config
- type Desc
- func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState)
- func (d *Desc) ClaimTokens(from, to string) Tokens
- func (*Desc) Descriptor() ([]byte, []int)
- func (this *Desc) Equal(that interface{}) bool
- func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
- func (m *Desc) GetIngesters() map[string]IngesterDesc
- func (this *Desc) GoString() string
- func (m *Desc) Marshal() (dAtA []byte, err error)
- func (m *Desc) MarshalTo(dAtA []byte) (int, error)
- func (m *Desc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
- func (d *Desc) MergeContent() []string
- func (*Desc) ProtoMessage()
- func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error
- func (d *Desc) RemoveIngester(id string)
- func (d *Desc) RemoveTombstones(limit time.Time)
- func (m *Desc) Reset()
- func (m *Desc) Size() (n int)
- func (this *Desc) String() string
- func (d *Desc) TokensFor(id string) (tokens, other Tokens)
- func (m *Desc) Unmarshal(dAtA []byte) error
- func (m *Desc) XXX_DiscardUnknown()
- func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Desc) XXX_Merge(src proto.Message)
- func (m *Desc) XXX_Size() int
- func (m *Desc) XXX_Unmarshal(b []byte) error
- type FlushTransferer
- type IngesterDesc
- func (*IngesterDesc) Descriptor() ([]byte, []int)
- func (this *IngesterDesc) Equal(that interface{}) bool
- func (m *IngesterDesc) GetAddr() string
- func (m *IngesterDesc) GetState() IngesterState
- func (m *IngesterDesc) GetTimestamp() int64
- func (m *IngesterDesc) GetTokens() []uint32
- func (m *IngesterDesc) GetZone() string
- func (this *IngesterDesc) GoString() string
- func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool
- func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
- func (m *IngesterDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*IngesterDesc) ProtoMessage()
- func (m *IngesterDesc) Reset()
- func (m *IngesterDesc) Size() (n int)
- func (this *IngesterDesc) String() string
- func (m *IngesterDesc) Unmarshal(dAtA []byte) error
- func (m *IngesterDesc) XXX_DiscardUnknown()
- func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *IngesterDesc) XXX_Merge(src proto.Message)
- func (m *IngesterDesc) XXX_Size() int
- func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
- type IngesterState
- type Lifecycler
- func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
- func (i *Lifecycler) CheckReady(ctx context.Context) error
- func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
- func (i *Lifecycler) FlushOnShutdown() bool
- func (i *Lifecycler) GetState() IngesterState
- func (i *Lifecycler) HealthyInstancesCount() int
- func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
- type LifecyclerConfig
- type NoopFlushTransferer
- type Operation
- type ReadRing
- type ReplicationSet
- type Ring
- func (r *Ring) Collect(ch chan<- prometheus.Metric)
- func (r *Ring) Describe(ch chan<- *prometheus.Desc)
- func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
- func (r *Ring) GetAll() (ReplicationSet, error)
- func (r *Ring) IngesterCount() int
- func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
- func (r *Ring) ReplicationFactor() int
- func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (r *Ring) Subring(key uint32, n int) (ReadRing, error)
- type TokenDesc
- type Tokens
Constants ¶
const ( // IngesterRingKey is the key under which we store the ingesters ring in the KVStore. IngesterRingKey = "ring" // RulerRingKey is the key under which we store the rulers ring in the KVStore. RulerRingKey = "ring" // DistributorRingKey is the key under which we store the distributors ring in the KVStore. DistributorRingKey = "distributor" // CompactorRingKey is the key under which we store the compactors ring in the KVStore. CompactorRingKey = "compactor" // StoreGatewayRingKey is the key under which we store the store gateways ring in the KVStore. StoreGatewayRingKey = "store-gateway" )
Variables ¶
var ( ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowRing = fmt.Errorf("proto: integer overflow") )
var ErrEmptyRing = errors.New("empty ring")
ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
var ErrTransferDisabled = errors.New("transfers disabled")
ErrTransferDisabled is the error returned by TransferOut when the transfers are disabled.
var IngesterState_name = map[int32]string{
0: "ACTIVE",
1: "LEAVING",
2: "PENDING",
3: "JOINING",
4: "LEFT",
}
var IngesterState_value = map[string]int32{
"ACTIVE": 0,
"LEAVING": 1,
"PENDING": 2,
"JOINING": 3,
"LEFT": 4,
}
Functions ¶
func DoBatch ¶
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error
DoBatch performs a request against a set of keys in the ring, handling replication and failures. For example, if we want to write N items where they may all hit different ingesters, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.
Callback is passed the ingester to target, and the indexes of the keys to send to that ingester.
Not implemented as a method on Ring so we can test separately.
func GenerateTokens ¶
GenerateTokens makes numTokens unique random tokens, none of which clash with takenTokens.
Types ¶
type Config ¶
type Config struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` ReplicationFactor int `yaml:"replication_factor"` }
Config for a Ring
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
type Desc ¶
type Desc struct {
Ingesters map[string]IngesterDesc `` /* 149-byte string literal not displayed */
}
func (*Desc) AddIngester ¶
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState)
AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, any other tokens are removed.
func (*Desc) ClaimTokens ¶
ClaimTokens transfers all the tokens from one ingester to another, returning the claimed tokens. This method assumes that the Ring is in the correct state: the 'to' ingester has no tokens anywhere, and the tokens list is sorted properly. If all of this is true, everything will be fine.
func (*Desc) Descriptor ¶
func (*Desc) FindIngestersByState ¶
func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
FindIngestersByState returns the list of ingesters in the given state
func (*Desc) GetIngesters ¶
func (m *Desc) GetIngesters() map[string]IngesterDesc
func (*Desc) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Desc) Merge ¶ added in v0.4.0
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)
Merge merges other ring into this one. Returns sub-ring that represents the change, and can be sent out to other clients.
This merge function depends on the timestamp of the ingester. For each ingester, it will choose more recent state from the two rings, and put that into this ring. There is one exception: we accept LEFT state even if Timestamp hasn't changed.
The localCAS flag tells the merge that it can use the incoming ring as a full state, and detect missing ingesters based on it. Ingesters missing from the incoming ring will be marked as LEFT and gossiped about.
If multiple ingesters end up owning the same tokens, Merge will do token conflict resolution (see resolveConflicts).
This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
func (*Desc) MergeContent ¶ added in v0.4.0
MergeContent describes content of this Mergeable. Ring simply returns list of ingesters that it includes.
func (*Desc) ProtoMessage ¶
func (*Desc) ProtoMessage()
func (*Desc) RemoveIngester ¶
RemoveIngester removes the given ingester and all its tokens.
func (*Desc) RemoveTombstones ¶ added in v0.4.0
RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.
func (*Desc) TokensFor ¶
TokensFor partitions the tokens into those for the given ID, and those for others.
func (*Desc) XXX_DiscardUnknown ¶
func (m *Desc) XXX_DiscardUnknown()
func (*Desc) XXX_Unmarshal ¶
type FlushTransferer ¶
FlushTransferer controls the shutdown of an instance in the ring. Methods on this interface are called when lifecycler is stopping. At that point, it no longer runs the "actor loop", but it keeps updating heartbeat in the ring. Ring entry is in LEAVING state. After calling TransferOut and then Flush, lifecycler stops.
type IngesterDesc ¶
type IngesterDesc struct { Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"` Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` Zone string `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"` }
func (*IngesterDesc) Descriptor ¶
func (*IngesterDesc) Descriptor() ([]byte, []int)
func (*IngesterDesc) Equal ¶
func (this *IngesterDesc) Equal(that interface{}) bool
func (*IngesterDesc) GetAddr ¶
func (m *IngesterDesc) GetAddr() string
func (*IngesterDesc) GetState ¶
func (m *IngesterDesc) GetState() IngesterState
func (*IngesterDesc) GetTimestamp ¶
func (m *IngesterDesc) GetTimestamp() int64
func (*IngesterDesc) GetTokens ¶
func (m *IngesterDesc) GetTokens() []uint32
func (*IngesterDesc) GetZone ¶ added in v1.0.0
func (m *IngesterDesc) GetZone() string
func (*IngesterDesc) GoString ¶
func (this *IngesterDesc) GoString() string
func (*IngesterDesc) IsHealthy ¶ added in v0.4.0
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool
IsHealthy checks whether the ingester appears to be alive and heartbeating
func (*IngesterDesc) Marshal ¶
func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
func (*IngesterDesc) MarshalToSizedBuffer ¶ added in v0.7.0
func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*IngesterDesc) ProtoMessage ¶
func (*IngesterDesc) ProtoMessage()
func (*IngesterDesc) Reset ¶
func (m *IngesterDesc) Reset()
func (*IngesterDesc) Size ¶
func (m *IngesterDesc) Size() (n int)
func (*IngesterDesc) String ¶
func (this *IngesterDesc) String() string
func (*IngesterDesc) Unmarshal ¶
func (m *IngesterDesc) Unmarshal(dAtA []byte) error
func (*IngesterDesc) XXX_DiscardUnknown ¶
func (m *IngesterDesc) XXX_DiscardUnknown()
func (*IngesterDesc) XXX_Marshal ¶
func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*IngesterDesc) XXX_Merge ¶
func (m *IngesterDesc) XXX_Merge(src proto.Message)
func (*IngesterDesc) XXX_Size ¶
func (m *IngesterDesc) XXX_Size() int
func (*IngesterDesc) XXX_Unmarshal ¶
func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
type IngesterState ¶
type IngesterState int32
const ( ACTIVE IngesterState = 0 LEAVING IngesterState = 1 PENDING IngesterState = 2 JOINING IngesterState = 3 // This state is only used by gossiping code to distribute information about // ingesters that have been removed from the ring. Ring users should not use it directly. LEFT IngesterState = 4 )
func (IngesterState) EnumDescriptor ¶
func (IngesterState) EnumDescriptor() ([]byte, []int)
func (IngesterState) String ¶
func (x IngesterState) String() string
type Lifecycler ¶
type Lifecycler struct { *services.BasicService KVStore kv.Client // These values are initialised at startup, and never change ID string Addr string RingName string RingKey string Zone string // contains filtered or unexported fields }
Lifecycler is responsible for managing the lifecycle of entries in the ring.
func NewLifecycler ¶
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool) (*Lifecycler, error)
NewLifecycler creates new Lifecycler. It must be started via StartAsync.
func (*Lifecycler) ChangeState ¶
func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
ChangeState of the ingester, for use off of the loop() goroutine.
func (*Lifecycler) CheckReady ¶
func (i *Lifecycler) CheckReady(ctx context.Context) error
CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by only returning a nil error if all ingesters are active. The state latches: once we have gone ready, we don't go un-ready.
func (*Lifecycler) ClaimTokensFor ¶
func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and assign token to the wrong ingester. While we could check for that state here, when this method is called, transfers have already finished -- it's better to check for this *before* transfers start.
func (*Lifecycler) FlushOnShutdown ¶ added in v0.6.0
func (i *Lifecycler) FlushOnShutdown() bool
FlushOnShutdown returns whether flushing is enabled when a transfer fails on shutdown.
func (*Lifecycler) GetState ¶
func (i *Lifecycler) GetState() IngesterState
GetState returns the state of this ingester.
func (*Lifecycler) HealthyInstancesCount ¶ added in v0.4.0
func (i *Lifecycler) HealthyInstancesCount() int
HealthyInstancesCount returns the number of healthy instances in the ring, updated during the last heartbeat period
func (*Lifecycler) SetFlushOnShutdown ¶ added in v0.6.0
func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)
SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. Passing 'true' enables it, and 'false' disables it.
type LifecyclerConfig ¶
type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control ListenPort *int `yaml:"-"` NumTokens int `yaml:"num_tokens"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` ObservePeriod time.Duration `yaml:"observe_period"` JoinAfter time.Duration `yaml:"join_after"` MinReadyDuration time.Duration `yaml:"min_ready_duration"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` Port int `doc:"hidden"` ID string `doc:"hidden"` SkipUnregister bool `yaml:"-"` }
LifecyclerConfig is the config to build a Lifecycler.
func (*LifecyclerConfig) RegisterFlags ¶
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*LifecyclerConfig) RegisterFlagsWithPrefix ¶
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.
type NoopFlushTransferer ¶ added in v0.6.0
type NoopFlushTransferer struct{}
NoopFlushTransferer is a FlushTransferer which does nothing and can be used in cases we don't need one
func NewNoopFlushTransferer ¶ added in v0.6.0
func NewNoopFlushTransferer() *NoopFlushTransferer
NewNoopFlushTransferer makes a new NoopFlushTransferer
func (*NoopFlushTransferer) Flush ¶ added in v0.6.0
func (t *NoopFlushTransferer) Flush()
Flush is a noop
func (*NoopFlushTransferer) TransferOut ¶ added in v0.6.0
func (t *NoopFlushTransferer) TransferOut(ctx context.Context) error
TransferOut is a noop
type ReadRing ¶
type ReadRing interface { prometheus.Collector // Get returns n (or more) ingesters which form the replicas for the given key. // buf is a slice to be overwritten for the return value // to avoid memory allocation; can be nil. Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) GetAll() (ReplicationSet, error) ReplicationFactor() int IngesterCount() int Subring(key uint32, n int) (ReadRing, error) }
ReadRing represents the read interface to the ring.
type ReplicationSet ¶
type ReplicationSet struct { Ingesters []IngesterDesc MaxErrors int }
ReplicationSet describes the ingesters to talk to for a given key, and how many errors to tolerate.
func (ReplicationSet) Do ¶
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*IngesterDesc) (interface{}, error)) ([]interface{}, error)
Do runs function f in parallel for all replicas in the set, erroring if we exceed MaxErrors and returning early otherwise.
type Ring ¶
Ring holds the information about the members of the consistent hash ring.
func (*Ring) Collect ¶
func (r *Ring) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*Ring) Describe ¶
func (r *Ring) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
func (*Ring) Get ¶
func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
Get returns n (or more) ingesters which form the replicas for the given key.
func (*Ring) GetAll ¶
func (r *Ring) GetAll() (ReplicationSet, error)
GetAll returns all available ingesters in the ring.
func (*Ring) IngesterCount ¶ added in v0.3.0
IngesterCount is the number of ingesters in the ring.
func (*Ring) IsHealthy ¶
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
IsHealthy checks whether an ingester appears to be alive and heartbeating
func (*Ring) ReplicationFactor ¶
ReplicationFactor of the ring.
type Tokens ¶ added in v0.6.0
type Tokens []uint32
Tokens is a simple list of tokens.
func LoadTokensFromFile ¶ added in v0.6.0
LoadTokensFromFile loads tokens from given file path.
func (Tokens) StoreToFile ¶ added in v0.6.0
StoreToFile stores the tokens in the given directory.