cluster

package
v0.0.0-...-27b3d77 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

Package cluster implements rendezvous hashing (a.k.a. highest random weight hashing). See http://en.wikipedia.org/wiki/Rendezvous_hashing for more information. This is based off of https://github.com/tysonmote/rendezvous

Index

Constants

View Source
const (
	ReasonLargeUploadQueue     = "LargeUploadQueue"
	ReasonLargeTransferQueue   = "LargeTransferQueue"
	ReasonMaxSegmentsExceeded  = "MaxSegmentsExceeded"
	ReasonMaxDiskUsageExceeded = "MaxDiskUsageExceeded"
)

Variables

View Source
var (
	ErrPeerOverloaded = fmt.Errorf("peer overloaded")
	ErrSegmentExists  = fmt.Errorf("segment already exists")
)

Functions

func GetOutboundIP

func GetOutboundIP() (net.IP, error)

Get preferred outbound ip of this machine

func IsInitReady

func IsInitReady(pod *v1.Pod) bool

IsInitReady returns true if all init containers are in a ready state

func IsPodReady

func IsPodReady(pod *v1.Pod) bool

IsPodReady returns true if all containers in a pod are in a ready state

Types

type Batch

type Batch struct {
	Segments []wal.SegmentInfo
	Database string
	Table    string
	Prefix   string
	// contains filtered or unexported fields
}

func (*Batch) IsReleased

func (b *Batch) IsReleased() bool

func (*Batch) IsRemoved

func (b *Batch) IsRemoved() bool

func (*Batch) Paths

func (b *Batch) Paths() []string

func (*Batch) Release

func (b *Batch) Release()

Release releases the segments in the batch so they can be processed again.

func (*Batch) Remove

func (b *Batch) Remove() error

Remove removes the segments in the batch from disk.

type Batcher

type Batcher interface {
	service.Component
	BatchSegments() error
	UploadQueueSize() int
	TransferQueueSize() int
	SegmentsTotal() int64
	SegmentsSize() int64

	Release(batch *Batch)
	Remove(batch *Batch) error
}

func NewBatcher

func NewBatcher(opts BatcherOpts) Batcher

type BatcherOpts

type BatcherOpts struct {
	StorageDir      string
	MinUploadSize   int64
	MaxSegmentAge   time.Duration
	MaxTransferSize int64
	MaxTransferAge  time.Duration

	Partitioner MetricPartitioner
	Segmenter   Segmenter

	UploadQueue        chan *Batch
	TransferQueue      chan *Batch
	PeerHealthReporter PeerHealthReporter

	TransfersDisabled bool
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ClientOpts) (*Client, error)

func (*Client) Write

func (c *Client) Write(ctx context.Context, endpoint string, filename string, body io.Reader) error

Write writes the given paths to the given endpoint. If multiple paths are given, they are merged into the first file at the destination. This ensures we transfer the full batch atomimcally.

type ClientOpts

type ClientOpts struct {
	// Close controls whether the client closes the connection after each request.
	Close bool

	// Timeout is the timeout for the http client and the http request.
	Timeout time.Duration

	// InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection
	// will remain idle before closing itself.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is the amount of time to wait for a server's response headers
	// after fully writing the request (including its body, if any).
	ResponseHeaderTimeout time.Duration

	// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
	MaxIdleConns int

	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) per host.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost, if non-zero, controls the maximum connections per host.
	MaxConnsPerHost int

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableHTTP2 controls whether the client disables HTTP/2 support.
	DisableHTTP2 bool

	// DisableKeepAlives controls whether the client disables HTTP keep-alives.
	DisableKeepAlives bool
}

func (ClientOpts) WithDefaults

func (c ClientOpts) WithDefaults() ClientOpts

type Coordinator

type Coordinator interface {
	MetricPartitioner
	service.Component

	// IsLeader returns true if the current node is the leader.
	IsLeader() bool

	// Write writes the time series to the correct peer.
	Write(ctx context.Context, wr *prompb.WriteRequest) error

	// WriteOTLPLogs writes the logs to the correct peer.
	WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error
}

func NewCoordinator

func NewCoordinator(opts *CoordinatorOpts) (Coordinator, error)

type CoordinatorOpts

type CoordinatorOpts struct {
	WriteTimeSeriesFn TimeSeriesWriter
	WriteOTLPLogsFn   OTLPLogsWriter
	K8sCli            kubernetes.Interface

	// Namespace is the namespace used to discover peers.  If not specified, the coordinator will
	// try to use the namespace of the current pod.
	Namespace string

	// Hostname is the hostname of the current node.  This should be the statefulset hostname format
	// in order to discover peers correctly
	Hostname string

	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// PartitionSize is the max size of the group of nodes forming a partition.  A partition is a set of nodes where
	// keys are distributed.
	// NOTE: This is not used in the current implementation.
	PartitionSize int
}

type ErrBadRequest

type ErrBadRequest struct {
	Msg string
}

func (ErrBadRequest) Error

func (e ErrBadRequest) Error() string

func (ErrBadRequest) Is

func (e ErrBadRequest) Is(target error) bool

type FakeReplicator

type FakeReplicator struct {
	// contains filtered or unexported fields
}

func NewFakeReplicator

func NewFakeReplicator() *FakeReplicator

func (*FakeReplicator) Close

func (f *FakeReplicator) Close() error

func (*FakeReplicator) Open

func (f *FakeReplicator) Open(ctx context.Context) error

func (*FakeReplicator) TransferQueue

func (f *FakeReplicator) TransferQueue() chan *Batch

type Hash

type Hash struct {
	// contains filtered or unexported fields
}

func NewRendezvous

func NewRendezvous(nodes ...string) *Hash

New returns a new Hash ready for use with the given nodes.

func (*Hash) Add

func (h *Hash) Add(nodes ...string)

Add adds additional nodes to the Hash.

func (*Hash) Get

func (h *Hash) Get(key string) string

Get returns the node with the highest score for the given key. If this Hash has no nodes, an empty string is returned.

func (*Hash) GetN

func (h *Hash) GetN(n int, key string) []string

GetN returns no more than n nodes for the given key, ordered by descending score. GetN is not goroutine-safe.

type Health

type Health struct {
	QueueSizer QueueSizer
	// contains filtered or unexported fields
}

Health tracks the health of peers in the cluster. If a peer is overloaded, it will be marked as unhealthy which will cause the service to stop sending writes to that peer for timeout period. Similarly, the of the current peer is tracked here and if it is unhealthy, the service will stop accepting writes.

func NewHealth

func NewHealth(opts HealthOpts) *Health

func (*Health) Close

func (h *Health) Close() error

func (*Health) IsHealthy

func (h *Health) IsHealthy() bool

func (*Health) IsPeerHealthy

func (h *Health) IsPeerHealthy(peer string) bool

func (*Health) Open

func (h *Health) Open(ctx context.Context) error

func (*Health) SegmentsSize

func (h *Health) SegmentsSize() int64

func (*Health) SegmentsTotal

func (h *Health) SegmentsTotal() int64

func (*Health) SetPeerHealthy

func (h *Health) SetPeerHealthy(peer string)

func (*Health) SetPeerUnhealthy

func (h *Health) SetPeerUnhealthy(peer string)

func (*Health) TransferQueueSize

func (h *Health) TransferQueueSize() int

func (*Health) UnhealthyReason

func (h *Health) UnhealthyReason() string

func (*Health) UploadQueueSize

func (h *Health) UploadQueueSize() int

type HealthOpts

type HealthOpts struct {
	// UnhealthyTimeout is the amount of time to wait before marking a peer as healthy.
	UnhealthyTimeout time.Duration

	QueueSizer      QueueSizer
	MaxSegmentCount int64
	MaxDiskUsage    int64
}

type HealthStatus

type HealthStatus struct {
	Healthy   bool
	NextCheck time.Time
}

type MetricPartitioner

type MetricPartitioner interface {
	Owner([]byte) (string, string)
}

type OTLPLogsWriter

type OTLPLogsWriter func(ctx context.Context, database, table string, logs *otlp.Logs) error

type Partitioner

type Partitioner struct {
	// contains filtered or unexported fields
}

Partitioner manages the distribution of metrics across nodes. It uses rendezvous hashing to distribute metrics roughly evenly. When nodes are added or removed, the distribution of metrics will change, but only by a proportional amount for each node. For example, if four nodes exists and a fifth is added, only 20% of the metrics will be reassigned to the new node.

func NewPartition

func NewPartition(nodes map[string]string) (*Partitioner, error)

func (*Partitioner) Owner

func (p *Partitioner) Owner(b []byte) (string, string)

Owner returns the hostname and address of the node that owns the given key and the address of that node.

type PeerHealthReporter

type PeerHealthReporter interface {
	IsPeerHealthy(peer string) bool
	SetPeerUnhealthy(peer string)
	SetPeerHealthy(peer string)
}

type QueueSizer

type QueueSizer interface {
	TransferQueueSize() int
	UploadQueueSize() int
	SegmentsTotal() int64
	SegmentsSize() int64
}

type Replicator

type Replicator interface {
	service.Component
	// TransferQueue returns a channel that can be used to transfer files to other nodes.
	TransferQueue() chan *Batch
}

Replicator manages the transfer of local segments to other nodes.

func NewReplicator

func NewReplicator(opts ReplicatorOpts) (Replicator, error)

type ReplicatorOpts

type ReplicatorOpts struct {
	// Partitioner is used to determine which node owns a given metric.
	Partitioner MetricPartitioner

	// Health is used to report the health of the peer replication.
	Health PeerHealthReporter

	// SegmentRemover is used to remove segments after they have been replicated.
	SegmentRemover SegmentRemover

	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// Hostname is the name of the current node.
	Hostname string
}

type SegmentRemover

type SegmentRemover interface {
	Remove(path string) error
}

type Segmenter

type Segmenter interface {
	Get(prefix string) []wal.SegmentInfo
	PrefixesByAge() []string
	Remove(si wal.SegmentInfo)
}

type TimeSeriesWriter

type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL