replication

package
v0.0.0-...-b0c7fd6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnavailable   = errors.New("not enough replicas alive")
	ErrNotEnoughAcks = errors.New("consistency level not satisfied")
)

Functions

This section is empty.

Types

type OpDel

type OpDel struct {
	Logger  kitlog.Logger
	Timeout time.Duration
	Level   consistency.Level
	Cluster membership.Cluster
}

func (*OpDel) Do

func (s *OpDel) Do(ctx context.Context, key, version string) (*OpDelResult, error)

type OpDelResult

type OpDelResult struct {
	Version      string
	Acknowledged int
}

type OpGet

type OpGet struct {
	Logger  kitlog.Logger
	Timeout time.Duration
	Level   consistency.Level
	Cluster membership.Cluster
}

func (*OpGet) Do

func (op *OpGet) Do(ctx context.Context, key string) (*OpGetResult, error)

type OpGetResult

type OpGetResult struct {
	Version string
	Values  [][]byte
}

type OpPut

type OpPut struct {
	Logger  kitlog.Logger
	Timeout time.Duration
	Level   consistency.Level
	Cluster membership.Cluster
}

func (*OpPut) Do

func (op *OpPut) Do(ctx context.Context, key string, value []byte, version string) (*OpPutResult, error)

type OpPutResult

type OpPutResult struct {
	Version      string
	Acknowledged int
}

type Replicate

type Replicate[T any] struct {
	// Cluster is the cluster to use for node discovery and connection management.
	Cluster membership.Cluster
	// Logger is used to log errors and debug information.
	Logger kitlog.Logger
	// Nodes is the set of nodes to send the request to. The nodes are expected
	// to be alive, but the operation will not fail if some of them are not.
	Nodes []membership.Node
	// AckedNodes is a set of node ids that have acknowledged the request, after the
	// operation is complete. You can put the primary node here in advance to skip it
	// during the map phase.
	AckedNodes map[membership.NodeID]struct{}
	// Timeout is the maximum amount of time for each node to respond.
	Timeout time.Duration
	// MinAcks is the minimum number of nodes that must acknowledge the request to
	// consider it successful. It not, the operation will return the ErrNotEnoughAcks error.
	// If MinAcks is 0, the operation will return immediately after the map phase.
	MinAcks int
	// Background indicates whether the operation should continue to run in
	// background for the remaining nodes after the minimum number of acknowledgments
	// has been received.
	Background bool
}

Replicate is used to send a request to a set of nodes and ensure that enough nodes have acknowledged the request. The coordinator will send the request to all nodes in the replica set in parallel and then wait for the minimum number of acknowledgments to be received. If the minimum number of acknowledgments is not received within the timeout, the operation will fail with the ErrNotEnoughAcks error.

func (Replicate[T]) Do

func (o Replicate[T]) Do(ctx context.Context, sendRequest RequestMapper[T], handleResponse ResponseHandler[T]) error

type RequestMapper

type RequestMapper[T any] func(context.Context, membership.NodeID, *nodeapi.Client) (T, error)

RequestMapper is called for each node in the replica set. The function should send a request to the node and then either return the result or an error. If the error is nil, the node is considered to have acknowledged the request.

type ResponseHandler

type ResponseHandler[T any] func(abort func(), nodeID membership.NodeID, ret T, err error) error

ResponseHandler is called for each reply. If the function returns an error, the whole operation is aborted and the error is propagated to the caller. The abort function can be called to manually abort the operation based on nodes' replies.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL