cluster

package
v0.40.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package cluster implements the cluster service for Flow, where multiple instances of Flow connect to each other for work distribution.

Index

Constants

View Source
const ServiceName = "cluster"

ServiceName defines the name used for the cluster service.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster

type Cluster interface {
	// Lookup determines the set of replicationFactor owners for a given key.
	// peer.Peer.Self can be used to determine if the local node is the owner,
	// allowing for short-circuiting logic to connect directly to the local node
	// instead of using the network.
	//
	// Callers can use github.com/grafana/ckit/shard.StringKey or
	// shard.NewKeyBuilder to create a key.
	Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error)

	// Peers returns the current set of peers for a Node.
	Peers() []peer.Peer
}

Cluster is a read-only view of a cluster.

func Mock

func Mock() Cluster

Mock returns a mock implementation of the Cluster interface.

type Component

type Component interface {
	component.Component

	// NotifyClusterChange notifies the component that the state of the cluster
	// has changed.
	//
	// Implementations should ignore calls to this method if they are configured
	// to not utilize clustering.
	NotifyClusterChange()
}

Component is a Flow component which subscribes to clustering updates.

type ComponentBlock added in v0.38.0

type ComponentBlock struct {
	Enabled bool `river:"enabled,attr"`
}

ComponentBlock holds common arguments for clustering settings within a component. ComponentBlock is intended to be exposed as a block called "clustering".

type Options

type Options struct {
	Log     log.Logger            // Where to send logs to.
	Metrics prometheus.Registerer // Where to send metrics to.
	Tracer  trace.TracerProvider  // Where to send traces.

	// EnableClustering toggles clustering as a whole. When EnableClustering is
	// false, the instance of Flow acts as a single-node cluster and it is not
	// possible for other nodes to join the cluster.
	EnableClustering bool

	NodeName            string        // Name to use for this node in the cluster.
	AdvertiseAddress    string        // Address to advertise to other nodes in the cluster.
	RejoinInterval      time.Duration // How frequently to rejoin the cluster to address split brain issues.
	ClusterMaxJoinPeers int           // Number of initial peers to join from the discovered set.
	ClusterName         string        // Name to prevent nodes without this identifier from joining the cluster.

	// Function to discover peers to join. If this function is nil or returns an
	// empty slice, no peers will be joined.
	DiscoverPeers func() ([]string, error)
}

Options are used to configure the cluster service. Options are constant for the lifetime of the cluster service.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the cluster service.

func New

func New(opts Options) (*Service, error)

New returns a new, unstarted instance of the cluster service.

func (*Service) ChangeState

func (s *Service) ChangeState(ctx context.Context, targetState peer.State) error

ChangeState changes the state of the service. If clustering is enabled, ChangeState will block until the state change has been propagated to another node; cancel the current context to stop waiting. ChangeState fails if the current state cannot move to the provided targetState.

Note that the state must be StateParticipant to receive writes.

func (*Service) Data

func (s *Service) Data() any

Data returns an instance of Cluster.

func (*Service) Definition

func (s *Service) Definition() service.Definition

Definition returns the definition of the cluster service.

func (*Service) Run

func (s *Service) Run(ctx context.Context, host service.Host) error

Run starts the cluster service. It will run until the provided context is canceled or there is a fatal error.

func (*Service) ServiceHandler

func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler)

ServiceHandler returns the service handler for the clustering service. The resulting handler always returns 404 when clustering is disabled.

func (*Service) Update

func (s *Service) Update(newConfig any) error

Update implements service.Service. It returns an error since the cluster service does not support runtime configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL