ring

package
v0.0.0-...-a1bba12
Warning

This package is not in the latest version of its module.

Published: May 9, 2024 License: Apache-2.0 Imports: 48 Imported by: 176

Documentation

Overview

Package ring contains types and functions for creating and working with rings.

Rings are shared between instances via a key-value store, and are represented by the Desc struct, which contains a map of InstanceDesc structs representing individual instances in the ring.

Creating a Ring

Two types are available for creating and updating rings:

  • Lifecycler - A Service that writes to a ring on behalf of a single instance. It's responsible for claiming tokens on the ring and updating the key/value store with heartbeats, state changes, and token changes. This type is the original lifecycler implementation, and BasicLifecycler should be used instead for new services.
  • BasicLifecycler - A Service that writes to a ring on behalf of a single instance. It's responsible for claiming tokens on the ring, updating the key/value store with heartbeats and state changes, and uses a delegate with event listeners to help with these. This type is general purpose, is used by numerous services, and is meant for building higher level lifecyclers.

Observing a ring

The Ring type is a Service that is primarily used for reading and watching for changes to a ring.

Constants

const (

	// GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on
	// a typical replication factor 3, plus extra room for a JOINING + LEAVING instance.
	GetBufferSize = 5
)

Variables

var (
	ErrPartitionDoesNotExist          = errors.New("the partition does not exist")
	ErrPartitionStateMismatch         = errors.New("the partition state does not match the expected one")
	ErrPartitionStateChangeNotAllowed = errors.New("partition state change not allowed")
)
var (
	ErrInvalidLengthPartitionRingDesc = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowPartitionRingDesc   = fmt.Errorf("proto: integer overflow")
)
var (
	// Write operation that also extends replica set, if instance state is not ACTIVE.
	Write = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool {

		return s != ACTIVE
	})

	// WriteNoExtend is like Write, but with no replicaset extension.
	WriteNoExtend = NewOp([]InstanceState{ACTIVE}, nil)

	// Read operation that extends the replica set if an instance is not ACTIVE or LEAVING
	Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING}, func(s InstanceState) bool {

		return s != ACTIVE && s != LEAVING
	})

	// Reporting is a special value for inquiring about health.
	Reporting = allStatesRingOperation
)
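
As a rough illustration of how the two parts of an operation interact, the standalone sketch below models an operation as a set of healthy states plus a predicate that decides whether the replica set should be extended. The `op` type, `newOp`, `shouldExtend`, and the lowercase state names are hypothetical stand-ins written for this example; they are not this package's types, and the real Operation may be represented differently.

```go
package main

import "fmt"

// state is a hypothetical stand-in for InstanceState, using the same order
// as InstanceState_name (ACTIVE, LEAVING, PENDING, JOINING, LEFT).
type state int

const (
	active state = iota
	leaving
	pending
	joining
	left
)

// op is a hypothetical model of an operation: which states count as healthy,
// and for which states the replica set should be extended.
type op struct {
	healthy map[state]bool
	extend  func(state) bool
}

// newOp mirrors the shape of the NewOp calls above (assumption: healthy
// states first, extension predicate second).
func newOp(healthy []state, extend func(state) bool) op {
	o := op{healthy: map[state]bool{}, extend: extend}
	for _, s := range healthy {
		o.healthy[s] = true
	}
	return o
}

// shouldExtend reports whether an instance in state s triggers an extension.
// A nil predicate means the replica set is never extended.
func (o op) shouldExtend(s state) bool {
	return o.extend != nil && o.extend(s)
}

func main() {
	write := newOp([]state{active}, func(s state) bool { return s != active })
	writeNoExtend := newOp([]state{active}, nil)
	read := newOp([]state{active, pending, leaving}, func(s state) bool {
		return s != active && s != leaving
	})

	fmt.Println(write.healthy[joining], write.shouldExtend(joining)) // false true
	fmt.Println(writeNoExtend.shouldExtend(joining))                 // false
	fmt.Println(read.healthy[leaving], read.shouldExtend(leaving))   // true false
}
```

In this model a JOINING instance is unhealthy for writes but still causes the replica set to grow, while WriteNoExtend never grows it.
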
var (
	// ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
	ErrEmptyRing = errors.New("empty ring")

	// ErrInstanceNotFound is the error returned when trying to get information for an instance
	// not registered within the ring.
	ErrInstanceNotFound = errors.New("instance not found in the ring")

	// ErrTooManyUnhealthyInstances is the error returned when there are too many failed instances for a
	// specific operation.
	ErrTooManyUnhealthyInstances = errors.New("too many unhealthy instances in the ring")

	// ErrInconsistentTokensInfo is the error returned if, due to an internal bug, the mapping between
	// a token and its own instance is missing or unknown.
	ErrInconsistentTokensInfo = errors.New("inconsistent ring tokens information")
)
var (
	ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRing   = fmt.Errorf("proto: integer overflow")
)
var ErrNoActivePartitionFound = fmt.Errorf("no active partition found")
var ErrTransferDisabled = errors.New("transfers disabled")

ErrTransferDisabled is the error returned by TransferOut when the transfers are disabled.

var InstanceState_name = map[int32]string{
	0: "ACTIVE",
	1: "LEAVING",
	2: "PENDING",
	3: "JOINING",
	4: "LEFT",
}
var InstanceState_value = map[string]int32{
	"ACTIVE":  0,
	"LEAVING": 1,
	"PENDING": 2,
	"JOINING": 3,
	"LEFT":    4,
}
var OwnerState_name = map[int32]string{
	0: "OwnerUnknown",
	1: "OwnerActive",
	2: "OwnerDeleted",
}
var OwnerState_value = map[string]int32{
	"OwnerUnknown": 0,
	"OwnerActive":  1,
	"OwnerDeleted": 2,
}
var PartitionState_name = map[int32]string{
	0: "PartitionUnknown",
	1: "PartitionPending",
	2: "PartitionActive",
	3: "PartitionInactive",
	4: "PartitionDeleted",
}
var PartitionState_value = map[string]int32{
	"PartitionUnknown":  0,
	"PartitionPending":  1,
	"PartitionActive":   2,
	"PartitionInactive": 3,
	"PartitionDeleted":  4,
}

Functions

func DoBatch

func DoBatch(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error

DoBatch is a deprecated version of DoBatchWithOptions in which gRPC errors containing 4xx status codes are treated as client errors.

Deprecated: Use DoBatchWithOptions instead.

func DoBatchWithOptions

func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions) error

DoBatchWithOptions performs a request against a set of keys in the ring, handling replication and failures. For example, if we want to write N items that may all hit different instances, and we want them all replicated R ways with quorum writes, it tracks the relationship between batch RPCs and the items within them.

See comments on DoBatchOptions for available options for this call.

Not implemented as a method on Ring, so we can test separately.
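
The bookkeeping described above, where each instance receives one batched call covering the indexes of the keys it owns, can be sketched without the library. In this sketch `groupByInstance` and `lookup` are hypothetical names; `lookup` stands in for a ring lookup and returns instance names rather than InstanceDesc values. Failure handling and quorum tracking are deliberately left out.

```go
package main

import (
	"fmt"
	"sort"
)

// groupByInstance maps each instance to the indexes (into keys) of the items
// it should receive. lookup is a hypothetical replacement for a ring lookup.
func groupByInstance(keys []uint32, lookup func(uint32) []string) map[string][]int {
	batches := make(map[string][]int)
	for i, key := range keys {
		for _, instance := range lookup(key) {
			batches[instance] = append(batches[instance], i)
		}
	}
	return batches
}

func main() {
	instances := []string{"a", "b", "c", "d"}
	// Toy placement (assumption): a key goes to 3 consecutive instances,
	// imitating a replication factor of 3.
	lookup := func(key uint32) []string {
		out := make([]string, 0, 3)
		for r := 0; r < 3; r++ {
			out = append(out, instances[(int(key)+r)%len(instances)])
		}
		return out
	}

	batches := groupByInstance([]uint32{0, 1, 2}, lookup)

	// Print in a stable order, since map iteration order is random.
	names := make([]string, 0, len(batches))
	for name := range batches {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Println(name, batches[name])
	}
}
```

Each map entry corresponds to one invocation of a callback shaped like `func(InstanceDesc, []int) error`: the slice holds positions in the original keys slice, not the keys themselves.
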

func DoMultiUntilQuorumWithoutSuccessfulContextCancellation

func DoMultiUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, sets []ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T)) ([]T, error)

DoMultiUntilQuorumWithoutSuccessfulContextCancellation behaves similarly to DoUntilQuorumWithoutSuccessfulContextCancellation, with the following exceptions:

  • This function calls DoUntilQuorumWithoutSuccessfulContextCancellation for each input ReplicationSet and requires DoUntilQuorumWithoutSuccessfulContextCancellation to successfully run for each of them. Execution breaks on the first error returned by DoUntilQuorumWithoutSuccessfulContextCancellation on any ReplicationSet.

  • This function requires that the callback function f always call context.CancelCauseFunc once done. Failing to cancel the context will leak resources.

func DoUntilQuorum

func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc) (T, error), cleanupFunc func(T)) ([]T, error)

DoUntilQuorum runs function f in parallel for all replicas in r.

Result selection

If r.MaxUnavailableZones is greater than zero, or r.ZoneAwarenessEnabled is true, DoUntilQuorum operates in zone-aware mode:

  • DoUntilQuorum returns an error if calls to f for instances in more than r.MaxUnavailableZones zones return errors
  • Otherwise, DoUntilQuorum returns all results from all replicas in the first zones for which f succeeds for every instance in that zone (eg. if there are 3 zones and r.MaxUnavailableZones is 1, DoUntilQuorum will return the results from all instances in 2 zones, even if all calls to f succeed).

Otherwise, DoUntilQuorum operates in non-zone-aware mode:

  • DoUntilQuorum returns an error if more than r.MaxErrors calls to f return errors
  • Otherwise, DoUntilQuorum returns all results from the first len(r.Instances) - r.MaxErrors instances (eg. if there are 6 replicas and r.MaxErrors is 2, DoUntilQuorum will return the results from the first 4 successful calls to f, even if all 6 calls to f succeed).
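
The arithmetic behind the two modes above is small enough to write out. The helper names below (`neededResults`, `neededZones`, `budgetExceeded`) are hypothetical and exist only for this sketch; they restate the rules in the text and are not part of the package.

```go
package main

import "fmt"

// neededResults is the number of successful calls returned in
// non-zone-aware mode: len(r.Instances) - r.MaxErrors.
func neededResults(instances, maxErrors int) int {
	return instances - maxErrors
}

// neededZones is the number of fully successful zones whose results are
// returned in zone-aware mode.
func neededZones(zones, maxUnavailableZones int) int {
	return zones - maxUnavailableZones
}

// budgetExceeded reports whether more failures were seen than the budget
// (r.MaxErrors or r.MaxUnavailableZones) allows.
func budgetExceeded(failures, budget int) bool {
	return failures > budget
}

func main() {
	fmt.Println(neededResults(6, 2))  // 4
	fmt.Println(neededZones(3, 1))    // 2
	fmt.Println(budgetExceeded(2, 2)) // false
	fmt.Println(budgetExceeded(3, 2)) // true
}
```
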

Request minimization

cfg.MinimizeRequests enables or disables request minimization.

Regardless of the value of cfg.MinimizeRequests, if one of the termination conditions above is satisfied or ctx is cancelled before f is called for an instance, f may not be called for that instance at all.

When disabled

If request minimization is disabled, DoUntilQuorum will call f for each instance in r. The value of cfg.HedgingDelay is ignored.

When enabled

If request minimization is enabled, DoUntilQuorum will initially call f for the minimum number of instances needed to reach the termination conditions above, and later call f for further instances if required. For example, if r.MaxUnavailableZones is 1 and there are three zones, DoUntilQuorum will initially only call f for instances in two zones, and only call f for instances in the remaining zone if a request in the initial two zones fails.

If cfg.ZoneSorter is non-nil and DoUntilQuorum is operating in zone-aware mode, DoUntilQuorum will initiate requests to zones in the order returned by the sorter.

If cfg.ZoneSorter is nil, or DoUntilQuorum is operating in non-zone-aware mode, DoUntilQuorum will randomly select available zones / instances such that calling DoUntilQuorum multiple times with the same ReplicationSet should evenly distribute requests across all zones / instances.

If cfg.HedgingDelay is non-zero, DoUntilQuorum will call f for an additional zone's instances (if zone-aware) / an additional instance (if not zone-aware) every cfg.HedgingDelay until one of the termination conditions above is reached. For example, if r.MaxUnavailableZones is 2, cfg.HedgingDelay is 4 seconds and there are five zones, DoUntilQuorum will initially only call f for instances in three zones, and unless one of the termination conditions is reached earlier, will then call f for instances in a fourth zone approximately 4 seconds later, and then call f for instances in the final zone approximately 4 seconds after that (i.e. roughly 8 seconds after the call to DoUntilQuorum began).
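
The hedging timeline in the zone-aware example can be reproduced with a few lines of arithmetic. `hedgeSchedule` is a hypothetical helper written for this sketch. It assumes that no request fails and that no termination condition is reached early, so it shows only the latest point at which each zone would be contacted.

```go
package main

import (
	"fmt"
	"time"
)

// hedgeSchedule returns, for each zone in contact order, the offset from the
// start of the call at which requests to that zone would begin. The first
// zones - maxUnavailableZones zones start immediately; each remaining zone
// starts one hedgingDelay after the previous one.
func hedgeSchedule(zones, maxUnavailableZones int, hedgingDelay time.Duration) []time.Duration {
	initial := zones - maxUnavailableZones
	schedule := make([]time.Duration, 0, zones)
	for i := 0; i < zones; i++ {
		if i < initial {
			schedule = append(schedule, 0)
			continue
		}
		schedule = append(schedule, time.Duration(i-initial+1)*hedgingDelay)
	}
	return schedule
}

func main() {
	// Five zones, two may be unavailable, 4 second hedging delay.
	fmt.Println(hedgeSchedule(5, 2, 4*time.Second)) // [0s 0s 0s 4s 8s]
}
```
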

Cleanup

Any results from successful calls to f that are not returned by DoUntilQuorum will be passed to cleanupFunc, including when DoUntilQuorum returns an error or only returns a subset of successful results. cleanupFunc may be called both before and after DoUntilQuorum returns.

A call to f is considered successful if it returns a nil error.

Contexts

The context.Context passed to an invocation of f may be cancelled at any time if the result of that invocation of f will not be used.

DoUntilQuorum cancels the context.Context passed to each invocation of f before DoUntilQuorum returns.

func DoUntilQuorumWithoutSuccessfulContextCancellation

func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T)) ([]T, error)

DoUntilQuorumWithoutSuccessfulContextCancellation behaves the same as DoUntilQuorum, except it does not cancel the context.Context passed to invocations of f whose results are returned.

For example, this is useful in situations where DoUntilQuorumWithoutSuccessfulContextCancellation is used to establish a set of streams that will be used after DoUntilQuorumWithoutSuccessfulContextCancellation returns.

It is the caller's responsibility to ensure that one of the following is eventually true:

  • ctx is cancelled, or
  • the corresponding context.CancelCauseFunc is called for all invocations of f whose results are returned by DoUntilQuorumWithoutSuccessfulContextCancellation

Failing to do this may result in a memory leak.
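
One way to meet this obligation is to keep the cancel function inside the returned result, so that releasing the result also releases its context. The sketch below uses only the standard library: `stream`, `openStream`, `openAll`, and `errClosed` are hypothetical names, and `openAll` merely imitates the per-invocation contexts that would otherwise be created on the caller's behalf.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// stream is a hypothetical result type that keeps the cancel function handed
// to f, so the caller can release the context once the stream is finished.
type stream struct {
	ctx    context.Context
	cancel context.CancelCauseFunc
}

var errClosed = errors.New("stream closed by caller")

// Close cancels the per-invocation context, recording why it was cancelled.
func (s stream) Close() { s.cancel(errClosed) }

// openStream plays the role of f: it stores the context and its cancel
// function in the value it returns.
func openStream(ctx context.Context, cancel context.CancelCauseFunc) (stream, error) {
	return stream{ctx: ctx, cancel: cancel}, nil
}

// openAll imitates n successful invocations of f, each with its own context.
func openAll(n int) []stream {
	streams := make([]stream, 0, n)
	for i := 0; i < n; i++ {
		ctx, cancel := context.WithCancelCause(context.Background())
		s, err := openStream(ctx, cancel)
		if err != nil {
			cancel(err) // a failed invocation releases its context right away
			continue
		}
		streams = append(streams, s)
	}
	return streams
}

func main() {
	streams := openAll(3)
	// ... use the streams ...
	for _, s := range streams {
		s.Close()
		fmt.Println(context.Cause(s.ctx)) // stream closed by caller
	}
}
```
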

func GetCodec

func GetCodec() codec.Codec

GetCodec returns the codec used to encode and decode data being put by ring.

func GetInstanceAddr

func GetInstanceAddr(configAddr string, netInterfaces []string, logger log.Logger, enableInet6 bool) (string, error)

GetInstanceAddr returns the address to use to register the instance in the ring.

func GetInstancePort

func GetInstancePort(configPort, listenPort int) int

GetInstancePort returns the port to use to register the instance in the ring.

func GetPartitionRingCodec

func GetPartitionRingCodec() codec.Codec

func HasReplicationSetChanged

func HasReplicationSetChanged(before, after ReplicationSet) bool

HasReplicationSetChanged returns false if two replication sets are the same (with possibly different timestamps), true if they differ in any way (number of instances, instance states, tokens, zones, ...).

func HasReplicationSetChangedWithoutState

func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool

HasReplicationSetChangedWithoutState returns false if two replication sets are the same (with possibly different timestamps and instance states), true if they differ in any other way (number of instances, tokens, zones, ...).

func HasReplicationSetChangedWithoutStateOrAddr

func HasReplicationSetChangedWithoutStateOrAddr(before, after ReplicationSet) bool

HasReplicationSetChangedWithoutStateOrAddr returns false if two replication sets are the same (with possibly different timestamps, instance states, and IP addresses), true if they differ in any other way (number of instances, tokens, zones, ...).

func MergeTokens

func MergeTokens(instances [][]uint32) []uint32

MergeTokens takes multiple lists of tokens as input and returns a single list containing all tokens, merged and sorted. Each input list must already be sorted.
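
Because every input list is already sorted, the merge can be done as a k-way merge instead of a full re-sort. The sketch below shows that general technique with the standard container/heap package. `mergeSorted`, `cursor`, and `cursorHeap` are hypothetical names, and nothing here claims to match how MergeTokens is actually implemented.

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor points at the next unread token of one input list.
type cursor struct {
	list []uint32
	pos  int
}

// cursorHeap is a min-heap of cursors ordered by their current token.
type cursorHeap []cursor

func (h cursorHeap) Len() int           { return len(h) }
func (h cursorHeap) Less(i, j int) bool { return h[i].list[h[i].pos] < h[j].list[h[j].pos] }
func (h cursorHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x any)        { *h = append(*h, x.(cursor)) }
func (h *cursorHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// mergeSorted merges already-sorted lists into a single sorted list.
func mergeSorted(lists [][]uint32) []uint32 {
	h := &cursorHeap{}
	total := 0
	for _, l := range lists {
		if len(l) > 0 { // empty lists contribute nothing
			*h = append(*h, cursor{list: l})
			total += len(l)
		}
	}
	heap.Init(h)

	out := make([]uint32, 0, total)
	for h.Len() > 0 {
		c := &(*h)[0] // cursor with the smallest current token
		out = append(out, c.list[c.pos])
		c.pos++
		if c.pos == len(c.list) {
			heap.Pop(h) // this list is exhausted
		} else {
			heap.Fix(h, 0) // restore heap order after advancing
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([][]uint32{{1, 5, 9}, {2, 3}, {4}})) // [1 2 3 4 5 9]
}
```
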

func MergeTokensByZone

func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32

MergeTokensByZone is like MergeTokens but does it for each input zone.

func PartitionRingDescFactory

func PartitionRingDescFactory() proto.Message

PartitionRingDescFactory makes new PartitionRingDesc.

func ProtoDescFactory

func ProtoDescFactory() proto.Message

ProtoDescFactory makes new Descs.

func WaitInstanceState

func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error

WaitInstanceState waits until the input instanceID is registered within the ring matching the provided state. A timeout should be provided within the context.

func WaitRingStability

func WaitRingStability(ctx context.Context, r ReadRing, op Operation, minStability, maxWaiting time.Duration) error

WaitRingStability monitors the ring topology for the provided operation and waits until it remains stable for at least minStability.

func WaitRingTokensStability

func WaitRingTokensStability(ctx context.Context, r ReadRing, op Operation, minStability, maxWaiting time.Duration) error

WaitRingTokensStability waits for the Ring to be unchanged at least for minStability time period, excluding transitioning between allowed states (e.g. JOINING->ACTIVE if allowed by op). This can be used to avoid wasting resources on moving data around due to multiple changes in the Ring.
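
The idea shared by both stability helpers (poll, restart the clock on every change, give up after a maximum wait) can be sketched with the standard library alone. `waitStable`, its `snapshot` callback, the `pollEvery` parameter, and `runDemo` are hypothetical; the real functions read a ReadRing and compare ring state rather than strings, and their polling interval is not stated in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitStable polls snapshot until its value has not changed for at least
// minStability. It gives up when maxWaiting has elapsed or ctx is done.
func waitStable(ctx context.Context, snapshot func() string, minStability, maxWaiting, pollEvery time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, maxWaiting)
	defer cancel()

	last := snapshot()
	stableSince := time.Now()
	ticker := time.NewTicker(pollEvery)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if cur := snapshot(); cur != last {
				// Something changed: remember it and restart the clock.
				last, stableSince = cur, time.Now()
			} else if time.Since(stableSince) >= minStability {
				return nil
			}
		}
	}
}

// runDemo waits once on a topology that never changes and once on a topology
// that changes on every poll.
func runDemo() (stableErr, flappingErr error) {
	ctx := context.Background()
	stableErr = waitStable(ctx, func() string { return "a,b,c" },
		20*time.Millisecond, 2*time.Second, 2*time.Millisecond)

	n := 0
	flapping := func() string { n++; return fmt.Sprint(n) }
	flappingErr = waitStable(ctx, flapping,
		20*time.Millisecond, 100*time.Millisecond, 2*time.Millisecond)
	return stableErr, flappingErr
}

func main() {
	stableErr, flappingErr := runDemo()
	fmt.Println(stableErr)
	fmt.Println(errors.Is(flappingErr, context.DeadlineExceeded))
}
```
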

Types

type ActivePartitionBatchRing

type ActivePartitionBatchRing struct {
	// contains filtered or unexported fields
}

ActivePartitionBatchRing wraps PartitionRing and implements DoBatchRing to look up ACTIVE partitions.

func NewActivePartitionBatchRing

func NewActivePartitionBatchRing(ring *PartitionRing) *ActivePartitionBatchRing

func (*ActivePartitionBatchRing) Get

func (r *ActivePartitionBatchRing) Get(key uint32, _ Operation, bufInstances []InstanceDesc, _, _ []string) (ReplicationSet, error)

Get implements DoBatchRing.Get.

func (*ActivePartitionBatchRing) InstancesCount

func (r *ActivePartitionBatchRing) InstancesCount() int

InstancesCount returns the number of active partitions in the ring.

InstancesCount implements DoBatchRing.InstancesCount.

func (*ActivePartitionBatchRing) ReplicationFactor

func (r *ActivePartitionBatchRing) ReplicationFactor() int

ReplicationFactor returns 1 as the partitions' replication factor: an entry (looked up by key via Get()) is always stored in one and only one partition.

ReplicationFactor implements DoBatchRing.ReplicationFactor.

type AutoForgetDelegate

type AutoForgetDelegate struct {
	// contains filtered or unexported fields
}

AutoForgetDelegate automatically removes an instance from the ring if its last heartbeat is older than a configured period.
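
The forgetting rule itself is a single comparison between an instance's heartbeat timestamp and the configured period. The sketch below illustrates it on a plain map. `instance` and `forgetStale` are hypothetical names, and the sketch is not wired into any lifecycler or delegate.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// instance is a hypothetical, reduced stand-in for InstanceDesc: only the
// Unix timestamp (seconds) of the last heartbeat is kept.
type instance struct {
	timestamp int64
}

// forgetStale removes every instance whose last heartbeat is older than
// forgetPeriod relative to nowUnix (Unix seconds). It returns the removed
// IDs in sorted order.
func forgetStale(ring map[string]instance, nowUnix int64, forgetPeriod time.Duration) []string {
	var removed []string
	for id, inst := range ring {
		age := time.Duration(nowUnix-inst.timestamp) * time.Second
		if age > forgetPeriod {
			delete(ring, id) // deleting during range is safe in Go
			removed = append(removed, id)
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	ring := map[string]instance{
		"ingester-1": {timestamp: 990}, // heartbeat 10s ago
		"ingester-2": {timestamp: 900}, // heartbeat 100s ago
	}
	fmt.Println(forgetStale(ring, 1000, time.Minute)) // [ingester-2]
	fmt.Println(len(ring))                            // 1
}
```
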

func NewAutoForgetDelegate

func NewAutoForgetDelegate(forgetPeriod time.Duration, next BasicLifecyclerDelegate, logger log.Logger) *AutoForgetDelegate

func (*AutoForgetDelegate) OnRingInstanceHeartbeat

func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)

func (*AutoForgetDelegate) OnRingInstanceRegister

func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)

func (*AutoForgetDelegate) OnRingInstanceStopping

func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)

func (*AutoForgetDelegate) OnRingInstanceTokens

func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)

type BasicLifecycler

type BasicLifecycler struct {
	*services.BasicService
	// contains filtered or unexported fields
}

BasicLifecycler is a Service that is responsible for publishing changes to a ring for a single instance. It accepts a delegate that can handle lifecycle events, and should be used to build higher level lifecyclers. Unlike Lifecycler, BasicLifecycler does not change instance state internally. Rather, it's the delegate's responsibility to call BasicLifecycler.ChangeState.

func NewBasicLifecycler

func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error)

NewBasicLifecycler makes a new BasicLifecycler.

func (*BasicLifecycler) ChangeState

func (l *BasicLifecycler) ChangeState(ctx context.Context, state InstanceState) error

func (*BasicLifecycler) GetInstanceAddr

func (l *BasicLifecycler) GetInstanceAddr() string

func (*BasicLifecycler) GetInstanceID

func (l *BasicLifecycler) GetInstanceID() string

func (*BasicLifecycler) GetInstanceZone

func (l *BasicLifecycler) GetInstanceZone() string

func (*BasicLifecycler) GetRegisteredAt

func (l *BasicLifecycler) GetRegisteredAt() time.Time

GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if the lifecycler hasn't been started yet or was already registered and its timestamp is unknown.

func (*BasicLifecycler) GetState

func (l *BasicLifecycler) GetState() InstanceState

func (*BasicLifecycler) GetTokenGenerator

func (l *BasicLifecycler) GetTokenGenerator() TokenGenerator

func (*BasicLifecycler) GetTokens

func (l *BasicLifecycler) GetTokens() Tokens

func (*BasicLifecycler) IsRegistered

func (l *BasicLifecycler) IsRegistered() bool

IsRegistered returns whether the instance is currently registered within the ring.

func (*BasicLifecycler) ServeHTTP

func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*BasicLifecycler) SetKeepInstanceInTheRingOnShutdown

func (l *BasicLifecycler) SetKeepInstanceInTheRingOnShutdown(enabled bool)

SetKeepInstanceInTheRingOnShutdown enables/disables unregistering on shutdown.

func (*BasicLifecycler) ShouldKeepInstanceInTheRingOnShutdown

func (l *BasicLifecycler) ShouldKeepInstanceInTheRingOnShutdown() bool

ShouldKeepInstanceInTheRingOnShutdown returns whether the instance should be kept in the ring or unregistered on shutdown.

type BasicLifecyclerConfig

type BasicLifecyclerConfig struct {
	// ID is the instance unique ID.
	ID string

	// Addr is the instance address, in the form "address:port".
	Addr string

	// Zone is the instance availability zone. Can be an empty string
	// if zone awareness is unused.
	Zone string

	HeartbeatPeriod     time.Duration
	HeartbeatTimeout    time.Duration
	TokensObservePeriod time.Duration
	NumTokens           int

	// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
	// which means unregistering.
	KeepInstanceInTheRingOnShutdown bool

	// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
	// Default value is nil, which means that RandomTokenGenerator is used.
	RingTokenGenerator TokenGenerator
}

type BasicLifecyclerDelegate

type BasicLifecyclerDelegate interface {
	// OnRingInstanceRegister is called while the lifecycler is registering the
	// instance within the ring and should return the state and set of tokens to
	// use for the instance itself.
	OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)

	// OnRingInstanceTokens is called once the instance tokens are set and are
	// stable within the ring (honoring the observe period, if set).
	OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)

	// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
	// will continue to heartbeat the ring while this function is executing and will proceed
	// to unregister the instance from the ring only after this function has returned.
	OnRingInstanceStopping(lifecycler *BasicLifecycler)

	// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
	// in the ring.
	OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)
}

type BasicLifecyclerMetrics

type BasicLifecyclerMetrics struct {
	// contains filtered or unexported fields
}

func NewBasicLifecyclerMetrics

func NewBasicLifecyclerMetrics(ringName string, reg prometheus.Registerer) *BasicLifecyclerMetrics

type ByAddr

type ByAddr []InstanceDesc

ByAddr is a sortable list of InstanceDesc.

func (ByAddr) Len

func (ts ByAddr) Len() int

func (ByAddr) Less

func (ts ByAddr) Less(i, j int) bool

func (ByAddr) Swap

func (ts ByAddr) Swap(i, j int)

type ByID

type ByID []InstanceDesc

ByID is a sortable list of InstanceDesc.

func (ByID) Len

func (ts ByID) Len() int

func (ByID) Less

func (ts ByID) Less(i, j int) bool

func (ByID) Swap

func (ts ByID) Swap(i, j int)

type CompareResult

type CompareResult int
const (
	Equal                       CompareResult = iota // Both rings contain same exact instances.
	EqualButStatesAndTimestamps                      // Both rings contain the same instances with the same data except states and timestamps (may differ).
	Different                                        // Rings have different sets of instances, or their information doesn't match.
)

CompareResult responses

type Config

type Config struct {
	KVStore              kv.Config              `yaml:"kvstore"`
	HeartbeatTimeout     time.Duration          `yaml:"heartbeat_timeout" category:"advanced"`
	ReplicationFactor    int                    `yaml:"replication_factor"`
	ZoneAwarenessEnabled bool                   `yaml:"zone_awareness_enabled"`
	ExcludedZones        flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"`

	// Whether the shuffle-sharding subring cache is disabled. This option is set
	// internally and never exposed to the user.
	SubringCacheDisabled bool `yaml:"-"`
}

Config for a Ring

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet, using the specified prefix.

type Desc

type Desc struct {
	Ingesters map[string]InstanceDesc `` /* 149-byte string literal not displayed */
}

Desc is the top-level type used to model a ring, containing information for individual instances.

func GetOrCreateRingDesc

func GetOrCreateRingDesc(d interface{}) *Desc

func NewDesc

func NewDesc() *Desc

NewDesc returns an empty ring.Desc

func (*Desc) AddIngester

func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc

AddIngester adds the given ingester to the ring. The ingester will use only the supplied tokens; any other tokens are removed.

func (*Desc) ClaimTokens

func (d *Desc) ClaimTokens(from, to string) Tokens

ClaimTokens transfers all the tokens from one ingester to another, returning the claimed tokens. This method assumes that the ring is in the correct state: the 'to' ingester has no tokens anywhere, and the tokens list is sorted properly. If all of this is true, everything will be fine.

func (*Desc) Clone

func (d *Desc) Clone() memberlist.Mergeable

Clone returns a deep copy of the ring state.

func (*Desc) CountTokens

func (r *Desc) CountTokens() map[string]int64

CountTokens returns the number of tokens within the range for each instance. In case of zone-awareness, this method takes into account only tokens of the same zone. More precisely, for each instance only the distance between its tokens and tokens of the instances from the same zone will be considered.

func (*Desc) Descriptor

func (*Desc) Descriptor() ([]byte, []int)

func (*Desc) Equal

func (this *Desc) Equal(that interface{}) bool

func (*Desc) FindIngestersByState

func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc

FindIngestersByState returns the list of ingesters in the given state.

func (*Desc) GetIngesters

func (m *Desc) GetIngesters() map[string]InstanceDesc

func (*Desc) GetTokens

func (d *Desc) GetTokens() []uint32

GetTokens returns sorted list of tokens owned by all instances within the ring.

func (*Desc) GoString

func (this *Desc) GoString() string

func (*Desc) IsReady

func (d *Desc) IsReady(now time.Time, heartbeatTimeout time.Duration) error

IsReady returns no error when all instances are ACTIVE and healthy, and the ring has some tokens.

func (*Desc) Marshal

func (m *Desc) Marshal() (dAtA []byte, err error)

func (*Desc) MarshalTo

func (m *Desc) MarshalTo(dAtA []byte) (int, error)

func (*Desc) MarshalToSizedBuffer

func (m *Desc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Desc) Merge

func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)

Merge merges other ring into this one. Returns sub-ring that represents the change, and can be sent out to other clients.

This merge function depends on the timestamp of the ingester. For each ingester, it will choose more recent state from the two rings, and put that into this ring. There is one exception: we accept LEFT state even if Timestamp hasn't changed.

The localCAS flag tells the merge that it can use the incoming ring as a full state, and detect missing ingesters based on it. Ingesters missing from the incoming ring will be marked as LEFT and gossiped about.

If multiple ingesters end up owning the same tokens, Merge will do token conflict resolution (see resolveConflicts).

This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.

The receiver must be normalised, that is, the token lists must be sorted and must not contain duplicates. The function guarantees that the receiver will be left in this normalised state, so multiple subsequent Merge calls are valid usage.

The Mergeable passed as the parameter does not need to be normalised.

Note: This method modifies d and mergeable to reduce allocations and copies.
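
The per-ingester rule described above (the more recent timestamp wins, except that LEFT is accepted even when the timestamp is unchanged) can be sketched on its own. `instance`, `state`, and `mergeInstance` are hypothetical names. Token conflict resolution, the localCAS handling, and the computation of the returned change set are all left out.

```go
package main

import "fmt"

// state is a hypothetical stand-in for InstanceState.
type state int

const (
	active state = iota
	leaving
	pending
	joining
	left
)

// instance is a hypothetical, reduced stand-in for InstanceDesc.
type instance struct {
	timestamp int64 // Unix seconds of the last heartbeat
	state     state
}

// mergeInstance returns the entry to keep and whether the local entry
// changed. The more recent timestamp wins; a LEFT state is also accepted
// when the timestamp is unchanged.
func mergeInstance(ours, theirs instance) (instance, bool) {
	switch {
	case theirs.timestamp > ours.timestamp:
		return theirs, true
	case theirs.timestamp == ours.timestamp && theirs.state == left && ours.state != left:
		return theirs, true
	default:
		return ours, false
	}
}

func main() {
	ours := instance{timestamp: 100, state: active}

	_, changed := mergeInstance(ours, instance{timestamp: 90, state: leaving})
	fmt.Println(changed) // false: the incoming entry is older

	got, changed := mergeInstance(ours, instance{timestamp: 110, state: leaving})
	fmt.Println(got.state == leaving, changed) // true true: the incoming entry is newer

	got, changed = mergeInstance(ours, instance{timestamp: 100, state: left})
	fmt.Println(got.state == left, changed) // true true: LEFT wins on an equal timestamp
}
```
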

func (*Desc) MergeContent

func (d *Desc) MergeContent() []string

MergeContent describes content of this Mergeable. Ring simply returns list of ingesters that it includes.

func (*Desc) ProtoMessage

func (*Desc) ProtoMessage()

func (*Desc) RemoveIngester

func (d *Desc) RemoveIngester(id string)

RemoveIngester removes the given ingester and all its tokens.

func (*Desc) RemoveTombstones

func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int)

RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.

func (*Desc) Reset

func (m *Desc) Reset()

func (*Desc) RingCompare

func (d *Desc) RingCompare(o *Desc) CompareResult

RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different.

func (*Desc) Size

func (m *Desc) Size() (n int)

func (*Desc) String

func (this *Desc) String() string

func (*Desc) TokensFor

func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens)

TokensFor returns all ring tokens and the tokens for the provided ID. Returned tokens are guaranteed to be sorted.

func (*Desc) Unmarshal

func (m *Desc) Unmarshal(dAtA []byte) error

func (*Desc) XXX_DiscardUnknown

func (m *Desc) XXX_DiscardUnknown()

func (*Desc) XXX_Marshal

func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Desc) XXX_Merge

func (m *Desc) XXX_Merge(src proto.Message)

func (*Desc) XXX_Size

func (m *Desc) XXX_Size() int

func (*Desc) XXX_Unmarshal

func (m *Desc) XXX_Unmarshal(b []byte) error

type DoBatchOptions

type DoBatchOptions struct {
	// Cleanup is always called, either on an error before starting the batches or after they are all finished.
	// If nil, a noop will be called.
	Cleanup func()

	// IsClientError classifies errors returned by `callback()` into client or server errors.
	// See `batchTracker.record()` function for details about how errors are combined into final error returned by DoBatchWithClientError.
	// If nil, a default implementation is used that classifies grpc errors containing status codes 4xx as client errors.
	IsClientError func(error) bool

	// Go will be used to spawn the callback goroutines, and can be used to use a worker pool like concurrency.ReusableGoroutinesPool.
	Go func(func())
}

DoBatchOptions defines options for the DoBatchWithOptions call. Zero value options are valid, as well as individual zero valued fields.

type DoBatchRing

type DoBatchRing interface {
	// Get returns a ReplicationSet containing the instances to which the input key should be sharded to
	// for the input Operation.
	//
	// The input buffers may be referenced in the returned ReplicationSet. This means that it's unsafe to call
	// Get() multiple times passing the same buffers if ReplicationSet is retained between two different Get()
	// calls. In this case, you can pass nil buffers.
	Get(key uint32, op Operation, bufInstances []InstanceDesc, bufStrings1, bufStrings2 []string) (ReplicationSet, error)

	// ReplicationFactor returns the number of instances each key is expected to be sharded to.
	ReplicationFactor() int

	// InstancesCount returns the number of instances in the ring eligible to get any key sharded to.
	InstancesCount() int
}

DoBatchRing defines the interface required by a ring implementation to use DoBatch() and DoBatchWithOptions().
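
The buffer warning in the Get comment comes down to ordinary Go slice aliasing. The sketch below reproduces the pitfall with a hypothetical `get` function that returns strings instead of a ReplicationSet; it only illustrates why a retained result can change after a second call that reuses the same buffer.

```go
package main

import "fmt"

// get is a hypothetical lookup that may return a slice backed by the
// caller's buffer, as the Get contract above allows.
func get(key uint32, buf []string) []string {
	out := buf[:0] // reuse the caller's backing array when there is one
	return append(out,
		fmt.Sprintf("instance-%d", key%3),
		fmt.Sprintf("instance-%d", (key+1)%3),
	)
}

func main() {
	buf := make([]string, 0, 5) // 5 mirrors the suggested GetBufferSize

	first := get(0, buf)
	second := get(1, buf) // overwrites the memory behind first
	fmt.Println(first, second)
	// prints: [instance-1 instance-2] [instance-1 instance-2]

	// With nil buffers every call allocates, so results can be retained.
	a := get(0, nil)
	b := get(1, nil)
	fmt.Println(a, b)
	// prints: [instance-0 instance-1] [instance-1 instance-2]
}
```

Reusing buffers is still worthwhile in hot paths where each result is consumed before the next call; the nil form trades an allocation for safety.
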

type DoUntilQuorumConfig

type DoUntilQuorumConfig struct {
	// MinimizeRequests enables request minimization.
	// See docs for DoUntilQuorum for more information.
	MinimizeRequests bool

	// HedgingDelay configures the delay used before initiating hedged requests.
	// Hedging is only enabled if HedgingDelay is non-zero and MinimizeRequests is true.
	// See docs for DoUntilQuorum for more information.
	HedgingDelay time.Duration

	// Logger to emit log lines and span events to during the call.
	// Can be nil, in which case no log lines or span events are emitted.
	Logger *spanlogger.SpanLogger

	// IsTerminalError allows DoUntilQuorum to detect terminal errors generated by requests.
	//
	// If IsTerminalError is non-nil and a request returns an error for which IsTerminalError returns true,
	// DoUntilQuorum will immediately cancel any inflight requests and return the error.
	//
	// This is useful to cancel DoUntilQuorum when an unrecoverable error occurs and it does not
	// make sense to attempt requests to other instances. For example, if a client-side limit on the
	// total response size across all instances is reached, making further requests to other
	// instances would not be worthwhile.
	IsTerminalError func(error) bool

	// ZoneSorter orders the provided zones in preference order, for use when MinimizeRequests is true
	// and DoUntilQuorum is operating in zone-aware mode. If not set, zones will be used in a
	// randomly-selected order.
	//
	// Earlier zones will be used first.
	// The function can modify the provided slice of zones in place.
	// All provided zones must be returned exactly once.
	//
	// This can be used to prioritise zones that are more likely to succeed, or are expected to complete
	// faster, for example.
	ZoneSorter ZoneSorter
}
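
As an illustration of the ZoneSorter contract described in the comments above (earlier zones first, in-place modification allowed, every zone returned exactly once), here is a small sketch. It assumes a sorter is a function that receives and returns a slice of zone names; `preferZone` and the zone names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// preferZone returns a hypothetical sorter that puts the preferred zone first
// and orders the remaining zones alphabetically. It sorts in place and returns
// every input zone exactly once.
func preferZone(preferred string) func(zones []string) []string {
	return func(zones []string) []string {
		sort.SliceStable(zones, func(i, j int) bool {
			if (zones[i] == preferred) != (zones[j] == preferred) {
				return zones[i] == preferred
			}
			return zones[i] < zones[j]
		})
		return zones
	}
}

func main() {
	sorter := preferZone("zone-b")
	fmt.Println(sorter([]string{"zone-c", "zone-a", "zone-b"})) // [zone-b zone-a zone-c]
}
```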

func (DoUntilQuorumConfig) Validate

func (c DoUntilQuorumConfig) Validate() error

type FlushTransferer

type FlushTransferer interface {
	Flush()
	TransferOut(ctx context.Context) error
}

FlushTransferer controls the shutdown of an instance in the ring. Methods on this interface are called when the lifecycler is stopping. At that point, it no longer runs the "actor loop", but it keeps updating the heartbeat in the ring. The ring entry is in the LEAVING state. After calling TransferOut and then Flush, the lifecycler stops.
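
The call order described above can be sketched with a local copy of the interface. This is an illustration under assumptions: `recorder` and `shutdown` are hypothetical, and the real lifecycler decides whether to flush based on its own configuration and on the transfer outcome.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// flushTransferer mirrors the two methods of the FlushTransferer interface.
type flushTransferer interface {
	Flush()
	TransferOut(ctx context.Context) error
}

// recorder is a hypothetical implementation that records the calls it receives.
type recorder struct {
	calls []string
}

func (r *recorder) Flush() { r.calls = append(r.calls, "Flush") }

func (r *recorder) TransferOut(_ context.Context) error {
	r.calls = append(r.calls, "TransferOut")
	return errors.New("no target instance available")
}

// shutdown imitates the documented order: TransferOut first, then Flush.
// It returns the transfer error, if any, after flushing.
func shutdown(ft flushTransferer) error {
	err := ft.TransferOut(context.Background())
	ft.Flush()
	return err
}

func main() {
	r := &recorder{}
	if err := shutdown(r); err != nil {
		fmt.Println("transfer failed:", err)
	}
	fmt.Println(r.calls) // [TransferOut Flush]
}
```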

type InstanceDesc

type InstanceDesc struct {
	Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
	// Unix timestamp (with seconds precision) of the last heartbeat sent
	// by this instance.
	Timestamp int64         `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	State     InstanceState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.InstanceState" json:"state,omitempty"`
	Tokens    []uint32      `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"`
	Zone      string        `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"`
	// Unix timestamp (with seconds precision) of when the instance has been registered
	// to the ring. This field has not been called "joined_timestamp" intentionally, in order
	// to not introduce any misunderstanding with the instance's "joining" state.
	//
	// This field is used to find out subset of instances that could have possibly owned a
	// specific token in the past. Because of this, it's important that either this timestamp
	// is set to the real time the instance has been registered to the ring or it's left
	// 0 (which means unknown).
	//
	// When an instance is already registered in the ring with a value of 0 it's NOT safe to
	// update the timestamp to "now" because it would break the contract, given the instance
	// was already registered before "now". If unknown (0), it should be left as is, and the
	// code will properly deal with that.
	RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"`
	// ID of the instance. This value is the same as the key in the ingesters map in Desc.
	Id string `protobuf:"bytes,9,opt,name=id,proto3" json:"id,omitempty"`
}

InstanceDesc is the top-level type used to model per-instance information in a ring.

func MakeBuffersForGet

func MakeBuffersForGet() (bufDescs []InstanceDesc, bufHosts, bufZones []string)

MakeBuffersForGet returns buffers to use with Ring.Get().

func (*InstanceDesc) Descriptor

func (*InstanceDesc) Descriptor() ([]byte, []int)

func (*InstanceDesc) Equal

func (this *InstanceDesc) Equal(that interface{}) bool

func (*InstanceDesc) GetAddr

func (m *InstanceDesc) GetAddr() string

func (*InstanceDesc) GetId

func (m *InstanceDesc) GetId() string

func (*InstanceDesc) GetRegisteredAt

func (i *InstanceDesc) GetRegisteredAt() time.Time

GetRegisteredAt returns the timestamp when the instance has been registered to the ring or a zero value if unknown.
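
A minimal sketch of the "0 means unknown" convention for RegisteredTimestamp, assuming a Unix timestamp in seconds as documented on the field. `registeredAt` is a hypothetical free function, not the method itself.

```go
package main

import (
	"fmt"
	"time"
)

// registeredAt converts a Unix timestamp in seconds to a time.Time,
// treating 0 as "unknown" and returning the zero time.Time in that case.
func registeredAt(registeredTimestamp int64) time.Time {
	if registeredTimestamp == 0 {
		return time.Time{}
	}
	return time.Unix(registeredTimestamp, 0)
}

func main() {
	fmt.Println(registeredAt(0).IsZero())          // true
	fmt.Println(registeredAt(1715212800).IsZero()) // false
}
```

Callers can use IsZero() on the result to distinguish "unknown" from a real registration time.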

func (*InstanceDesc) GetRegisteredTimestamp

func (m *InstanceDesc) GetRegisteredTimestamp() int64

func (*InstanceDesc) GetState

func (m *InstanceDesc) GetState() InstanceState

func (*InstanceDesc) GetTimestamp

func (m *InstanceDesc) GetTimestamp() int64

func (*InstanceDesc) GetTokens

func (m *InstanceDesc) GetTokens() []uint32

func (*InstanceDesc) GetZone

func (m *InstanceDesc) GetZone() string

func (*InstanceDesc) GoString

func (this *InstanceDesc) GoString() string

func (*InstanceDesc) IsHealthy

func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool

func (*InstanceDesc) IsHeartbeatHealthy

func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now time.Time) bool

IsHeartbeatHealthy returns whether the heartbeat timestamp for the ingester is within the specified timeout period. A timeout of zero disables the timeout; the heartbeat is ignored.
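
The rule above can be sketched as follows. This is an illustration of the described behaviour, not the package's code; `heartbeatHealthy` and the helper `at` are hypothetical names, and treating a heartbeat exactly `timeout` old as healthy is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// at is a small helper that builds a time.Time from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// heartbeatHealthy reports whether lastHeartbeat (Unix seconds) is within
// timeout of now. A zero timeout disables the check entirely.
func heartbeatHealthy(lastHeartbeat int64, timeout time.Duration, now time.Time) bool {
	if timeout == 0 {
		return true
	}
	return now.Sub(time.Unix(lastHeartbeat, 0)) <= timeout
}

func main() {
	now := at(1000)
	fmt.Println(heartbeatHealthy(970, time.Minute, now)) // true: 30s old
	fmt.Println(heartbeatHealthy(900, time.Minute, now)) // false: 100s old
	fmt.Println(heartbeatHealthy(900, 0, now))           // true: check disabled
}
```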

func (*InstanceDesc) IsReady

func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) error

IsReady returns no error if the instance is ACTIVE and healthy.

func (*InstanceDesc) Marshal

func (m *InstanceDesc) Marshal() (dAtA []byte, err error)

func (*InstanceDesc) MarshalTo

func (m *InstanceDesc) MarshalTo(dAtA []byte) (int, error)

func (*InstanceDesc) MarshalToSizedBuffer

func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*InstanceDesc) ProtoMessage

func (*InstanceDesc) ProtoMessage()

func (*InstanceDesc) Reset

func (m *InstanceDesc) Reset()

func (*InstanceDesc) Size

func (m *InstanceDesc) Size() (n int)

func (*InstanceDesc) String

func (this *InstanceDesc) String() string

func (*InstanceDesc) Unmarshal

func (m *InstanceDesc) Unmarshal(dAtA []byte) error

func (*InstanceDesc) XXX_DiscardUnknown

func (m *InstanceDesc) XXX_DiscardUnknown()

func (*InstanceDesc) XXX_Marshal

func (m *InstanceDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*InstanceDesc) XXX_Merge

func (m *InstanceDesc) XXX_Merge(src proto.Message)

func (*InstanceDesc) XXX_Size

func (m *InstanceDesc) XXX_Size() int

func (*InstanceDesc) XXX_Unmarshal

func (m *InstanceDesc) XXX_Unmarshal(b []byte) error

type InstanceRegisterDelegate

type InstanceRegisterDelegate struct {
	// contains filtered or unexported fields
}

InstanceRegisterDelegate generates a new set of tokenCount tokens on instance register, and returns the registerState InstanceState.

func NewInstanceRegisterDelegate

func NewInstanceRegisterDelegate(state InstanceState, tokenCount int) InstanceRegisterDelegate

func (InstanceRegisterDelegate) OnRingInstanceHeartbeat

func (d InstanceRegisterDelegate) OnRingInstanceHeartbeat(*BasicLifecycler, *Desc, *InstanceDesc)

func (InstanceRegisterDelegate) OnRingInstanceRegister

func (d InstanceRegisterDelegate) OnRingInstanceRegister(l *BasicLifecycler, ringDesc Desc, instanceExists bool, _ string, instanceDesc InstanceDesc) (InstanceState, Tokens)

func (InstanceRegisterDelegate) OnRingInstanceStopping

func (d InstanceRegisterDelegate) OnRingInstanceStopping(*BasicLifecycler)

func (InstanceRegisterDelegate) OnRingInstanceTokens

func (d InstanceRegisterDelegate) OnRingInstanceTokens(*BasicLifecycler, Tokens)

type InstanceState

type InstanceState int32
const (
	ACTIVE  InstanceState = 0
	LEAVING InstanceState = 1
	PENDING InstanceState = 2
	JOINING InstanceState = 3
	// This state is only used by gossiping code to distribute information about
	// instances that have been removed from the ring. Ring users should not use it directly.
	LEFT InstanceState = 4
)

func (InstanceState) EnumDescriptor

func (InstanceState) EnumDescriptor() ([]byte, []int)

func (InstanceState) String

func (x InstanceState) String() string

type LeaveOnStoppingDelegate

type LeaveOnStoppingDelegate struct {
	// contains filtered or unexported fields
}

func NewLeaveOnStoppingDelegate

func NewLeaveOnStoppingDelegate(next BasicLifecyclerDelegate, logger log.Logger) *LeaveOnStoppingDelegate

func (*LeaveOnStoppingDelegate) OnRingInstanceHeartbeat

func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)

func (*LeaveOnStoppingDelegate) OnRingInstanceRegister

func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)

func (*LeaveOnStoppingDelegate) OnRingInstanceStopping

func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)

func (*LeaveOnStoppingDelegate) OnRingInstanceTokens

func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)

type Lifecycler

type Lifecycler struct {
	*services.BasicService

	KVStore kv.Client

	// These values are initialised at startup, and never change
	ID       string
	Addr     string
	RingName string
	RingKey  string
	Zone     string
	// contains filtered or unexported fields
}

Lifecycler is a Service that is responsible for publishing changes to a ring for a single instance.

func NewLifecycler

func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error)

NewLifecycler creates a new Lifecycler. It must be started via StartAsync.

func (*Lifecycler) ChangeState

func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error

ChangeState changes the state of the ingester. It is meant to be called from outside the loop() goroutine.

func (*Lifecycler) CheckReady

func (i *Lifecycler) CheckReady(ctx context.Context) error

CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by returning no error only if all ingesters are active. The state latches: once the lifecycler has gone ready, it doesn't go un-ready.
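
The latching behaviour can be sketched like this. It is a simplified illustration: `readiness` is a hypothetical type, the "all active" decision is passed in as a boolean, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// readiness latches: once check has succeeded, later calls keep succeeding
// even if the ring stops being fully ACTIVE.
type readiness struct {
	ready bool
}

func (r *readiness) check(allActive bool) error {
	if r.ready {
		return nil
	}
	if !allActive {
		return errors.New("not ready: some instances are not ACTIVE")
	}
	r.ready = true
	return nil
}

func main() {
	r := &readiness{}
	fmt.Println(r.check(false)) // not ready: some instances are not ACTIVE
	fmt.Println(r.check(true))  // <nil>
	fmt.Println(r.check(false)) // <nil>, because the state has latched
}
```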

func (*Lifecycler) ClaimTokensFor

func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error

ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.

For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and assign token to the wrong ingester. While we could check for that state here, when this method is called, transfers have already finished -- it's better to check for this *before* transfers start.

func (*Lifecycler) ClearTokensOnShutdown

func (i *Lifecycler) ClearTokensOnShutdown() bool

ClearTokensOnShutdown returns whether persisted tokens should be cleared on shutdown.

func (*Lifecycler) FlushOnShutdown

func (i *Lifecycler) FlushOnShutdown() bool

FlushOnShutdown returns whether flushing is enabled for the case where a transfer fails on shutdown.

func (*Lifecycler) GetState

func (i *Lifecycler) GetState() InstanceState

GetState returns the state of this ingester.

func (*Lifecycler) HealthyInstancesCount

func (i *Lifecycler) HealthyInstancesCount() int

HealthyInstancesCount returns the number of healthy instances for the Write operation in the ring, updated during the last heartbeat period.

func (*Lifecycler) InstancesCount

func (i *Lifecycler) InstancesCount() int

InstancesCount returns the total number of instances in the ring, updated during the last heartbeat period.

func (*Lifecycler) InstancesInZoneCount

func (i *Lifecycler) InstancesInZoneCount() int

InstancesInZoneCount returns the number of instances in the ring that are registered in this lifecycler's zone, updated during the last heartbeat period.

func (*Lifecycler) ServeHTTP

func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Lifecycler) SetClearTokensOnShutdown

func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool)

SetClearTokensOnShutdown enables/disables deletions of tokens on shutdown. Set to `true` in case one wants to clear tokens on shutdown which are otherwise persisted, e.g. useful in custom shutdown handlers.

func (*Lifecycler) SetFlushOnShutdown

func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool)

SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. Passing 'true' enables it, and 'false' disables it.

func (*Lifecycler) SetUnregisterOnShutdown

func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool)

SetUnregisterOnShutdown enables/disables unregistering on shutdown.

func (*Lifecycler) ShouldUnregisterOnShutdown

func (i *Lifecycler) ShouldUnregisterOnShutdown() bool

ShouldUnregisterOnShutdown returns whether the instance should be unregistered from the ring on shutdown.

func (*Lifecycler) ZonesCount

func (i *Lifecycler) ZonesCount() int

ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.

type LifecyclerConfig

type LifecyclerConfig struct {
	RingConfig Config `yaml:"ring"`

	// Config for the ingester lifecycle control
	NumTokens        int           `yaml:"num_tokens" category:"advanced"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
	ObservePeriod    time.Duration `yaml:"observe_period" category:"advanced"`
	JoinAfter        time.Duration `yaml:"join_after" category:"advanced"`
	MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
	InfNames         []string      `yaml:"interface_names" doc:"default=[<private network interfaces>]"`
	EnableInet6      bool          `yaml:"enable_inet6" category:"advanced"`

	// FinalSleep's default value can be overridden by
	// setting it before calling RegisterFlags or RegisterFlagsWithPrefix.
	FinalSleep               time.Duration `yaml:"final_sleep" category:"advanced"`
	TokensFilePath           string        `yaml:"tokens_file_path"`
	Zone                     string        `yaml:"availability_zone"`
	UnregisterOnShutdown     bool          `yaml:"unregister_on_shutdown" category:"advanced"`
	ReadinessCheckRingHealth bool          `yaml:"readiness_check_ring_health" category:"advanced"`

	// For testing, you can override the address and ID of this ingester
	Addr string `yaml:"address" category:"advanced"`
	Port int    `category:"advanced"`
	ID   string `doc:"default=<hostname>" category:"advanced"`

	// Injected internally
	ListenPort int `yaml:"-"`

	// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
	// Default value is nil, which means that RandomTokenGenerator is used.
	RingTokenGenerator TokenGenerator `yaml:"-"`
}

LifecyclerConfig is the config to build a Lifecycler.

func (*LifecyclerConfig) RegisterFlags

func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet. The default values of some flags can be changed; see docs of LifecyclerConfig.

func (*LifecyclerConfig) RegisterFlagsWithPrefix

func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet, logger log.Logger)

RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet. The default values of some flags can be changed; see docs of LifecyclerConfig.
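
The "set the field before registering flags" convention mentioned for FinalSleep can be sketched with the standard flag package. Everything here is hypothetical (the `config` type, the `final-sleep` flag name, the `parse` helper); it only shows how a pre-set field value can act as the flag's default.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// config is a hypothetical config struct following the same convention:
// a value set before registerFlags is used as the flag's default.
type config struct {
	FinalSleep time.Duration
}

func (c *config) registerFlags(f *flag.FlagSet) {
	// The current field value is passed as the default.
	f.DurationVar(&c.FinalSleep, "final-sleep", c.FinalSleep, "Duration to sleep before exiting.")
}

// parse registers the flags on a fresh FlagSet and parses args.
func parse(c *config, args []string) error {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	c.registerFlags(fs)
	return fs.Parse(args)
}

func main() {
	c := config{FinalSleep: 30 * time.Second} // Override the default first.
	if err := parse(&c, nil); err != nil {
		panic(err)
	}
	fmt.Println(c.FinalSleep) // 30s

	if err := parse(&c, []string{"-final-sleep=5s"}); err != nil {
		panic(err)
	}
	fmt.Println(c.FinalSleep) // 5s
}
```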

func (*LifecyclerConfig) Validate

func (cfg *LifecyclerConfig) Validate() error

Validate checks the consistency of LifecyclerConfig, and fails if this cannot be achieved.

type LifecyclerMetrics

type LifecyclerMetrics struct {
	// contains filtered or unexported fields
}

func NewLifecyclerMetrics

func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics

type NoopFlushTransferer

type NoopFlushTransferer struct{}

NoopFlushTransferer is a FlushTransferer which does nothing and can be used in cases where we don't need one.

func NewNoopFlushTransferer

func NewNoopFlushTransferer() *NoopFlushTransferer

NewNoopFlushTransferer makes a new NoopFlushTransferer

func (*NoopFlushTransferer) Flush

func (t *NoopFlushTransferer) Flush()

Flush is a noop

func (*NoopFlushTransferer) TransferOut

func (t *NoopFlushTransferer) TransferOut(_ context.Context) error

TransferOut is a noop

type Operation

type Operation uint32

Operation describes which instances can be included in the replica set, based on their state.

Implemented as a bitmap, with the upper 16 bits used for encoding extendReplicaSet, and the lower 16 bits used for encoding healthy states.

func NewOp

func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s InstanceState) bool) Operation

NewOp constructs a new Operation with the given "healthy" states for the operation, and an optional function to extend the replica set. The result of calling shouldExtendReplicaSet is cached.
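
A sketch of the bitmap layout described for Operation: one bit per state in the lower 16 bits for "healthy", and one bit per state in the upper 16 bits for "extend the replica set". The lowercase types and functions are local stand-ins, and the exact bit assignment is an assumption made for illustration.

```go
package main

import "fmt"

type state int32

const (
	active state = iota
	leaving
	pending
	joining
	left
)

type operation uint32

// newOp packs healthy states into the lower 16 bits and "extend replica set"
// states into the upper 16 bits. The callback is evaluated once per state
// here, so its result is effectively cached in the bitmap.
func newOp(healthy []state, shouldExtend func(state) bool) operation {
	var op operation
	for _, s := range healthy {
		op |= 1 << uint(s)
	}
	if shouldExtend != nil {
		for _, s := range []state{active, leaving, pending, joining, left} {
			if shouldExtend(s) {
				op |= 0x10000 << uint(s)
			}
		}
	}
	return op
}

func (op operation) isHealthy(s state) bool    { return op&(1<<uint(s)) != 0 }
func (op operation) shouldExtend(s state) bool { return op&(0x10000<<uint(s)) != 0 }

func main() {
	// Only ACTIVE is healthy; any other state extends the replica set.
	op := newOp([]state{active}, func(s state) bool { return s != active })
	fmt.Printf("%#x\n", uint32(op))                           // 0x1e0001
	fmt.Println(op.isHealthy(active), op.isHealthy(leaving))  // true false
	fmt.Println(op.shouldExtend(leaving), op.shouldExtend(active)) // true false
}
```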

func (Operation) IsInstanceInStateHealthy

func (op Operation) IsInstanceInStateHealthy(s InstanceState) bool

IsInstanceInStateHealthy is used during "filtering" phase to remove undesired instances based on their state.

func (Operation) ShouldExtendReplicaSetOnState

func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool

ShouldExtendReplicaSetOnState returns true if given a state of instance that's going to be added to the replica set, the replica set size should be extended by 1 more instance for the given operation.

type OwnerDesc

type OwnerDesc struct {
	// Partition that belongs to this owner. An owner can own only 1 partition, but 1 partition can be
	// owned by multiple owners.
	OwnedPartition int32 `protobuf:"varint,1,opt,name=ownedPartition,proto3" json:"ownedPartition,omitempty"`
	// The owner state. This field is used to propagate deletions via memberlist.
	State OwnerState `protobuf:"varint,2,opt,name=state,proto3,enum=ring.OwnerState" json:"state,omitempty"`
	// Unix timestamp (with seconds precision) of when the data for the owner has been updated the last time.
	// This timestamp is used to resolve conflicts when merging updates via memberlist (the most recent
	// update wins).
	UpdatedTimestamp int64 `protobuf:"varint,3,opt,name=updatedTimestamp,proto3" json:"updatedTimestamp,omitempty"`
}

OwnerDesc holds the information of a partition owner.
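
The "most recent update wins" rule mentioned for UpdatedTimestamp can be sketched as a map merge. This is an illustration only: `owner` is a trimmed-down hypothetical type, and the tie-breaking choice (keep the local entry) is an assumption, not the package's documented behaviour.

```go
package main

import "fmt"

// owner is a hypothetical, trimmed-down stand-in for OwnerDesc.
type owner struct {
	Partition        int32
	Deleted          bool
	UpdatedTimestamp int64 // Unix seconds
}

// merge applies incoming owner updates to local, keeping whichever entry has
// the more recent UpdatedTimestamp. Ties keep the local entry in this sketch.
func merge(local, incoming map[string]owner) {
	for id, in := range incoming {
		cur, ok := local[id]
		if !ok || in.UpdatedTimestamp > cur.UpdatedTimestamp {
			local[id] = in
		}
	}
}

func main() {
	local := map[string]owner{
		"instance-a": {Partition: 1, UpdatedTimestamp: 10},
		"instance-b": {Partition: 2, UpdatedTimestamp: 20},
	}
	incoming := map[string]owner{
		"instance-a": {Partition: 1, Deleted: true, UpdatedTimestamp: 15}, // newer: wins
		"instance-b": {Partition: 3, UpdatedTimestamp: 5},                 // older: ignored
	}
	merge(local, incoming)
	fmt.Println(local["instance-a"].Deleted, local["instance-b"].Partition) // true 2
}
```

Keeping a deleted entry with its timestamp, rather than dropping it, is what lets a deletion win over an older "active" update arriving later.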

func (*OwnerDesc) Descriptor

func (*OwnerDesc) Descriptor() ([]byte, []int)

func (*OwnerDesc) Equal

func (this *OwnerDesc) Equal(that interface{}) bool

func (*OwnerDesc) GetOwnedPartition

func (m *OwnerDesc) GetOwnedPartition() int32

func (*OwnerDesc) GetState

func (m *OwnerDesc) GetState() OwnerState

func (*OwnerDesc) GetUpdatedTimestamp

func (m *OwnerDesc) GetUpdatedTimestamp() int64

func (*OwnerDesc) GoString

func (this *OwnerDesc) GoString() string

func (*OwnerDesc) Marshal

func (m *OwnerDesc) Marshal() (dAtA []byte, err error)

func (*OwnerDesc) MarshalTo

func (m *OwnerDesc) MarshalTo(dAtA []byte) (int, error)

func (*OwnerDesc) MarshalToSizedBuffer

func (m *OwnerDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*OwnerDesc) ProtoMessage

func (*OwnerDesc) ProtoMessage()

func (*OwnerDesc) Reset

func (m *OwnerDesc) Reset()

func (*OwnerDesc) Size

func (m *OwnerDesc) Size() (n int)

func (*OwnerDesc) String

func (this *OwnerDesc) String() string

func (*OwnerDesc) Unmarshal

func (m *OwnerDesc) Unmarshal(dAtA []byte) error

func (*OwnerDesc) XXX_DiscardUnknown

func (m *OwnerDesc) XXX_DiscardUnknown()

func (*OwnerDesc) XXX_Marshal

func (m *OwnerDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*OwnerDesc) XXX_Merge

func (m *OwnerDesc) XXX_Merge(src proto.Message)

func (*OwnerDesc) XXX_Size

func (m *OwnerDesc) XXX_Size() int

func (*OwnerDesc) XXX_Unmarshal

func (m *OwnerDesc) XXX_Unmarshal(b []byte) error

type OwnerState

type OwnerState int32
const (
	OwnerUnknown OwnerState = 0
	// Active owner.
	OwnerActive OwnerState = 1
	// Deleted owner. This state is not visible to ring clients: it's only used to propagate
	// via memberlist the information that an owner has been deleted. Owners in this state
	// are removed before clients can see them.
	OwnerDeleted OwnerState = 2
)

func (OwnerState) EnumDescriptor

func (OwnerState) EnumDescriptor() ([]byte, []int)

func (OwnerState) String

func (x OwnerState) String() string

type PartitionDesc

type PartitionDesc struct {
	// The partition ID. This value is the same as the key in the partitions map in PartitionRingDesc.
	Id int32 `protobuf:"varint,4,opt,name=id,proto3" json:"id,omitempty"`
	// Unique tokens, generated with deterministic token generator. Tokens MUST be immutable:
	// if tokens get changed, the change will not be propagated via memberlist.
	Tokens []uint32 `protobuf:"varint,1,rep,packed,name=tokens,proto3" json:"tokens,omitempty"`
	// The state of the partition.
	State PartitionState `protobuf:"varint,2,opt,name=state,proto3,enum=ring.PartitionState" json:"state,omitempty"`
	// Unix timestamp (with seconds precision) of when the state last changed for this partition.
	StateTimestamp int64 `protobuf:"varint,3,opt,name=stateTimestamp,proto3" json:"stateTimestamp,omitempty"`
}

PartitionDesc holds the state of a single partition.

func (*PartitionDesc) Clone

func (m *PartitionDesc) Clone() PartitionDesc

func (*PartitionDesc) Descriptor

func (*PartitionDesc) Descriptor() ([]byte, []int)

func (*PartitionDesc) Equal

func (this *PartitionDesc) Equal(that interface{}) bool

func (*PartitionDesc) GetId

func (m *PartitionDesc) GetId() int32

func (*PartitionDesc) GetState

func (m *PartitionDesc) GetState() PartitionState

func (*PartitionDesc) GetStateTime

func (m *PartitionDesc) GetStateTime() time.Time

func (*PartitionDesc) GetStateTimestamp

func (m *PartitionDesc) GetStateTimestamp() int64

func (*PartitionDesc) GetTokens

func (m *PartitionDesc) GetTokens() []uint32

func (*PartitionDesc) GoString

func (this *PartitionDesc) GoString() string

func (*PartitionDesc) IsActive

func (m *PartitionDesc) IsActive() bool

func (*PartitionDesc) IsInactive

func (m *PartitionDesc) IsInactive() bool

func (*PartitionDesc) IsInactiveSince

func (m *PartitionDesc) IsInactiveSince(since time.Time) bool

func (*PartitionDesc) IsPending

func (m *PartitionDesc) IsPending() bool

func (*PartitionDesc) Marshal

func (m *PartitionDesc) Marshal() (dAtA []byte, err error)

func (*PartitionDesc) MarshalTo

func (m *PartitionDesc) MarshalTo(dAtA []byte) (int, error)

func (*PartitionDesc) MarshalToSizedBuffer

func (m *PartitionDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PartitionDesc) ProtoMessage

func (*PartitionDesc) ProtoMessage()

func (*PartitionDesc) Reset

func (m *PartitionDesc) Reset()

func (*PartitionDesc) Size

func (m *PartitionDesc) Size() (n int)

func (*PartitionDesc) String

func (this *PartitionDesc) String() string

func (*PartitionDesc) Unmarshal

func (m *PartitionDesc) Unmarshal(dAtA []byte) error

func (*PartitionDesc) XXX_DiscardUnknown

func (m *PartitionDesc) XXX_DiscardUnknown()

func (*PartitionDesc) XXX_Marshal

func (m *PartitionDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PartitionDesc) XXX_Merge

func (m *PartitionDesc) XXX_Merge(src proto.Message)

func (*PartitionDesc) XXX_Size

func (m *PartitionDesc) XXX_Size() int

func (*PartitionDesc) XXX_Unmarshal

func (m *PartitionDesc) XXX_Unmarshal(b []byte) error

type PartitionInstanceLifecycler

type PartitionInstanceLifecycler struct {
	*services.BasicService
	// contains filtered or unexported fields
}

PartitionInstanceLifecycler is responsible for managing the lifecycle of a single partition and partition owner in the ring.

func NewPartitionInstanceLifecycler

func NewPartitionInstanceLifecycler(cfg PartitionInstanceLifecyclerConfig, ringName, ringKey string, store kv.Client, logger log.Logger, reg prometheus.Registerer) *PartitionInstanceLifecycler

func (*PartitionInstanceLifecycler) ChangePartitionState

func (l *PartitionInstanceLifecycler) ChangePartitionState(ctx context.Context, toState PartitionState) error

ChangePartitionState changes the partition state to toState. This function returns ErrPartitionDoesNotExist if the partition doesn't exist, and ErrPartitionStateChangeNotAllowed if the state change is not allowed.
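
Callers can branch on the documented sentinel errors with errors.Is, which matches whether or not an error has been wrapped. The sketch below uses local copies of the errors and a hypothetical `changeState` stub whose rules (partition 1 exists, only pending to active is allowed) are invented for illustration and do not reflect the real transition table.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors exported by the ring package.
var (
	errPartitionDoesNotExist          = errors.New("the partition does not exist")
	errPartitionStateChangeNotAllowed = errors.New("partition state change not allowed")
)

// changeState is a hypothetical stub with an invented rule set: partition 1
// exists, and only the "pending" to "active" change is permitted.
func changeState(partitionID int32, from, to string) error {
	if partitionID != 1 {
		return fmt.Errorf("partition %d: %w", partitionID, errPartitionDoesNotExist)
	}
	if from != "pending" || to != "active" {
		return fmt.Errorf("%s to %s: %w", from, to, errPartitionStateChangeNotAllowed)
	}
	return nil
}

// classify shows how a caller can branch on the sentinel errors.
func classify(err error) string {
	switch {
	case err == nil:
		return "changed"
	case errors.Is(err, errPartitionDoesNotExist):
		return "partition missing"
	case errors.Is(err, errPartitionStateChangeNotAllowed):
		return "transition rejected"
	default:
		return "other error"
	}
}

func main() {
	fmt.Println(classify(changeState(1, "pending", "active"))) // changed
	fmt.Println(classify(changeState(2, "pending", "active"))) // partition missing
	fmt.Println(classify(changeState(1, "active", "pending"))) // transition rejected
}
```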

func (*PartitionInstanceLifecycler) CreatePartitionOnStartup

func (l *PartitionInstanceLifecycler) CreatePartitionOnStartup() bool

CreatePartitionOnStartup returns whether the lifecycler creates the partition on startup if it doesn't exist.

func (*PartitionInstanceLifecycler) GetPartitionState

GetPartitionState returns the current state of the partition, and the timestamp when the state was changed the last time.

func (*PartitionInstanceLifecycler) RemoveOwnerOnShutdown

func (l *PartitionInstanceLifecycler) RemoveOwnerOnShutdown() bool

RemoveOwnerOnShutdown returns whether the lifecycler has been configured to remove the partition owner on shutdown.

func (*PartitionInstanceLifecycler) SetCreatePartitionOnStartup

func (l *PartitionInstanceLifecycler) SetCreatePartitionOnStartup(create bool)

SetCreatePartitionOnStartup sets whether the lifecycler should create the partition on startup if it doesn't exist.

func (*PartitionInstanceLifecycler) SetRemoveOwnerOnShutdown

func (l *PartitionInstanceLifecycler) SetRemoveOwnerOnShutdown(remove bool)

SetRemoveOwnerOnShutdown sets whether the lifecycler should remove the partition owner on shutdown.

type PartitionInstanceLifecyclerConfig

type PartitionInstanceLifecyclerConfig struct {
	// PartitionID is the ID of the partition managed by the lifecycler.
	PartitionID int32

	// InstanceID is the ID of the instance managed by the lifecycler.
	InstanceID string

	// WaitOwnersCountOnPending is the minimum number of owners to wait before switching a
	// PENDING partition to ACTIVE.
	WaitOwnersCountOnPending int

	// WaitOwnersDurationOnPending is how long each owner should have been added to the
	// partition before it's considered eligible for the WaitOwnersCountOnPending count.
	WaitOwnersDurationOnPending time.Duration

	// DeleteInactivePartitionAfterDuration is how long the lifecycler should wait before
	// deleting inactive partitions with no owners. Inactive partitions are never removed
	// if this value is 0.
	DeleteInactivePartitionAfterDuration time.Duration

	// PollingInterval is the internal polling interval. This setting is useful to let
	// upstream projects lower it in unit tests.
	PollingInterval time.Duration
}
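
To make the interplay of WaitOwnersCountOnPending and WaitOwnersDurationOnPending concrete, here is a sketch of the eligibility rule as the field comments describe it: an owner only counts once it has been part of the partition for the configured duration. The functions and the use of plain Unix seconds are assumptions for illustration; the real lifecycler may apply further conditions.

```go
package main

import (
	"fmt"
	"time"
)

// eligibleOwners counts owners that were added at least minAge before now.
// ownerAddedAt and now are Unix timestamps in seconds.
func eligibleOwners(ownerAddedAt []int64, now int64, minAge time.Duration) int {
	cutoff := now - int64(minAge/time.Second)
	count := 0
	for _, addedAt := range ownerAddedAt {
		if addedAt <= cutoff {
			count++
		}
	}
	return count
}

// canActivate reports whether a pending partition has enough eligible owners.
func canActivate(ownerAddedAt []int64, now int64, minOwners int, minAge time.Duration) bool {
	return eligibleOwners(ownerAddedAt, now, minAge) >= minOwners
}

func main() {
	owners := []int64{100, 115}
	fmt.Println(canActivate(owners, 120, 2, 10*time.Second)) // false: the second owner is only 5s old
	fmt.Println(canActivate(owners, 125, 2, 10*time.Second)) // true: both owners are at least 10s old
}
```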

type PartitionInstanceRing

type PartitionInstanceRing struct {
	// contains filtered or unexported fields
}

PartitionInstanceRing holds a partitions ring and an instances ring, and provides functions to look up the intersection of the two (e.g. healthy instances by partition).

func NewPartitionInstanceRing

func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing *Ring, heartbeatTimeout time.Duration) *PartitionInstanceRing

func (*PartitionInstanceRing) GetReplicationSetsForOperation

func (r *PartitionInstanceRing) GetReplicationSetsForOperation(op Operation) ([]ReplicationSet, error)

GetReplicationSetsForOperation returns one ReplicationSet for each partition in the ring. If there are no healthy owners for a partition, an error is returned.

func (*PartitionInstanceRing) InstanceRing

func (r *PartitionInstanceRing) InstanceRing() *Ring

func (*PartitionInstanceRing) PartitionRing

func (r *PartitionInstanceRing) PartitionRing() *PartitionRing

func (*PartitionInstanceRing) ShuffleShard

func (r *PartitionInstanceRing) ShuffleShard(identifier string, size int) (*PartitionInstanceRing, error)

ShuffleShard wraps PartitionRing.ShuffleShard().

The PartitionRing embedded in the returned PartitionInstanceRing is based on a snapshot of the partitions ring at the time this function gets called. This means that subsequent changes to the partitions ring will not be reflected in the returned PartitionInstanceRing.

func (*PartitionInstanceRing) ShuffleShardWithLookback

func (r *PartitionInstanceRing) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) (*PartitionInstanceRing, error)

ShuffleShardWithLookback wraps PartitionRing.ShuffleShardWithLookback().

The PartitionRing embedded in the returned PartitionInstanceRing is based on a snapshot of the partitions ring at the time this function gets called. This means that subsequent changes to the partitions ring will not be reflected in the returned PartitionInstanceRing.

type PartitionRing

type PartitionRing struct {
	// contains filtered or unexported fields
}

PartitionRing holds an immutable view of the partitions ring.

Design principles:

  • Immutable: the PartitionRingDesc held by PartitionRing is immutable. When PartitionRingDesc changes, a new instance of PartitionRing should be created. The partitions ring is expected to change infrequently (e.g. there's no heartbeat), so creating a new PartitionRing each time the partitions ring changes is not expected to have a significant overhead.

func NewPartitionRing

func NewPartitionRing(desc PartitionRingDesc) *PartitionRing

func (*PartitionRing) ActivePartitionForKey

func (r *PartitionRing) ActivePartitionForKey(key uint32) (int32, error)

ActivePartitionForKey returns the partition for the given key. Only active partitions are considered. Only one partition is returned: in other words, the replication factor is always 1.

func (*PartitionRing) ActivePartitionIDs

func (r *PartitionRing) ActivePartitionIDs() []int32

ActivePartitionIDs returns a sorted list of all ACTIVE partition IDs in the ring. The returned slice is a copy, so the caller can freely manipulate it.

func (*PartitionRing) ActivePartitionsCount

func (r *PartitionRing) ActivePartitionsCount() int

ActivePartitionsCount returns the number of active partitions in the ring.

func (*PartitionRing) GetTokenRangesForPartition

func (r *PartitionRing) GetTokenRangesForPartition(partitionID int32) (TokenRanges, error)

GetTokenRangesForPartition returns the token ranges owned by the given partition. Note that this method does NOT take partition state into account, so if only active partitions should be considered, a PartitionRing with only active partitions must be created first (e.g. using the ShuffleShard method).

func (*PartitionRing) InactivePartitionIDs

func (r *PartitionRing) InactivePartitionIDs() []int32

InactivePartitionIDs returns a sorted list of all INACTIVE partition IDs in the ring. The returned slice is a copy, so the caller can freely manipulate it.

func (*PartitionRing) PartitionIDs

func (r *PartitionRing) PartitionIDs() []int32

PartitionIDs returns a sorted list of all partition IDs in the ring. The returned slice is a copy, so the caller can freely manipulate it.

func (*PartitionRing) PartitionOwnerIDs

func (r *PartitionRing) PartitionOwnerIDs(partitionID int32) (doNotModify []string)

PartitionOwnerIDs returns a list of owner IDs for the given partitionID. The returned slice is NOT a copy and should never be modified by the caller.

func (*PartitionRing) PartitionOwnerIDsCopy

func (r *PartitionRing) PartitionOwnerIDsCopy(partitionID int32) []string

PartitionOwnerIDsCopy is like PartitionOwnerIDs(), but the returned slice is a copy, so the caller can freely manipulate it.
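
The difference between the two accessors is the usual Go distinction between returning an internal slice and returning a copy. A minimal sketch with a hypothetical `partitionRing` type (not the package's PartitionRing):

```go
package main

import "fmt"

// partitionRing is a hypothetical type holding owner IDs per partition.
type partitionRing struct {
	owners map[int32][]string
}

// ownerIDs returns the internal slice: callers must not modify it.
func (r *partitionRing) ownerIDs(partitionID int32) []string {
	return r.owners[partitionID]
}

// ownerIDsCopy returns a fresh slice that callers can freely modify.
func (r *partitionRing) ownerIDsCopy(partitionID int32) []string {
	return append([]string(nil), r.owners[partitionID]...)
}

func main() {
	r := &partitionRing{owners: map[int32][]string{1: {"instance-b", "instance-a"}}}

	ids := r.ownerIDsCopy(1)
	ids[0] = "changed"
	fmt.Println(r.owners[1][0]) // instance-b: the ring is untouched

	shared := r.ownerIDs(1)
	shared[0] = "changed"
	fmt.Println(r.owners[1][0]) // changed: the ring's own data was modified
}
```

The non-copying variant avoids an allocation on hot paths, which is why it is only safe for read-only use.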

func (*PartitionRing) Partitions

func (r *PartitionRing) Partitions() []PartitionDesc

Partitions returns the partitions in the ring. The returned slice is a deep copy, so the caller can freely manipulate it.

func (*PartitionRing) PartitionsCount

func (r *PartitionRing) PartitionsCount() int

PartitionsCount returns the number of partitions in the ring.

func (*PartitionRing) PendingPartitionIDs

func (r *PartitionRing) PendingPartitionIDs() []int32

PendingPartitionIDs returns a sorted list of all PENDING partition IDs in the ring. The returned slice is a copy, so the caller can freely manipulate it.

func (*PartitionRing) ShuffleShard

func (r *PartitionRing) ShuffleShard(identifier string, size int) (*PartitionRing, error)

ShuffleShard returns a subring for the provided identifier (e.g. a tenant ID) and size (number of partitions).

The algorithm used to build the subring is a shuffle sharder based on probabilistic hashing. We pick N unique partitions, walking the ring starting from random but predictable numbers. The random generator is initialised with a seed based on the provided identifier.

This function returns a subring containing ONLY ACTIVE partitions.

This function supports caching.

This implementation guarantees:

  • Stability: given the same ring, two invocations return the same result.

  • Consistency: adding/removing 1 partition from the ring generates a resulting subring with no more than 1 difference.

  • Shuffling: probabilistically, for a large enough cluster each identifier gets a different set of partitions, with a reduced number of overlapping partitions between two identifiers.
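The walk described above can be sketched in a few lines. This is an illustration only, under stated assumptions: the token type, the FNV hash used for seeding, and the shuffleShard function are all hypothetical and do not reproduce the package's actual hashing, caching, or state filtering.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// token pairs a position on the ring with the partition that owns it.
type token struct {
	value     uint32
	partition int32
}

// shuffleShard picks up to size unique partitions for identifier.
// tokens must be sorted by value.
func shuffleShard(tokens []token, identifier string, size int) []int32 {
	// Seed the generator from the identifier so the walk is random but repeatable.
	h := fnv.New64a()
	h.Write([]byte(identifier))
	rnd := rand.New(rand.NewSource(int64(h.Sum64())))

	// Cap size at the number of distinct partitions so the loop terminates.
	distinct := map[int32]struct{}{}
	for _, t := range tokens {
		distinct[t.partition] = struct{}{}
	}
	if size > len(distinct) {
		size = len(distinct)
	}

	selected := map[int32]struct{}{}
	for len(selected) < size {
		start := rnd.Uint32()
		// Find the first token at or after the random start, wrapping around.
		i := sort.Search(len(tokens), func(i int) bool { return tokens[i].value >= start })
		for n := 0; n < len(tokens); n++ {
			p := tokens[(i+n)%len(tokens)].partition
			if _, ok := selected[p]; !ok {
				selected[p] = struct{}{}
				break
			}
		}
	}

	ids := make([]int32, 0, len(selected))
	for p := range selected {
		ids = append(ids, p)
	}
	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
	return ids
}

func main() {
	// Eight partitions with two evenly spaced tokens each, already sorted.
	var tokens []token
	for p := int32(0); p < 8; p++ {
		base := uint32(p) * 500_000_000
		tokens = append(tokens, token{base + 123, p}, token{base + 250_000_000, p})
	}
	fmt.Println(shuffleShard(tokens, "tenant-a", 3))
	fmt.Println(shuffleShard(tokens, "tenant-a", 3)) // same ring, same result
	fmt.Println(shuffleShard(tokens, "tenant-b", 3))
}
```

Seeding from the identifier is what gives the stability property: the sequence of random starting points depends only on the identifier, so an unchanged ring always yields the same subring.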

func (*PartitionRing) ShuffleShardSize

func (r *PartitionRing) ShuffleShardSize(size int) int

ShuffleShardSize returns the number of partitions that would be in the result of a ShuffleShard call with the same size.

func (*PartitionRing) ShuffleShardWithLookback

func (r *PartitionRing) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) (*PartitionRing, error)

ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all partitions that have been part of the identifier's shard in the [now - lookbackPeriod, now] time window.

This function can return a mix of ACTIVE and INACTIVE partitions. INACTIVE partitions are only included if they were part of the identifier's shard within the lookbackPeriod. PENDING partitions are never returned.

This function supports caching, but the cache will only be effective if successive calls for the same identifier are with the same lookbackPeriod and increasing values of now.

func (*PartitionRing) String

func (r *PartitionRing) String() string

type PartitionRingDesc

type PartitionRingDesc struct {
	// Mapping between partition ID and partition info.
	Partitions map[int32]PartitionDesc `` /* 152-byte string literal not displayed */
	// Mapping between instance ID and partition ownership info.
	Owners map[string]OwnerDesc `` /* 143-byte string literal not displayed */
}

PartitionRingDesc holds the state of the partitions ring.

func GetOrCreatePartitionRingDesc

func GetOrCreatePartitionRingDesc(in any) *PartitionRingDesc

func NewPartitionRingDesc

func NewPartitionRingDesc() *PartitionRingDesc

func (*PartitionRingDesc) AddOrUpdateOwner

func (m *PartitionRingDesc) AddOrUpdateOwner(id string, state OwnerState, ownedPartition int32, now time.Time) bool

AddOrUpdateOwner adds or updates a partition owner in the ring. Returns true if the owner was added or updated, false if it was left unchanged.

func (*PartitionRingDesc) AddPartition

func (m *PartitionRingDesc) AddPartition(id int32, state PartitionState, now time.Time)

AddPartition adds a new partition to the ring. Tokens are auto-generated using the spread minimizing strategy which generates deterministic unique tokens.

func (*PartitionRingDesc) Clone

Clone implements memberlist.Mergeable.

func (*PartitionRingDesc) Descriptor

func (*PartitionRingDesc) Descriptor() ([]byte, []int)

func (*PartitionRingDesc) Equal

func (this *PartitionRingDesc) Equal(that interface{}) bool

func (*PartitionRingDesc) GetOwners

func (m *PartitionRingDesc) GetOwners() map[string]OwnerDesc

func (*PartitionRingDesc) GetPartitions

func (m *PartitionRingDesc) GetPartitions() map[int32]PartitionDesc

func (*PartitionRingDesc) GoString

func (this *PartitionRingDesc) GoString() string

func (*PartitionRingDesc) HasOwner

func (m *PartitionRingDesc) HasOwner(id string) bool

HasOwner returns whether an owner exists.

func (*PartitionRingDesc) HasPartition

func (m *PartitionRingDesc) HasPartition(id int32) bool

HasPartition returns whether a partition exists.

func (*PartitionRingDesc) Marshal

func (m *PartitionRingDesc) Marshal() (dAtA []byte, err error)

func (*PartitionRingDesc) MarshalTo

func (m *PartitionRingDesc) MarshalTo(dAtA []byte) (int, error)

func (*PartitionRingDesc) MarshalToSizedBuffer

func (m *PartitionRingDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PartitionRingDesc) Merge

func (m *PartitionRingDesc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error)

Merge implements memberlist.Mergeable.

func (*PartitionRingDesc) MergeContent

func (m *PartitionRingDesc) MergeContent() []string

MergeContent implements memberlist.Mergeable.

func (*PartitionRingDesc) PartitionOwnersCount

func (m *PartitionRingDesc) PartitionOwnersCount(partitionID int32) int

PartitionOwnersCount returns the number of owners for a given partition.

func (*PartitionRingDesc) PartitionOwnersCountUpdatedBefore

func (m *PartitionRingDesc) PartitionOwnersCountUpdatedBefore(partitionID int32, before time.Time) int

PartitionOwnersCountUpdatedBefore returns the number of owners for a given partition, counting only owners whose last update was before the input timestamp.

func (*PartitionRingDesc) ProtoMessage

func (*PartitionRingDesc) ProtoMessage()

func (*PartitionRingDesc) RemoveOwner

func (m *PartitionRingDesc) RemoveOwner(id string) bool

RemoveOwner removes a partition owner. Returns true if the ring has been changed.

func (*PartitionRingDesc) RemovePartition

func (m *PartitionRingDesc) RemovePartition(id int32)

RemovePartition removes a partition.

func (*PartitionRingDesc) RemoveTombstones

func (m *PartitionRingDesc) RemoveTombstones(limit time.Time) (total, removed int)

RemoveTombstones implements memberlist.Mergeable.

func (*PartitionRingDesc) Reset

func (m *PartitionRingDesc) Reset()

func (*PartitionRingDesc) Size

func (m *PartitionRingDesc) Size() (n int)

func (*PartitionRingDesc) String

func (this *PartitionRingDesc) String() string

func (*PartitionRingDesc) Unmarshal

func (m *PartitionRingDesc) Unmarshal(dAtA []byte) error

func (*PartitionRingDesc) UpdatePartitionState

func (m *PartitionRingDesc) UpdatePartitionState(id int32, state PartitionState, now time.Time) bool

UpdatePartitionState changes the state of a partition. Returns true if the state was changed, or false if the update was a no-op.

func (*PartitionRingDesc) WithPartitions

func (m *PartitionRingDesc) WithPartitions(partitions map[int32]struct{}) PartitionRingDesc

WithPartitions returns a new PartitionRingDesc with only the specified partitions and their owners included.

func (*PartitionRingDesc) XXX_DiscardUnknown

func (m *PartitionRingDesc) XXX_DiscardUnknown()

func (*PartitionRingDesc) XXX_Marshal

func (m *PartitionRingDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PartitionRingDesc) XXX_Merge

func (m *PartitionRingDesc) XXX_Merge(src proto.Message)

func (*PartitionRingDesc) XXX_Size

func (m *PartitionRingDesc) XXX_Size() int

func (*PartitionRingDesc) XXX_Unmarshal

func (m *PartitionRingDesc) XXX_Unmarshal(b []byte) error

type PartitionRingEditor

type PartitionRingEditor struct {
	// contains filtered or unexported fields
}

PartitionRingEditor is a standalone component that can be used to modify the partitions ring. If you want to implement the partition lifecycle you should use PartitionInstanceLifecycler instead.

func NewPartitionRingEditor

func NewPartitionRingEditor(ringKey string, store kv.Client) *PartitionRingEditor

func (*PartitionRingEditor) ChangePartitionState

func (l *PartitionRingEditor) ChangePartitionState(ctx context.Context, partitionID int32, toState PartitionState) error

ChangePartitionState changes the partition state to toState. This function returns ErrPartitionDoesNotExist if the partition doesn't exist, and ErrPartitionStateChangeNotAllowed if the state change is not allowed.
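Callers typically need to tell these two sentinel errors apart from transient failures. The sketch below shows one way to do that with errors.Is; the lower-case error variables are local stand-ins for the package's exported ones, and the describe helper and its return strings are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for ErrPartitionDoesNotExist and
// ErrPartitionStateChangeNotAllowed, so the sketch compiles on its own.
var (
	errPartitionDoesNotExist          = errors.New("the partition does not exist")
	errPartitionStateChangeNotAllowed = errors.New("partition state change not allowed")
)

// describe maps the outcome of a state change to a caller-side decision.
func describe(err error) string {
	switch {
	case err == nil:
		return "changed"
	case errors.Is(err, errPartitionDoesNotExist):
		return "not found"
	case errors.Is(err, errPartitionStateChangeNotAllowed):
		return "rejected"
	default:
		return "retry" // e.g. a failure talking to the key-value store
	}
}

func main() {
	// errors.Is still matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("partition 3: %w", errPartitionDoesNotExist)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(errPartitionStateChangeNotAllowed))
	fmt.Println(describe(nil))
}
```

Using errors.Is rather than == keeps the check working if an intermediate layer wraps the error with additional context.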

type PartitionRingPageHandler

type PartitionRingPageHandler struct {
	// contains filtered or unexported fields
}

func NewPartitionRingPageHandler

func NewPartitionRingPageHandler(reader PartitionRingReader, updater PartitionRingUpdater) *PartitionRingPageHandler

func (*PartitionRingPageHandler) ServeHTTP

type PartitionRingReader

type PartitionRingReader interface {
	// PartitionRing returns a snapshot of the PartitionRing. This function must never return nil.
	// If the ring is empty or unknown, an empty PartitionRing can be returned.
	PartitionRing() *PartitionRing
}

type PartitionRingUpdater

type PartitionRingUpdater interface {
	ChangePartitionState(ctx context.Context, partitionID int32, toState PartitionState) error
}

type PartitionRingWatcher

type PartitionRingWatcher struct {
	services.Service
	// contains filtered or unexported fields
}

PartitionRingWatcher watches the partitions ring for changes in the KV store.

func NewPartitionRingWatcher

func NewPartitionRingWatcher(name, key string, kv kv.Client, logger log.Logger, reg prometheus.Registerer) *PartitionRingWatcher

func (*PartitionRingWatcher) PartitionRing

func (w *PartitionRingWatcher) PartitionRing() *PartitionRing

PartitionRing returns the most recent snapshot of the PartitionRing. The returned instance is immutable and will not be updated when the ring subsequently changes.

type PartitionState

type PartitionState int32
const (
	PartitionUnknown PartitionState = 0
	// Pending partition is a partition that is about to be switched to ACTIVE. This state is used
	// to let owners attach to the partition and get ready to handle the partition.
	//
	// When a partition is in this state, it must not be used for writing or reading.
	PartitionPending PartitionState = 1
	// Active partition in read-write mode.
	PartitionActive PartitionState = 2
	// Inactive partition in read-only mode. This partition will be deleted after a grace period,
	// unless its state changes to Active again.
	PartitionInactive PartitionState = 3
	// Deleted partition. This state is not visible to ring clients: it's only used to propagate
	// via memberlist the information that a partition has been deleted.
	PartitionDeleted PartitionState = 4
)

func (PartitionState) CleanName

func (s PartitionState) CleanName() string

CleanName returns the PartitionState name without the "Partition" prefix.
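As a rough illustration of what the clean name looks like, the sketch below strips the prefix from the constant names listed above. It assumes that String() returns those constant names verbatim; the cleanName helper is hypothetical and is not the package's method.

```go
package main

import (
	"fmt"
	"strings"
)

// cleanName strips the "Partition" prefix from an enum value name.
// It is a hypothetical helper operating on plain strings.
func cleanName(stateName string) string {
	return strings.TrimPrefix(stateName, "Partition")
}

func main() {
	for _, n := range []string{"PartitionPending", "PartitionActive", "PartitionInactive"} {
		fmt.Println(cleanName(n))
	}
}
```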

func (PartitionState) EnumDescriptor

func (PartitionState) EnumDescriptor() ([]byte, []int)

func (PartitionState) String

func (x PartitionState) String() string

type RandomTokenGenerator

type RandomTokenGenerator struct {
	// contains filtered or unexported fields
}

func NewRandomTokenGenerator

func NewRandomTokenGenerator() *RandomTokenGenerator

func NewRandomTokenGeneratorWithSeed

func NewRandomTokenGeneratorWithSeed(seed int64) *RandomTokenGenerator

func (*RandomTokenGenerator) CanJoin

func (t *RandomTokenGenerator) CanJoin(_ map[string]InstanceDesc) error

func (*RandomTokenGenerator) CanJoinEnabled

func (t *RandomTokenGenerator) CanJoinEnabled() bool

func (*RandomTokenGenerator) GenerateTokens

func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens

GenerateTokens generates at most requestedTokensCount unique random tokens, none of which clashes with the given allTakenTokens, representing the set of all tokens currently present in the ring. Generated tokens are sorted.
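The contract above (unique, non-clashing, sorted) can be sketched with the standard library. This is not the package's implementation: the generateTokens function and its seed parameter are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// generateTokens returns requested unique random tokens that do not clash
// with taken, sorted in ascending order. The seed makes the output repeatable.
func generateTokens(seed int64, requested int, taken []uint32) []uint32 {
	if requested <= 0 {
		return nil
	}
	rnd := rand.New(rand.NewSource(seed))

	// Track both pre-existing and freshly generated tokens to reject clashes.
	used := make(map[uint32]struct{}, len(taken)+requested)
	for _, t := range taken {
		used[t] = struct{}{}
	}

	out := make([]uint32, 0, requested)
	for len(out) < requested {
		candidate := rnd.Uint32()
		if _, clash := used[candidate]; clash {
			continue // draw again; clashes are rare in a 32-bit space
		}
		used[candidate] = struct{}{}
		out = append(out, candidate)
	}

	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	taken := []uint32{10, 20, 30}
	fmt.Println(generateTokens(1, 4, taken))
}
```

The sketch loops until it has the requested count, which is only reasonable while the ring holds far fewer than 2^32 tokens.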

type ReadRing

type ReadRing interface {
	// Get returns n (or more) instances which form the replicas for the given key.
	// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
	// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
	Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

	// GetAllHealthy returns all healthy instances in the ring, for the given operation.
	// This function doesn't check if the quorum is honored, so doesn't fail if the number
	// of unhealthy instances is greater than the tolerated max unavailable.
	GetAllHealthy(op Operation) (ReplicationSet, error)

	// GetReplicationSetForOperation returns all instances where the input operation should be executed.
	// The resulting ReplicationSet doesn't necessarily contain all healthy instances
	// in the ring, but could contain the minimum set of instances required to execute
	// the input operation.
	GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

	ReplicationFactor() int

	// InstancesCount returns the number of instances in the ring.
	InstancesCount() int

	// InstancesWithTokensCount returns the number of instances in the ring that have tokens.
	InstancesWithTokensCount() int

	// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
	// and size (number of instances).
	ShuffleShard(identifier string, size int) ReadRing

	// GetInstanceState returns the current state of an instance or an error if the
	// instance does not exist in the ring.
	GetInstanceState(instanceID string) (InstanceState, error)

	// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
	// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
	ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

	// HasInstance returns whether the ring contains an instance matching the provided instanceID.
	HasInstance(instanceID string) bool

	// CleanupShuffleShardCache should delete cached shuffle-shard subrings for given identifier.
	CleanupShuffleShardCache(identifier string)

	// GetTokenRangesForInstance returns the token ranges owned by an instance in the ring
	GetTokenRangesForInstance(instanceID string) (TokenRanges, error)

	// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone.
	InstancesInZoneCount(zone string) int

	// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
	InstancesWithTokensInZoneCount(zone string) int

	// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
	ZonesCount() int
}

ReadRing represents the read interface to the ring.

type ReplicationSet

type ReplicationSet struct {
	Instances []InstanceDesc

	// Maximum number of tolerated failing instances. Max errors and max unavailable zones are
	// mutually exclusive.
	MaxErrors int

	// Maximum number of different zones in which instances can fail. Max unavailable zones and
	// max errors are mutually exclusive.
	MaxUnavailableZones int

	ZoneAwarenessEnabled bool
}

ReplicationSet describes the instances to talk to for a given key, and how many errors to tolerate.

func (ReplicationSet) Do

func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error)

Do calls function f in parallel for all replicas in the set, erroring if we exceed MaxErrors and returning early otherwise. It returns a slice of all results from f, or nil if an error occurred.
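The fan-out with an error budget can be sketched as follows. This is a simplified illustration, not the package's code: it ignores the delay parameter and zone awareness, and the instance type, doAll, and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type instance struct{ Addr string }

type outcome struct {
	val interface{}
	err error
}

// doAll calls f concurrently for every instance and returns as soon as
// enough calls succeeded, or as soon as more than maxErrors calls failed.
func doAll(ctx context.Context, instances []instance, maxErrors int,
	f func(context.Context, *instance) (interface{}, error)) ([]interface{}, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // signals calls still in flight once we return

	// Buffered so that late goroutines never block after an early return.
	ch := make(chan outcome, len(instances))
	for i := range instances {
		go func(in *instance) {
			v, err := f(ctx, in)
			ch <- outcome{val: v, err: err}
		}(&instances[i])
	}

	minSuccess := len(instances) - maxErrors
	results := make([]interface{}, 0, len(instances))
	failures := 0
	for len(results) < minSuccess {
		select {
		case o := <-ch:
			if o.err != nil {
				failures++
				if failures > maxErrors {
					return nil, o.err
				}
				continue
			}
			results = append(results, o.val)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return results, nil
}

// runDemo runs doAll against three fake instances; those listed in failing
// return an error. It reports the number of results and the error.
func runDemo(maxErrors int, failing ...string) (int, error) {
	bad := map[string]bool{}
	for _, a := range failing {
		bad[a] = true
	}
	instances := []instance{{Addr: "a"}, {Addr: "b"}, {Addr: "c"}}
	res, err := doAll(context.Background(), instances, maxErrors,
		func(_ context.Context, in *instance) (interface{}, error) {
			if bad[in.Addr] {
				return nil, errors.New("unavailable: " + in.Addr)
			}
			return in.Addr, nil
		})
	return len(res), err
}

func main() {
	fmt.Println(runDemo(1, "b"))      // one failure is within the budget
	fmt.Println(runDemo(1, "a", "b")) // two failures exceed the budget
}
```

Returning early means the slowest replica does not determine latency once the success threshold has been met.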

func (ReplicationSet) GetAddresses

func (r ReplicationSet) GetAddresses() []string

GetAddresses returns the addresses of all instances within the replication set. Returned slice order is not guaranteed.

func (ReplicationSet) GetAddressesWithout

func (r ReplicationSet) GetAddressesWithout(exclude string) []string

GetAddressesWithout returns the addresses of all instances within the replication set while excluding the specified address. Returned slice order is not guaranteed.

func (ReplicationSet) GetIDs

func (r ReplicationSet) GetIDs() []string

GetIDs returns the IDs of all instances within the replication set. Returned slice order is not guaranteed.

func (ReplicationSet) Includes

func (r ReplicationSet) Includes(addr string) bool

Includes returns whether the replication set includes the replica with the provided addr.

func (*ReplicationSet) ZoneCount

func (r *ReplicationSet) ZoneCount() int

ZoneCount returns the number of unique zones represented by instances within this replication set.

type ReplicationStrategy

type ReplicationStrategy interface {
	// Filter filters out unhealthy instances and checks whether there are enough instances
	// for an operation to succeed. It returns an error if there are not enough
	// instances.
	Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)
}
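To make the relationship between healthy instances and maxFailures concrete, here is a sketch of a majority-quorum rule. That any particular strategy in this package uses exactly this arithmetic is an assumption; the quorum function is hypothetical.

```go
package main

import "fmt"

// quorum applies a majority rule: at least replicationFactor/2 + 1 healthy
// instances are required, and any healthy instances beyond that minimum
// may fail without breaking the operation.
func quorum(replicationFactor, healthy int) (maxFailures int, err error) {
	minSuccess := replicationFactor/2 + 1
	if healthy < minSuccess {
		return 0, fmt.Errorf("at least %d healthy replicas required, found %d", minSuccess, healthy)
	}
	return healthy - minSuccess, nil
}

func main() {
	fmt.Println(quorum(3, 3)) // 1 <nil>
	fmt.Println(quorum(3, 2)) // 0 <nil>
	fmt.Println(quorum(3, 1)) // fails: below the required minimum
}
```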

func NewDefaultReplicationStrategy

func NewDefaultReplicationStrategy() ReplicationStrategy

func NewIgnoreUnhealthyInstancesReplicationStrategy

func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy

type Ring

type Ring struct {
	services.Service

	KVClient kv.Client
	// contains filtered or unexported fields
}

Ring is a Service that maintains an in-memory copy of a ring and watches for changes.

func New

func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registerer) (*Ring, error)

New creates a new Ring. Being a service, Ring needs to be started to do anything.

func NewWithStoreClientAndStrategy

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error)

func (*Ring) CleanupShuffleShardCache

func (r *Ring) CleanupShuffleShardCache(identifier string)

func (*Ring) Get

func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

Get returns n (or more) instances which form the replicas for the given key.

func (*Ring) GetAllHealthy

func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error)

GetAllHealthy implements ReadRing.

func (*Ring) GetInstance

func (r *Ring) GetInstance(instanceID string) (doNotModify InstanceDesc, _ error)

GetInstance returns the InstanceDesc for the given instanceID or an error if the instance doesn't exist in the ring. The returned InstanceDesc is NOT a deep copy, so the caller should never modify it.

func (*Ring) GetInstanceState

func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error)

GetInstanceState returns the current state of an instance or an error if the instance does not exist in the ring.

func (*Ring) GetReplicationSetForOperation

func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

GetReplicationSetForOperation implements ReadRing.

func (*Ring) GetTokenRangesForInstance

func (r *Ring) GetTokenRangesForInstance(instanceID string) (TokenRanges, error)

GetTokenRangesForInstance returns the token ranges owned by an instance in the ring.

The current implementation only works with a multi-zone setup in which the number of zones is equal to the replication factor.

func (*Ring) HasInstance

func (r *Ring) HasInstance(instanceID string) bool

HasInstance returns whether the ring contains an instance matching the provided instanceID.

func (*Ring) InstancesCount

func (r *Ring) InstancesCount() int

InstancesCount returns the number of instances in the ring.

func (*Ring) InstancesInZoneCount

func (r *Ring) InstancesInZoneCount(zone string) int

InstancesInZoneCount returns the number of instances in the ring that are registered in the given zone.

func (*Ring) InstancesWithTokensCount

func (r *Ring) InstancesWithTokensCount() int

InstancesWithTokensCount returns the number of instances in the ring that have tokens.

func (*Ring) InstancesWithTokensInZoneCount

func (r *Ring) InstancesWithTokensInZoneCount(zone string) int

InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in the given zone and have tokens.

func (*Ring) IsHealthy

func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool

func (*Ring) ReplicationFactor

func (r *Ring) ReplicationFactor() int

ReplicationFactor of the ring.

func (*Ring) ServeHTTP

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Ring) ShuffleShard

func (r *Ring) ShuffleShard(identifier string, size int) ReadRing

ShuffleShard returns a subring for the provided identifier (e.g. a tenant ID) and size (number of instances). The size is expected to be a multiple of the number of zones, and the returned subring will contain the same number of instances per zone as long as there are enough registered instances in the ring.

The algorithm used to build the subring is a shuffle sharder based on probabilistic hashing. We treat each zone as a separate ring and pick N unique replicas from each zone, walking the ring starting from random but predictable numbers. The random generator is initialised with a seed based on the provided identifier.

This implementation guarantees:

- Stability: given the same ring, two invocations return the same result.

- Consistency: adding/removing 1 instance from the ring generates a resulting subring with no more than 1 difference.

- Shuffling: probabilistically, for a large enough cluster each identifier gets a different set of instances, with a reduced number of overlapping instances between two identifiers.

func (*Ring) ShuffleShardWithLookback

func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances that have been part of the identifier's shard since "now - lookbackPeriod".

The returned subring may be unbalanced with regard to zones and should never be used for write operations (read only).

This function supports caching, but the cache will only be effective if successive calls for the same identifier are with the same lookbackPeriod and increasing values of now.

func (*Ring) ZonesCount

func (r *Ring) ZonesCount() int

type SpreadMinimizingTokenGenerator

type SpreadMinimizingTokenGenerator struct {
	// contains filtered or unexported fields
}

func NewSpreadMinimizingTokenGenerator

func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, canJoinEnabled bool) (*SpreadMinimizingTokenGenerator, error)

func NewSpreadMinimizingTokenGeneratorForInstanceAndZoneID

func NewSpreadMinimizingTokenGeneratorForInstanceAndZoneID(instancePrefix string, instanceID, zoneID int, canJoinEnabled bool) *SpreadMinimizingTokenGenerator

func (*SpreadMinimizingTokenGenerator) CanJoin

func (t *SpreadMinimizingTokenGenerator) CanJoin(instances map[string]InstanceDesc) error

func (*SpreadMinimizingTokenGenerator) CanJoinEnabled

func (t *SpreadMinimizingTokenGenerator) CanJoinEnabled() bool

func (*SpreadMinimizingTokenGenerator) GenerateTokens

func (t *SpreadMinimizingTokenGenerator) GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens

GenerateTokens returns at most requestedTokensCount unique tokens, none of which clashes with the given allTakenTokens, representing the set of all tokens currently present in the ring. Returned tokens are sorted. The optimal number of tokens reserved for the underlying instance (optimalTokensPerInstance, i.e. 512) is generated by generateAllTokens. GenerateTokens selects the first requestedTokensCount tokens from that reserved set that are not already present in allTakenTokens. The number of returned tokens might be lower than the requested number of tokens in the following cases:

  • if requestedTokensCount is higher than 512 (optimalTokensPerInstance), or
  • if fewer than requestedTokensCount of the 512 (optimalTokensPerInstance) reserved tokens are not already present in allTakenTokens.
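The selection step, and the two ways it can come up short, can be sketched as follows. How the reserved set is computed is out of scope here; the selectTokens function and its inputs are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// selectTokens returns the first requested tokens from reserved that are not
// in taken, sorted in ascending order. It may return fewer than requested.
func selectTokens(reserved []uint32, requested int, taken []uint32) []uint32 {
	used := make(map[uint32]struct{}, len(taken))
	for _, t := range taken {
		used[t] = struct{}{}
	}

	var out []uint32
	for _, t := range reserved {
		if len(out) >= requested {
			break
		}
		if _, clash := used[t]; clash {
			continue // already present in the ring, skip it
		}
		out = append(out, t)
	}

	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	reserved := []uint32{10, 20, 30, 40}
	taken := []uint32{20}

	fmt.Println(selectTokens(reserved, 2, taken))  // [10 30]
	fmt.Println(selectTokens(reserved, 10, taken)) // [10 30 40], fewer than requested
}
```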

type Token

type Token uint32

type TokenGenerator

type TokenGenerator interface {
	// GenerateTokens generates at most requestedTokensCount unique tokens, none of which clashes with
	// the given allTakenTokens, representing the set of all tokens currently present in the ring.
	// Generated tokens are sorted.
	GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens

	// CanJoin checks whether the instance owning this TokenGenerator can join the set of the given instances,
	// and fails if it is not possible.
	CanJoin(instances map[string]InstanceDesc) error

	// CanJoinEnabled returns true if the instance owning this TokenGenerator should perform the CanJoin check before
	// it tries to join the ring.
	CanJoinEnabled() bool
}

type TokenRanges

type TokenRanges []uint32

TokenRanges describes token ranges owned by an instance. It consists of [start, end] pairs, where both start and end are inclusive. For example TokenRanges with values [5, 10, 20, 30] covers tokens [5..10] and [20..30].
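A membership check over this flat pair layout can be sketched as below. The linear scan is chosen for clarity and is not necessarily how IncludesKey is implemented; the lower-case tokenRanges type is a local stand-in.

```go
package main

import "fmt"

// tokenRanges holds [start, end] pairs flattened into one slice,
// with both bounds inclusive.
type tokenRanges []uint32

// includesKey reports whether key falls inside any of the ranges.
func (tr tokenRanges) includesKey(key uint32) bool {
	for i := 0; i+1 < len(tr); i += 2 {
		if key >= tr[i] && key <= tr[i+1] {
			return true
		}
	}
	return false
}

func main() {
	tr := tokenRanges{5, 10, 20, 30}
	fmt.Println(tr.includesKey(10)) // true: end bounds are inclusive
	fmt.Println(tr.includesKey(15)) // false: falls between the two ranges
	fmt.Println(tr.includesKey(20)) // true: start bounds are inclusive
}
```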

func (TokenRanges) Equal

func (tr TokenRanges) Equal(other TokenRanges) bool

func (TokenRanges) IncludesKey

func (tr TokenRanges) IncludesKey(key uint32) bool

type Tokens

type Tokens []uint32

Tokens is a simple list of tokens.

func LoadTokensFromFile

func LoadTokensFromFile(tokenFilePath string) (Tokens, error)

LoadTokensFromFile loads tokens from the given file path.

func (Tokens) Equals

func (t Tokens) Equals(other Tokens) bool

Equals returns whether the tokens are equal to the input ones.

func (Tokens) Len

func (t Tokens) Len() int

func (Tokens) Less

func (t Tokens) Less(i, j int) bool

func (Tokens) Marshal

func (t Tokens) Marshal() ([]byte, error)

Marshal encodes the tokens into JSON.

func (Tokens) StoreToFile

func (t Tokens) StoreToFile(tokenFilePath string) error

StoreToFile stores the tokens at the given file path.

func (Tokens) Swap

func (t Tokens) Swap(i, j int)
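The Len, Less and Swap methods together match the method set of sort.Interface, so a token list can be handed to sort.Sort directly. The sketch below demonstrates this with a local tokens type; the ascending comparison in Less and the sorted helper are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// tokens is a local stand-in with the same three methods.
type tokens []uint32

func (t tokens) Len() int           { return len(t) }
func (t tokens) Less(i, j int) bool { return t[i] < t[j] } // ascending order (assumed)
func (t tokens) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }

// sorted returns a sorted copy, leaving the input untouched.
func sorted(in tokens) tokens {
	out := make(tokens, len(in))
	copy(out, in)
	sort.Sort(out)
	return out
}

func main() {
	ts := tokens{30, 10, 20}
	fmt.Println(sorted(ts)) // [10 20 30]
	fmt.Println(ts)         // [30 10 20]
}
```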

func (*Tokens) Unmarshal

func (t *Tokens) Unmarshal(b []byte) error

Unmarshal reads the tokens from a JSON byte stream.

type TokensPersistencyDelegate

type TokensPersistencyDelegate struct {
	// contains filtered or unexported fields
}

func NewTokensPersistencyDelegate

func NewTokensPersistencyDelegate(path string, state InstanceState, next BasicLifecyclerDelegate, logger log.Logger) *TokensPersistencyDelegate

func (*TokensPersistencyDelegate) OnRingInstanceHeartbeat

func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc)

func (*TokensPersistencyDelegate) OnRingInstanceRegister

func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens)

func (*TokensPersistencyDelegate) OnRingInstanceStopping

func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)

func (*TokensPersistencyDelegate) OnRingInstanceTokens

func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)

type ZoneSorter

type ZoneSorter func(zones []string) []string

Directories

Path Synopsis
example
local Module
