Documentation ¶
Overview ¶
Package cluster implements rendezvous hashing (a.k.a. highest random weight hashing). See http://en.wikipedia.org/wiki/Rendezvous_hashing for more information. This is based on https://github.com/tysonmote/rendezvous.
Index ¶
- Constants
- Variables
- func GetOutboundIP() (net.IP, error)
- func IsInitReady(pod *v1.Pod) bool
- func IsPodReady(pod *v1.Pod) bool
- type Batch
- type Batcher
- type BatcherOpts
- type Client
- type ClientOpts
- type Coordinator
- type CoordinatorOpts
- type ErrBadRequest
- type FakeReplicator
- type Hash
- type Health
- func (h *Health) Close() error
- func (h *Health) IsHealthy() bool
- func (h *Health) IsPeerHealthy(peer string) bool
- func (h *Health) Open(ctx context.Context) error
- func (h *Health) SegmentsSize() int64
- func (h *Health) SegmentsTotal() int64
- func (h *Health) SetPeerHealthy(peer string)
- func (h *Health) SetPeerUnhealthy(peer string)
- func (h *Health) TransferQueueSize() int
- func (h *Health) UnhealthyReason() string
- func (h *Health) UploadQueueSize() int
- type HealthOpts
- type HealthStatus
- type MetricPartitioner
- type OTLPLogsWriter
- type Partitioner
- type PeerHealthReporter
- type QueueSizer
- type Replicator
- type ReplicatorOpts
- type SegmentRemover
- type Segmenter
- type TimeSeriesWriter
Constants ¶
const ( ReasonLargeUploadQueue = "LargeUploadQueue" ReasonLargeTransferQueue = "LargeTransferQueue" ReasonMaxSegmentsExceeded = "MaxSegmentsExceeded" ReasonMaxDiskUsageExceeded = "MaxDiskUsageExceeded" )
Variables ¶
var ( ErrPeerOverloaded = fmt.Errorf("peer overloaded") ErrSegmentExists = fmt.Errorf("segment already exists") )
Functions ¶
func IsInitReady ¶
IsInitReady returns true if all init containers are in a ready state.
func IsPodReady ¶
IsPodReady returns true if all containers in a pod are in a ready state
Types ¶
type Batch ¶
type Batch struct { Segments []wal.SegmentInfo Database string Table string Prefix string // contains filtered or unexported fields }
func (*Batch) IsReleased ¶
type Batcher ¶
type Batcher interface { service.Component BatchSegments() error UploadQueueSize() int TransferQueueSize() int SegmentsTotal() int64 SegmentsSize() int64 Release(batch *Batch) Remove(batch *Batch) error }
func NewBatcher ¶
func NewBatcher(opts BatcherOpts) Batcher
type BatcherOpts ¶
type BatcherOpts struct { StorageDir string MinUploadSize int64 MaxSegmentAge time.Duration MaxTransferSize int64 MaxTransferAge time.Duration Partitioner MetricPartitioner Segmenter Segmenter UploadQueue chan *Batch TransferQueue chan *Batch PeerHealthReporter PeerHealthReporter TransfersDisabled bool }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(opts ClientOpts) (*Client, error)
type ClientOpts ¶
type ClientOpts struct { // Close controls whether the client closes the connection after each request. Close bool // Timeout is the timeout for the http client and the http request. Timeout time.Duration // InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name. InsecureSkipVerify bool // IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection // will remain idle before closing itself. IdleConnTimeout time.Duration // ResponseHeaderTimeout is the amount of time to wait for a server's response headers // after fully writing the request (including its body, if any). ResponseHeaderTimeout time.Duration // MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts. MaxIdleConns int // MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) connections per host. MaxIdleConnsPerHost int // MaxConnsPerHost, if non-zero, controls the maximum connections per host. MaxConnsPerHost int // TLSHandshakeTimeout specifies the maximum amount of time to // wait for a TLS handshake. Zero means no timeout. TLSHandshakeTimeout time.Duration // DisableHTTP2 controls whether the client disables HTTP/2 support. DisableHTTP2 bool // DisableKeepAlives controls whether the client disables HTTP keep-alives. DisableKeepAlives bool }
func (ClientOpts) WithDefaults ¶
func (c ClientOpts) WithDefaults() ClientOpts
type Coordinator ¶
type Coordinator interface { MetricPartitioner service.Component // IsLeader returns true if the current node is the leader. IsLeader() bool // Write writes the time series to the correct peer. Write(ctx context.Context, wr *prompb.WriteRequest) error // WriteOTLPLogs writes the logs to the correct peer. WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error }
func NewCoordinator ¶
func NewCoordinator(opts *CoordinatorOpts) (Coordinator, error)
type CoordinatorOpts ¶
type CoordinatorOpts struct { WriteTimeSeriesFn TimeSeriesWriter WriteOTLPLogsFn OTLPLogsWriter K8sCli kubernetes.Interface // Namespace is the namespace used to discover peers. If not specified, the coordinator will // try to use the namespace of the current pod. Namespace string // Hostname is the hostname of the current node. This should be the statefulset hostname format // in order to discover peers correctly Hostname string // InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name. InsecureSkipVerify bool // PartitionSize is the max size of the group of nodes forming a partition. A partition is a set of nodes where // keys are distributed. // NOTE: This is not used in the current implementation. PartitionSize int }
type ErrBadRequest ¶
type ErrBadRequest struct {
Msg string
}
func (ErrBadRequest) Error ¶
func (e ErrBadRequest) Error() string
func (ErrBadRequest) Is ¶
func (e ErrBadRequest) Is(target error) bool
type FakeReplicator ¶
type FakeReplicator struct {
// contains filtered or unexported fields
}
func NewFakeReplicator ¶
func NewFakeReplicator() *FakeReplicator
func (*FakeReplicator) Close ¶
func (f *FakeReplicator) Close() error
func (*FakeReplicator) TransferQueue ¶
func (f *FakeReplicator) TransferQueue() chan *Batch
type Hash ¶
type Hash struct {
// contains filtered or unexported fields
}
func NewRendezvous ¶
NewRendezvous returns a new Hash ready for use with the given nodes.
type Health ¶
type Health struct { QueueSizer QueueSizer // contains filtered or unexported fields }
Health tracks the health of peers in the cluster. If a peer is overloaded, it will be marked as unhealthy, which will cause the service to stop sending writes to that peer for the timeout period. Similarly, the health of the current peer is tracked here, and if it is unhealthy, the service will stop accepting writes.
func NewHealth ¶
func NewHealth(opts HealthOpts) *Health
func (*Health) IsPeerHealthy ¶
func (*Health) SegmentsSize ¶
func (*Health) SegmentsTotal ¶
func (*Health) SetPeerHealthy ¶
func (*Health) SetPeerUnhealthy ¶
func (*Health) TransferQueueSize ¶
func (*Health) UnhealthyReason ¶
func (*Health) UploadQueueSize ¶
type HealthOpts ¶
type HealthOpts struct { // UnhealthyTimeout is the amount of time to wait before marking a peer as healthy. UnhealthyTimeout time.Duration QueueSizer QueueSizer MaxSegmentCount int64 MaxDiskUsage int64 }
type HealthStatus ¶
type MetricPartitioner ¶
type OTLPLogsWriter ¶
type Partitioner ¶
type Partitioner struct {
// contains filtered or unexported fields
}
Partitioner manages the distribution of metrics across nodes. It uses rendezvous hashing to distribute metrics roughly evenly. When nodes are added or removed, the distribution of metrics will change, but only by a proportional amount for each node. For example, if four nodes exist and a fifth is added, only 20% of the metrics will be reassigned to the new node.
func NewPartition ¶
func NewPartition(nodes map[string]string) (*Partitioner, error)
type PeerHealthReporter ¶
type QueueSizer ¶
type Replicator ¶
type Replicator interface { service.Component // TransferQueue returns a channel that can be used to transfer files to other nodes. TransferQueue() chan *Batch }
Replicator manages the transfer of local segments to other nodes.
func NewReplicator ¶
func NewReplicator(opts ReplicatorOpts) (Replicator, error)
type ReplicatorOpts ¶
type ReplicatorOpts struct { // Partitioner is used to determine which node owns a given metric. Partitioner MetricPartitioner // Health is used to report the health of the peer replication. Health PeerHealthReporter // SegmentRemover is used to remove segments after they have been replicated. SegmentRemover SegmentRemover // InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name. InsecureSkipVerify bool // Hostname is the name of the current node. Hostname string }
type SegmentRemover ¶
type Segmenter ¶
type Segmenter interface { Get(prefix string) []wal.SegmentInfo PrefixesByAge() []string Remove(si wal.SegmentInfo) }
type TimeSeriesWriter ¶
type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error