ring

package
v1.0.0-rc.0
Published: Mar 31, 2020 License: Apache-2.0 Imports: 34 Imported by: 49

Documentation

Constants

const (

	// IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
	IngesterRingKey = "ring"

	// RulerRingKey is the key under which we store the rulers ring in the KVStore.
	RulerRingKey = "ring"

	// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
	DistributorRingKey = "distributor"

	// CompactorRingKey is the key under which we store the compactors ring in the KVStore.
	CompactorRingKey = "compactor"

	// StoreGatewayRingKey is the key under which we store the store gateways ring in the KVStore.
	StoreGatewayRingKey = "store-gateway"
)

Variables

var (
	ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRing   = fmt.Errorf("proto: integer overflow")
)
var ErrEmptyRing = errors.New("empty ring")

ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.

var ErrTransferDisabled = errors.New("transfers disabled")

ErrTransferDisabled is the error returned by TransferOut when the transfers are disabled.

var IngesterState_name = map[int32]string{
	0: "ACTIVE",
	1: "LEAVING",
	2: "PENDING",
	3: "JOINING",
	4: "LEFT",
}
var IngesterState_value = map[string]int32{
	"ACTIVE":  0,
	"LEAVING": 1,
	"PENDING": 2,
	"JOINING": 3,
	"LEFT":    4,
}

Functions

func DoBatch

func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error

DoBatch sends a request against a set of keys in the ring, handling replication and failures. For example, if we want to write N items that may all hit different ingesters, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.

Callback is passed the ingester to target, and the indexes of the keys to send to that ingester.

Not implemented as a method on Ring so we can test separately.
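The bookkeeping between batches and items can be illustrated without the ring itself. The following is a minimal sketch, not the package's implementation: `groupByIngester` and `replicasFor` are hypothetical names, ingesters are plain strings rather than IngesterDesc values, and quorum tracking is left out.

```go
package main

import "fmt"

// groupByIngester maps each ingester to the indexes of the keys it should
// receive. replicasFor is a hypothetical stand-in for a ring lookup.
func groupByIngester(keys []uint32, replicasFor func(uint32) []string) map[string][]int {
	batches := make(map[string][]int)
	for i, key := range keys {
		for _, ing := range replicasFor(key) {
			batches[ing] = append(batches[ing], i)
		}
	}
	return batches
}

func main() {
	ingesters := []string{"ing-0", "ing-1", "ing-2"}
	// Toy placement: key k goes to ingester k%3 and to the next one (R = 2).
	replicasFor := func(k uint32) []string {
		first := int(k % 3)
		return []string{ingesters[first], ingesters[(first+1)%3]}
	}
	batches := groupByIngester([]uint32{0, 1, 2, 3}, replicasFor)
	for _, ing := range ingesters {
		fmt.Println(ing, batches[ing])
	}
	// ing-0 [0 2 3]
	// ing-1 [0 1 3]
	// ing-2 [1 2]
}
```

Each map value plays the role of the index slice handed to the callback: one call per ingester, carrying only the items that ingester must store.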

func GenerateTokens

func GenerateTokens(numTokens int, takenTokens []uint32) []uint32

GenerateTokens makes numTokens unique random tokens, none of which clash with takenTokens.
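One way to meet this contract is rejection sampling: draw a random token and redraw whenever it collides. The sketch below assumes that approach; it is not taken from the package source, and `generateTokens` is a local, hypothetical function.

```go
package main

import (
	"fmt"
	"math/rand"
)

// generateTokens returns numTokens distinct random tokens, none of which
// appear in taken. Candidates that collide are simply redrawn.
func generateTokens(numTokens int, taken []uint32) []uint32 {
	if numTokens <= 0 {
		return nil
	}
	used := make(map[uint32]bool, len(taken)+numTokens)
	for _, tok := range taken {
		used[tok] = true
	}
	tokens := make([]uint32, 0, numTokens)
	for len(tokens) < numTokens {
		candidate := rand.Uint32()
		if used[candidate] {
			continue // clashes with a taken or already generated token
		}
		used[candidate] = true
		tokens = append(tokens, candidate)
	}
	return tokens
}

func main() {
	fmt.Println(len(generateTokens(4, []uint32{1, 2, 3})))
}
```

With a 32-bit token space and a few hundred tokens per ingester, collisions are rare, so the loop almost never retries.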

func GetCodec

func GetCodec() codec.Codec

GetCodec returns the codec used to encode and decode data being put by ring.

func ProtoDescFactory

func ProtoDescFactory() proto.Message

ProtoDescFactory makes new Descs

Types

type ByToken

type ByToken []TokenDesc

ByToken is a sortable list of TokenDescs

func (ByToken) Len

func (ts ByToken) Len() int

func (ByToken) Less

func (ts ByToken) Less(i, j int) bool

func (ByToken) Swap

func (ts ByToken) Swap(i, j int)
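Len, Less and Swap together satisfy sort.Interface, so a ByToken value can be handed to sort.Sort. The sketch below uses local stand-in types (`tokenDesc`, `byToken`) and assumes that Less orders by token value, which the name suggests but this page does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// tokenDesc and byToken mirror the shapes of TokenDesc and ByToken
// for illustration only.
type tokenDesc struct {
	Token    uint32
	Ingester string
}

type byToken []tokenDesc

func (ts byToken) Len() int           { return len(ts) }
func (ts byToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token }
func (ts byToken) Swap(i, j int)      { ts[i], ts[j] = ts[j], ts[i] }

// sortedByToken returns a sorted copy, leaving the input untouched.
func sortedByToken(in []tokenDesc) []tokenDesc {
	out := make(byToken, len(in))
	copy(out, in)
	sort.Sort(out)
	return out
}

func main() {
	tokens := []tokenDesc{{300, "ing-1"}, {100, "ing-0"}, {200, "ing-1"}}
	fmt.Println(sortedByToken(tokens)) // [{100 ing-0} {200 ing-1} {300 ing-1}]
}
```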

type Config

type Config struct {
	KVStore           kv.Config     `yaml:"kvstore"`
	HeartbeatTimeout  time.Duration `yaml:"heartbeat_timeout"`
	ReplicationFactor int           `yaml:"replication_factor"`
}

Config for a Ring

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet with a specified prefix
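The prefix pattern lets several components register the same ring settings on one FlagSet without name clashes. The sketch below shows the idea with the standard flag package; `ringConfig`, the flag names and the defaults are illustrative assumptions, not the names this package registers.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// ringConfig is a hypothetical, reduced stand-in for Config.
type ringConfig struct {
	HeartbeatTimeout  time.Duration
	ReplicationFactor int
}

// registerFlagsWithPrefix registers each flag under prefix+name.
// The flag names and defaults here are assumptions for illustration.
func (cfg *ringConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "Heartbeat timeout.")
	f.IntVar(&cfg.ReplicationFactor, prefix+"ring.replication-factor", 3, "Number of replicas.")
}

// parseRingConfig is a small helper that exercises the registration.
func parseRingConfig(prefix string, args []string) (ringConfig, error) {
	var cfg ringConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.registerFlagsWithPrefix(prefix, fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseRingConfig("ruler.", []string{"-ruler.ring.replication-factor=5"})
	fmt.Println(cfg.ReplicationFactor, cfg.HeartbeatTimeout, err) // 5 1m0s <nil>
}
```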

type Desc

type Desc struct {
	Ingesters map[string]IngesterDesc `` /* 149-byte string literal not displayed */
}

func NewDesc

func NewDesc() *Desc

NewDesc returns an empty ring.Desc

func (*Desc) AddIngester

func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState)

AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, any other tokens are removed.

func (*Desc) ClaimTokens

func (d *Desc) ClaimTokens(from, to string) Tokens

ClaimTokens transfers all the tokens from one ingester to another, returning the claimed tokens. This method assumes that the Ring is in the correct state and that the 'to' ingester has no tokens anywhere. The tokens list must be sorted properly. If all of this is true, everything will be fine.

func (*Desc) Descriptor

func (*Desc) Descriptor() ([]byte, []int)

func (*Desc) Equal

func (this *Desc) Equal(that interface{}) bool

func (*Desc) FindIngestersByState

func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc

FindIngestersByState returns the list of ingesters in the given state

func (*Desc) GetIngesters

func (m *Desc) GetIngesters() map[string]IngesterDesc

func (*Desc) GoString

func (this *Desc) GoString() string

func (*Desc) Marshal

func (m *Desc) Marshal() (dAtA []byte, err error)

func (*Desc) MarshalTo

func (m *Desc) MarshalTo(dAtA []byte) (int, error)

func (*Desc) MarshalToSizedBuffer added in v0.7.0

func (m *Desc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Desc) Merge added in v0.4.0

func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)

Merge merges other ring into this one. Returns sub-ring that represents the change, and can be sent out to other clients.

This merge function depends on the timestamp of the ingester. For each ingester, it will choose more recent state from the two rings, and put that into this ring. There is one exception: we accept LEFT state even if Timestamp hasn't changed.

localCAS flag tells the merge that it can use incoming ring as a full state, and detect missing ingesters based on it. Ingesters missing from the incoming ring will be marked as LEFT and gossiped about.

If multiple ingesters end up owning the same tokens, Merge will do token conflict resolution (see resolveConflicts).

This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
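The timestamp rule can be sketched on a reduced data model. The code below is an illustration under stated assumptions, not the package's Merge: `entry` and `mergeNewer` are hypothetical, and the localCAS handling and token conflict resolution described above are left out entirely.

```go
package main

import "fmt"

type state int

const (
	stateActive state = iota
	stateLeaving
	stateLeft
)

// entry is a hypothetical, reduced stand-in for IngesterDesc.
type entry struct {
	Timestamp int64
	State     state
}

// mergeNewer copies into local every incoming entry that is more recent,
// or that is LEFT with an unchanged timestamp, and returns what changed.
func mergeNewer(local, incoming map[string]entry) map[string]entry {
	changed := make(map[string]entry)
	for name, in := range incoming {
		cur, ok := local[name]
		newer := !ok || in.Timestamp > cur.Timestamp
		leftWins := ok && in.Timestamp == cur.Timestamp &&
			in.State == stateLeft && cur.State != stateLeft
		if newer || leftWins {
			local[name] = in
			changed[name] = in
		}
	}
	return changed
}

func main() {
	local := map[string]entry{
		"a": {10, stateActive}, "b": {10, stateActive}, "c": {10, stateActive},
	}
	incoming := map[string]entry{
		"a": {20, stateLeaving}, // newer: accepted
		"b": {10, stateLeft},    // same timestamp but LEFT: accepted
		"c": {5, stateLeaving},  // older: ignored
	}
	changed := mergeNewer(local, incoming)
	fmt.Println(len(changed)) // 2
}
```

The returned map corresponds to the "sub-ring that represents the change": only entries that actually moved are worth sending to other clients.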

func (*Desc) MergeContent added in v0.4.0

func (d *Desc) MergeContent() []string

MergeContent describes content of this Mergeable. Ring simply returns list of ingesters that it includes.

func (*Desc) ProtoMessage

func (*Desc) ProtoMessage()

func (*Desc) Ready

func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error

Ready returns no error when all ingesters are active and healthy.

func (*Desc) RemoveIngester

func (d *Desc) RemoveIngester(id string)

RemoveIngester removes the given ingester and all its tokens.

func (*Desc) RemoveTombstones added in v0.4.0

func (d *Desc) RemoveTombstones(limit time.Time)

RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.
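A minimal sketch of this pruning rule follows. It assumes timestamps are Unix seconds and uses a hypothetical `tombstoneEntry` type with a boolean in place of the LEFT state; neither is confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// tombstoneEntry is a hypothetical, reduced stand-in for IngesterDesc.
type tombstoneEntry struct {
	Timestamp int64 // Unix seconds of the last update (assumed unit)
	Left      bool  // true when the entry is in the LEFT state
}

// removeTombstones deletes LEFT entries last updated before limit.
// A zero limit deletes every LEFT entry.
func removeTombstones(entries map[string]tombstoneEntry, limit time.Time) {
	for name, e := range entries {
		if !e.Left {
			continue
		}
		if limit.IsZero() || time.Unix(e.Timestamp, 0).Before(limit) {
			delete(entries, name) // deleting during range is safe in Go
		}
	}
}

// demo prunes with a limit of t=100s, then with a zero limit, and
// returns how many entries remain after each step.
func demo() (afterLimit, afterZero int) {
	entries := map[string]tombstoneEntry{
		"old-left": {50, true}, "new-left": {150, true}, "active": {50, false},
	}
	removeTombstones(entries, time.Unix(100, 0))
	afterLimit = len(entries)
	removeTombstones(entries, time.Time{})
	return afterLimit, len(entries)
}

func main() {
	fmt.Println(demo()) // 2 1
}
```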

func (*Desc) Reset

func (m *Desc) Reset()

func (*Desc) Size

func (m *Desc) Size() (n int)

func (*Desc) String

func (this *Desc) String() string

func (*Desc) TokensFor

func (d *Desc) TokensFor(id string) (tokens, other Tokens)

TokensFor partitions the tokens into those for the given ID, and those for others.
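The partition can be pictured on a plain map from ingester ID to tokens. This is a sketch with a hypothetical `tokensFor` function; returning both halves in ascending order is an assumption of the example.

```go
package main

import (
	"fmt"
	"sort"
)

// tokensFor splits the tokens of all ingesters into those owned by id and
// those owned by anyone else, each returned in ascending order.
func tokensFor(ingesters map[string][]uint32, id string) (mine, others []uint32) {
	for name, toks := range ingesters {
		if name == id {
			mine = append(mine, toks...)
		} else {
			others = append(others, toks...)
		}
	}
	sort.Slice(mine, func(i, j int) bool { return mine[i] < mine[j] })
	sort.Slice(others, func(i, j int) bool { return others[i] < others[j] })
	return mine, others
}

func main() {
	ring := map[string][]uint32{"a": {30, 10}, "b": {20}, "c": {40}}
	mine, others := tokensFor(ring, "a")
	fmt.Println(mine, others) // [10 30] [20 40]
}
```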

func (*Desc) Unmarshal

func (m *Desc) Unmarshal(dAtA []byte) error

func (*Desc) XXX_DiscardUnknown

func (m *Desc) XXX_DiscardUnknown()

func (*Desc) XXX_Marshal

func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Desc) XXX_Merge

func (m *Desc) XXX_Merge(src proto.Message)

func (*Desc) XXX_Size

func (m *Desc) XXX_Size() int

func (*Desc) XXX_Unmarshal

func (m *Desc) XXX_Unmarshal(b []byte) error

type FlushTransferer

type FlushTransferer interface {
	Flush()
	TransferOut(ctx context.Context) error
}

FlushTransferer controls the shutdown of an instance in the ring. Methods on this interface are called when lifecycler is stopping. At that point, it no longer runs the "actor loop", but it keeps updating heartbeat in the ring. Ring entry is in LEAVING state. After calling TransferOut and then Flush, lifecycler stops.

type IngesterDesc

type IngesterDesc struct {
	Addr      string        `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
	Timestamp int64         `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	State     IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"`
	Tokens    []uint32      `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"`
	Zone      string        `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"`
}

func (*IngesterDesc) Descriptor

func (*IngesterDesc) Descriptor() ([]byte, []int)

func (*IngesterDesc) Equal

func (this *IngesterDesc) Equal(that interface{}) bool

func (*IngesterDesc) GetAddr

func (m *IngesterDesc) GetAddr() string

func (*IngesterDesc) GetState

func (m *IngesterDesc) GetState() IngesterState

func (*IngesterDesc) GetTimestamp

func (m *IngesterDesc) GetTimestamp() int64

func (*IngesterDesc) GetTokens

func (m *IngesterDesc) GetTokens() []uint32

func (*IngesterDesc) GetZone added in v1.0.0

func (m *IngesterDesc) GetZone() string

func (*IngesterDesc) GoString

func (this *IngesterDesc) GoString() string

func (*IngesterDesc) IsHealthy added in v0.4.0

func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool

IsHealthy checks whether the ingester appears to be alive and heartbeating
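The heartbeat half of such a check reduces to comparing the age of the last heartbeat with the timeout. The sketch below shows only that comparison; it assumes Timestamp holds Unix seconds and ignores the Operation argument, whose effect is not described on this page. `heartbeatFresh` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeatFresh reports whether a heartbeat recorded at lastHeartbeat is no
// older than timeout at the instant now. Both instants are Unix seconds.
func heartbeatFresh(lastHeartbeat, now int64, timeout time.Duration) bool {
	age := time.Unix(now, 0).Sub(time.Unix(lastHeartbeat, 0))
	return age <= timeout
}

func main() {
	timeout := time.Minute
	fmt.Println(heartbeatFresh(100, 130, timeout)) // true: 30s old
	fmt.Println(heartbeatFresh(100, 200, timeout)) // false: 100s old
}
```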

func (*IngesterDesc) Marshal

func (m *IngesterDesc) Marshal() (dAtA []byte, err error)

func (*IngesterDesc) MarshalTo

func (m *IngesterDesc) MarshalTo(dAtA []byte) (int, error)

func (*IngesterDesc) MarshalToSizedBuffer added in v0.7.0

func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*IngesterDesc) ProtoMessage

func (*IngesterDesc) ProtoMessage()

func (*IngesterDesc) Reset

func (m *IngesterDesc) Reset()

func (*IngesterDesc) Size

func (m *IngesterDesc) Size() (n int)

func (*IngesterDesc) String

func (this *IngesterDesc) String() string

func (*IngesterDesc) Unmarshal

func (m *IngesterDesc) Unmarshal(dAtA []byte) error

func (*IngesterDesc) XXX_DiscardUnknown

func (m *IngesterDesc) XXX_DiscardUnknown()

func (*IngesterDesc) XXX_Marshal

func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*IngesterDesc) XXX_Merge

func (m *IngesterDesc) XXX_Merge(src proto.Message)

func (*IngesterDesc) XXX_Size

func (m *IngesterDesc) XXX_Size() int

func (*IngesterDesc) XXX_Unmarshal

func (m *IngesterDesc) XXX_Unmarshal(b []byte) error

type IngesterState

type IngesterState int32
const (
	ACTIVE  IngesterState = 0
	LEAVING IngesterState = 1
	PENDING IngesterState = 2
	JOINING IngesterState = 3
	// This state is only used by gossiping code to distribute information about
	// ingesters that have been removed from the ring. Ring users should not use it directly.
	LEFT IngesterState = 4
)

func (IngesterState) EnumDescriptor

func (IngesterState) EnumDescriptor() ([]byte, []int)

func (IngesterState) String

func (x IngesterState) String() string

type Lifecycler

type Lifecycler struct {
	*services.BasicService

	KVStore kv.Client

	// These values are initialised at startup, and never change
	ID       string
	Addr     string
	RingName string
	RingKey  string
	Zone     string
	// contains filtered or unexported fields
}

Lifecycler is responsible for managing the lifecycle of entries in the ring.

func NewLifecycler

func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool) (*Lifecycler, error)

NewLifecycler creates new Lifecycler. It must be started via StartAsync.

func (*Lifecycler) ChangeState

func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error

ChangeState of the ingester, for use off of the loop() goroutine.

func (*Lifecycler) CheckReady

func (i *Lifecycler) CheckReady(ctx context.Context) error

CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by only returning nil if all ingesters are active. The state latches: once we have gone ready we don't go un-ready.
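The latching behaviour can be sketched in isolation. In the example below, `readyLatch` and its `probe` argument are hypothetical; the probe stands in for whatever inspects the ring and is not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotReady = errors.New("not all ingesters are active")

// readyLatch remembers the first successful readiness check.
type readyLatch struct {
	mu    sync.Mutex
	ready bool
}

// check runs probe until it succeeds once; afterwards it always returns nil
// without calling probe again.
func (l *readyLatch) check(probe func() error) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.ready {
		return nil
	}
	if err := probe(); err != nil {
		return err
	}
	l.ready = true
	return nil
}

func main() {
	var latch readyLatch
	failing := func() error { return errNotReady }
	passing := func() error { return nil }
	fmt.Println(latch.check(failing)) // not all ingesters are active
	fmt.Println(latch.check(passing)) // <nil>
	fmt.Println(latch.check(failing)) // <nil>: the latch stays ready
}
```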

func (*Lifecycler) ClaimTokensFor

func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error

ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.

For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and assign token to the wrong ingester. While we could check for that state here, when this method is called, transfers have already finished -- it's better to check for this *before* transfers start.

func (*Lifecycler) FlushOnShutdown added in v0.6.0

func (i *Lifecycler) FlushOnShutdown() bool

FlushOnShutdown reports whether flushing is enabled when a transfer fails on shutdown.

func (*Lifecycler) GetState

func (i *Lifecycler) GetState() IngesterState

GetState returns the state of this ingester.

func (*Lifecycler) HealthyInstancesCount added in v0.4.0

func (i *Lifecycler) HealthyInstancesCount() int

HealthyInstancesCount returns the number of healthy instances in the ring, updated during the last heartbeat period

func (*Lifecycler) SetFlushOnShutdown added in v0.6.0

func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)

SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. Passing 'true' enables it, and 'false' disables it.

type LifecyclerConfig

type LifecyclerConfig struct {
	RingConfig Config `yaml:"ring"`

	// Config for the ingester lifecycle control
	ListenPort       *int          `yaml:"-"`
	NumTokens        int           `yaml:"num_tokens"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	ObservePeriod    time.Duration `yaml:"observe_period"`
	JoinAfter        time.Duration `yaml:"join_after"`
	MinReadyDuration time.Duration `yaml:"min_ready_duration"`
	InfNames         []string      `yaml:"interface_names"`
	FinalSleep       time.Duration `yaml:"final_sleep"`
	TokensFilePath   string        `yaml:"tokens_file_path"`
	Zone             string        `yaml:"availability_zone"`

	// For testing, you can override the address and ID of this ingester
	Addr           string `yaml:"address" doc:"hidden"`
	Port           int    `doc:"hidden"`
	ID             string `doc:"hidden"`
	SkipUnregister bool   `yaml:"-"`
}

LifecyclerConfig is the config to build a Lifecycler.

func (*LifecyclerConfig) RegisterFlags

func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*LifecyclerConfig) RegisterFlagsWithPrefix

func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.

type NoopFlushTransferer added in v0.6.0

type NoopFlushTransferer struct{}

NoopFlushTransferer is a FlushTransferer which does nothing and can be used in cases we don't need one

func NewNoopFlushTransferer added in v0.6.0

func NewNoopFlushTransferer() *NoopFlushTransferer

NewNoopFlushTransferer makes a new NoopFlushTransferer

func (*NoopFlushTransferer) Flush added in v0.6.0

func (t *NoopFlushTransferer) Flush()

Flush is a noop

func (*NoopFlushTransferer) TransferOut added in v0.6.0

func (t *NoopFlushTransferer) TransferOut(ctx context.Context) error

TransferOut is a noop

type Operation

type Operation int

Operation can be Read, Write, or Reporting

const (
	Read Operation = iota
	Write
	Reporting // Special value for inquiring about health
)

Values for Operation

type ReadRing

type ReadRing interface {
	prometheus.Collector

	// Get returns n (or more) ingesters which form the replicas for the given key.
	// buf is a slice to be overwritten for the return value
	// to avoid memory allocation; can be nil.
	Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
	GetAll() (ReplicationSet, error)
	ReplicationFactor() int
	IngesterCount() int
	Subring(key uint32, n int) (ReadRing, error)
}

ReadRing represents the read interface to the ring.

type ReplicationSet

type ReplicationSet struct {
	Ingesters []IngesterDesc
	MaxErrors int
}

ReplicationSet describes the ingesters to talk to for a given key, and how many errors to tolerate.

func (ReplicationSet) Do

func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*IngesterDesc) (interface{}, error)) ([]interface{}, error)

Do runs function f in parallel for all replicas in the set, erroring if we exceed MaxErrors and returning early otherwise.
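One reading of this contract is: fail as soon as more than MaxErrors calls have failed, and return as soon as enough calls have succeeded that the error budget can no longer be exceeded. The sketch below implements that reading with goroutines and a buffered channel. It is an assumption-laden illustration: `doAll` is hypothetical, replicas are strings, and the ctx and delay parameters of the real method are omitted.

```go
package main

import (
	"errors"
	"fmt"
)

var errReplica = errors.New("replica failed")

// doAll calls f for every replica in parallel. It fails as soon as more than
// maxErrors calls have failed, and returns as soon as
// len(replicas)-maxErrors calls have succeeded.
func doAll(replicas []string, maxErrors int, f func(string) (string, error)) ([]string, error) {
	type result struct {
		val string
		err error
	}
	// Buffered so late goroutines never block after an early return.
	results := make(chan result, len(replicas))
	for _, r := range replicas {
		go func(r string) {
			v, err := f(r)
			results <- result{v, err}
		}(r)
	}
	needed := len(replicas) - maxErrors
	var vals []string
	failures := 0
	for range replicas {
		res := <-results
		if res.err != nil {
			failures++
			if failures > maxErrors {
				return nil, res.err
			}
			continue
		}
		vals = append(vals, res.val)
		if len(vals) >= needed {
			break
		}
	}
	return vals, nil
}

func main() {
	f := func(r string) (string, error) {
		if r == "ing-1" {
			return "", errReplica
		}
		return "ok from " + r, nil
	}
	vals, err := doAll([]string{"ing-0", "ing-1", "ing-2"}, 1, f)
	fmt.Println(len(vals), err) // 2 <nil>
}
```

The order of the returned values depends on goroutine scheduling, so callers of a function like this should not rely on it.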

type Ring

type Ring struct {
	services.Service

	KVClient kv.Client
	// contains filtered or unexported fields
}

Ring holds the information about the members of the consistent hash ring.

func New

func New(cfg Config, name, key string) (*Ring, error)

New creates a new Ring. Being a service, Ring needs to be started to do anything.

func (*Ring) Collect

func (r *Ring) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector.

func (*Ring) Describe

func (r *Ring) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector.

func (*Ring) Get

func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)

Get returns n (or more) ingesters which form the replicas for the given key.
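A lookup on a consistent hash ring is commonly done by binary-searching the sorted token list for the key and walking clockwise until enough distinct ingesters are found. The sketch below shows that general technique; it does not model health checks, zones or the Operation argument, and whether the real lookup starts at the first token greater than the key is an assumption. `ringToken` and `replicasFor` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// ringToken is a reduced stand-in for TokenDesc.
type ringToken struct {
	Token    uint32
	Ingester string
}

// replicasFor walks the ring clockwise from the first token greater than key
// and collects up to n distinct ingesters. tokens must be sorted by Token.
func replicasFor(tokens []ringToken, key uint32, n int) []string {
	if len(tokens) == 0 || n <= 0 {
		return nil
	}
	start := sort.Search(len(tokens), func(i int) bool { return tokens[i].Token > key })
	seen := make(map[string]bool)
	var out []string
	for i := 0; i < len(tokens) && len(out) < n; i++ {
		tok := tokens[(start+i)%len(tokens)] // wrap around past the last token
		if !seen[tok.Ingester] {
			seen[tok.Ingester] = true
			out = append(out, tok.Ingester)
		}
	}
	return out
}

func main() {
	tokens := []ringToken{{100, "a"}, {200, "b"}, {300, "a"}, {400, "c"}}
	fmt.Println(replicasFor(tokens, 150, 2)) // [b a]
	fmt.Println(replicasFor(tokens, 450, 2)) // [a b]: wrapped around
}
```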

func (*Ring) GetAll

func (r *Ring) GetAll() (ReplicationSet, error)

GetAll returns all available ingesters in the ring.

func (*Ring) IngesterCount added in v0.3.0

func (r *Ring) IngesterCount() int

IngesterCount is the number of ingesters in the ring.

func (*Ring) IsHealthy

func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool

IsHealthy checks whether an ingester appears to be alive and heartbeating

func (*Ring) ReplicationFactor

func (r *Ring) ReplicationFactor() int

ReplicationFactor of the ring.

func (*Ring) ServeHTTP

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Ring) Subring added in v0.7.0

func (r *Ring) Subring(key uint32, n int) (ReadRing, error)

Subring returns a ring of n ingesters from the given ring. Subrings are meant only for ingester lookup and should have their data externalized.

type TokenDesc

type TokenDesc struct {
	Token    uint32
	Ingester string
	Zone     string
}

type Tokens added in v0.6.0

type Tokens []uint32

Tokens is a simple list of tokens.

func LoadTokensFromFile added in v0.6.0

func LoadTokensFromFile(tokenFilePath string) (Tokens, error)

LoadTokensFromFile loads tokens from given file path.

func (Tokens) Len added in v0.6.0

func (t Tokens) Len() int

func (Tokens) Less added in v0.6.0

func (t Tokens) Less(i, j int) bool

func (Tokens) Marshal added in v0.6.0

func (t Tokens) Marshal() ([]byte, error)

Marshal encodes the tokens into JSON.

func (Tokens) StoreToFile added in v0.6.0

func (t Tokens) StoreToFile(tokenFilePath string) error

StoreToFile stores the tokens in the given file.

func (Tokens) Swap added in v0.6.0

func (t Tokens) Swap(i, j int)

func (*Tokens) Unmarshal added in v0.6.0

func (t *Tokens) Unmarshal(b []byte) error

Unmarshal reads the tokens from JSON byte stream.
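A JSON round trip for a token list can be sketched with encoding/json. The wrapper object with a single tokens field is an assumption made for this example, as are the lower-case type and method names; the page does not specify the encoded shape.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type tokens []uint32

// tokensJSON is the assumed encoded shape: {"tokens":[...]}.
type tokensJSON struct {
	Tokens []uint32 `json:"tokens"`
}

// marshal encodes the tokens as a JSON object.
func (t tokens) marshal() ([]byte, error) {
	return json.Marshal(tokensJSON{Tokens: t})
}

// unmarshal replaces the receiver with the tokens decoded from b.
func (t *tokens) unmarshal(b []byte) error {
	var tj tokensJSON
	if err := json.Unmarshal(b, &tj); err != nil {
		return err
	}
	*t = tj.Tokens
	return nil
}

func main() {
	b, err := tokens{1, 2, 3}.marshal()
	fmt.Println(string(b), err) // {"tokens":[1,2,3]} <nil>

	var back tokens
	err = back.unmarshal(b)
	fmt.Println(back, err) // [1 2 3] <nil>
}
```

Unmarshal needs a pointer receiver because it replaces the slice itself, which matches the `(*Tokens) Unmarshal` signature above.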

Directories

Path Synopsis
kv
