raftlease

package
v0.0.0-...-b0bff92

This package is not in the latest version of its module.

Published: Dec 7, 2021 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Constants

const (
	// CommandVersion is the current version of the command format. If
	// this changes then we need to be sure that reading and applying
	// commands for previous versions still works.
	CommandVersion = 1

	// SnapshotVersion is the current version of the snapshot
	// format. Similarly, changes to the snapshot representation need
	// to be backward-compatible.
	SnapshotVersion = 1

	// OperationClaim denotes claiming a new lease.
	OperationClaim = "claim"

	// OperationExtend denotes extending an already-held lease.
	OperationExtend = "extend"

	// OperationRevoke denotes revoking an existing lease.
	OperationRevoke = "revoke"

	// OperationSetTime denotes updating stored global time (which
	// will also remove any expired leases).
	OperationSetTime = "setTime"

	// OperationPin pins a lease, preventing it from expiring
	// until it is unpinned.
	OperationPin = "pin"

	// OperationUnpin unpins a lease, restoring normal
	// lease expiry behaviour.
	OperationUnpin = "unpin"
)
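To make the interplay between setTime, expiry and pinning concrete, here is a minimal sketch. It is not the package's FSM: `miniFSM`, `entry`, `setTime` and `demo` are hypothetical names, and the stale-update check (comparing the caller's old time with the stored time) is an assumption about how such a check might look.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stand-in for a stored lease.
type entry struct {
	holder   string
	start    time.Time
	duration time.Duration
}

// miniFSM is an illustrative model only, not the package's FSM.
type miniFSM struct {
	globalTime time.Time
	entries    map[string]entry
	pinned     map[string]bool
}

// setTime mimics a "setTime" operation: it rejects updates whose old time
// does not match the stored time (an assumed stale-update check), then
// removes every expired lease that is not pinned and returns their names.
func (f *miniFSM) setTime(oldTime, newTime time.Time) ([]string, error) {
	if !f.globalTime.Equal(oldTime) {
		return nil, fmt.Errorf("stale time update: stored %v, given %v", f.globalTime, oldTime)
	}
	f.globalTime = newTime
	var expired []string
	for name, e := range f.entries {
		if f.pinned[name] {
			continue // pinned leases do not expire until unpinned
		}
		if e.start.Add(e.duration).Before(newTime) {
			delete(f.entries, name)
			expired = append(expired, name)
		}
	}
	return expired, nil
}

// demo builds two one-minute leases, pins one, and advances time by an hour.
func demo() (expired []string, staleErr error) {
	t0 := time.Unix(0, 0)
	f := &miniFSM{
		globalTime: t0,
		entries: map[string]entry{
			"app-a": {holder: "app-a/0", start: t0, duration: time.Minute},
			"app-b": {holder: "app-b/0", start: t0, duration: time.Minute},
		},
		pinned: map[string]bool{"app-b": true},
	}
	expired, _ = f.setTime(t0, t0.Add(time.Hour))
	_, staleErr = f.setTime(t0, t0.Add(2*time.Hour)) // t0 is now stale
	return expired, staleErr
}

func main() {
	expired, err := demo()
	fmt.Println("expired:", expired, "stale update rejected:", err != nil)
}
```

In this model only the unpinned lease is removed, and a second update that still carries the original time is rejected.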

Variables

This section is empty.

Functions

func RecoverError

func RecoverError(resp *ResponseError) error

RecoverError converts a ResponseError back into the specific error it represents, or into a generic error if it wasn't one of the singleton errors handled.

Types

type BatchFSM

type BatchFSM struct {
	*FSM
}

BatchFSM wraps an FSM so that operations can be applied in batches. Raft applies the batches in chunks and handles restoring and snapshotting at the library level, so both should be transparent to the FSM.

func NewBatchFSM

func NewBatchFSM(fsm *FSM) *BatchFSM

NewBatchFSM creates a BatchFSM from an existing FSM. Lifting the FSM into a BatchFSM improves performance when applying logs, because a lock is taken only when required.

func (*BatchFSM) ApplyBatch

func (f *BatchFSM) ApplyBatch(logs []*raft.Log) interface{}

ApplyBatch is invoked once a batch of log entries has been committed and is ready to be applied to the FSM. It takes a slice of log entries, which are in the order they were committed, have no gaps, and may be of several log types. Clients should check the log type before attempting to decode the attached data. Presently the LogCommand and LogConfiguration types will be sent.

The returned slice must be the same length as the input, and each response should correspond to the log at the same index of the input. The returned values will be made available in the ApplyFuture returned by the Raft.Apply method if that method was called on the same Raft node as the FSM. ApplyBatch is part of raft.BatchingFSM.
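The batching contract can be sketched without the raft library. In the example below, `logEntry`, `logType` and `recordingFSM` are hypothetical stand-ins (not raft.Log or the package's FSM); it only illustrates taking the lock once per batch, checking the log type before decoding, and returning one response per input index.

```go
package main

import (
	"fmt"
	"sync"
)

// logType and logEntry are simplified stand-ins for raft's log types.
type logType int

const (
	logCommand logType = iota
	logConfiguration
)

type logEntry struct {
	Type logType
	Data []byte
}

// recordingFSM is a toy state machine that remembers applied commands.
type recordingFSM struct {
	mu      sync.Mutex
	applied []string
}

// applyLocked applies one command; the caller must hold f.mu.
func (f *recordingFSM) applyLocked(data []byte) interface{} {
	f.applied = append(f.applied, string(data))
	return len(f.applied)
}

// applyBatch takes the lock once for the whole batch and returns a slice
// with exactly one response per input log, at the same index.
func (f *recordingFSM) applyBatch(logs []*logEntry) []interface{} {
	responses := make([]interface{}, len(logs))
	f.mu.Lock()
	defer f.mu.Unlock()
	for i, l := range logs {
		if l.Type != logCommand {
			continue // nothing to decode; the response stays nil
		}
		responses[i] = f.applyLocked(l.Data)
	}
	return responses
}

func main() {
	f := &recordingFSM{}
	responses := f.applyBatch([]*logEntry{
		{Type: logCommand, Data: []byte("claim")},
		{Type: logConfiguration},
		{Type: logCommand, Data: []byte("extend")},
	})
	fmt.Println(len(responses), responses)
}
```

Applying each entry individually would acquire the lock once per entry; the batch form acquires it once for all of them.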

type Client

type Client interface {
	Request(context.Context, *Command) error
}

Client defines the methods for broadcasting a command.

type ClientMetrics

type ClientMetrics interface {
	RecordOperation(string, string, time.Time)
}

ClientMetrics records metrics for client requests.

type Command

type Command struct {
	// Version of the command format in case it changes,
	// and we need to handle multiple formats.
	Version int `yaml:"version"`

	// Operation is one of claim, extend, revoke, setTime, pin or unpin.
	Operation string `yaml:"operation"`

	// Namespace is the kind of lease.
	Namespace string `yaml:"namespace,omitempty"`

	// ModelUUID identifies the model the lease belongs to.
	ModelUUID string `yaml:"model-uuid,omitempty"`

	// Lease is the name of the lease the command affects.
	Lease string `yaml:"lease,omitempty"`

	// Holder is the name of the party claiming or extending the
	// lease.
	Holder string `yaml:"holder,omitempty"`

	// Duration is how long the lease should last.
	Duration time.Duration `yaml:"duration,omitempty"`

	// OldTime is the previous time for time updates (to avoid
	// applying stale ones).
	OldTime time.Time `yaml:"old-time,omitempty"`

	// NewTime is the time to store as the global time.
	NewTime time.Time `yaml:"new-time,omitempty"`

	// PinEntity is a tag representing an entity concerned
	// with a pin or unpin operation.
	PinEntity string `yaml:"pin-entity,omitempty"`
}

Command captures the details of an operation to be run on the FSM.

func (*Command) LeaseKey

func (c *Command) LeaseKey() lease.Key

LeaseKey makes a lease key from the fields in the command.

func (*Command) Marshal

func (c *Command) Marshal() ([]byte, error)

Marshal converts this command to a byte slice.

func (*Command) String

func (c *Command) String() string

String implements fmt.Stringer for the Command type.

func (*Command) Validate

func (c *Command) Validate() error

Validate checks that the command describes a valid state change.
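The exact rules that Validate enforces are not listed here. As a rough sketch, the example below guesses per-operation checks from the field comments on Command; `command`, `key`, `leaseKey` and `validate` are hypothetical lower-case copies, and every rule shown is an assumption rather than the package's behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// command is a trimmed, hypothetical copy of Command's fields.
type command struct {
	Operation                   string
	Namespace, ModelUUID, Lease string
	Holder                      string
	Duration                    time.Duration
	OldTime, NewTime            time.Time
	PinEntity                   string
}

// key mirrors the three identifying fields also seen in SnapshotKey.
type key struct{ Namespace, ModelUUID, Lease string }

// leaseKey builds a key from the identifying fields of the command.
func (c *command) leaseKey() key {
	return key{Namespace: c.Namespace, ModelUUID: c.ModelUUID, Lease: c.Lease}
}

// validate applies assumed rules: each operation needs the fields that
// its description mentions, and unknown operations are rejected.
func (c *command) validate() error {
	switch c.Operation {
	case "claim", "extend":
		if c.Lease == "" || c.Holder == "" {
			return errors.New("claim and extend need a lease and a holder")
		}
		if c.Duration <= 0 {
			return errors.New("claim and extend need a positive duration")
		}
	case "revoke":
		if c.Lease == "" || c.Holder == "" {
			return errors.New("revoke needs a lease and a holder")
		}
	case "pin", "unpin":
		if c.Lease == "" || c.PinEntity == "" {
			return errors.New("pin and unpin need a lease and a pin entity")
		}
	case "setTime":
		if c.NewTime.IsZero() {
			return errors.New("setTime needs a new time")
		}
	default:
		return fmt.Errorf("unknown operation %q", c.Operation)
	}
	return nil
}

func main() {
	ok := &command{
		Operation: "claim", Namespace: "application-leadership",
		ModelUUID: "model-1", Lease: "mysql",
		Holder: "mysql/0", Duration: time.Minute,
	}
	fmt.Println(ok.leaseKey(), ok.validate())
	fmt.Println((&command{Operation: "expire"}).validate())
}
```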

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

FSM stores the state of leases in the system.

func NewFSM

func NewFSM() *FSM

NewFSM returns a new FSM to store lease information.

func (*FSM) Apply

func (f *FSM) Apply(log *raft.Log) interface{}

Apply is invoked once a log entry is committed. It returns a value which will be made available in the ApplyFuture returned by the Raft.Apply method if that method was called on the same Raft node as the FSM. Apply is part of raft.FSM.

func (*FSM) GlobalTime

func (f *FSM) GlobalTime() time.Time

GlobalTime returns the FSM's internal time.

func (*FSM) LeaseGroup

func (f *FSM) LeaseGroup(getLocalTime func() time.Time, namespace, modelUUID string) map[lease.Key]lease.Info

LeaseGroup returns all leases matching the namespace and model - when there are many models this is more efficient than getting all the leases and filtering by model.
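One way a grouped lookup can beat "fetch everything, then filter" is to index leases by namespace and model up front. The sketch below assumes such a two-level map; `index`, `groupKey` and `leaseKey` are hypothetical names, and the package's internal layout may differ.

```go
package main

import "fmt"

// leaseKey is a stand-in for lease.Key.
type leaseKey struct{ Namespace, ModelUUID, Lease string }

// groupKey identifies all leases of one namespace within one model.
type groupKey struct{ namespace, modelUUID string }

// index keeps leases bucketed by group so a group lookup touches only
// that group's leases. Values are holder names for brevity.
type index struct {
	groups map[groupKey]map[string]string
}

// add records a lease holder under its group, creating the bucket if needed.
func (ix *index) add(k leaseKey, holder string) {
	if ix.groups == nil {
		ix.groups = make(map[groupKey]map[string]string)
	}
	g := groupKey{k.Namespace, k.ModelUUID}
	if ix.groups[g] == nil {
		ix.groups[g] = make(map[string]string)
	}
	ix.groups[g][k.Lease] = holder
}

// leaseGroup copies out one bucket; cost is proportional to the size of
// that group, not to the total number of leases.
func (ix *index) leaseGroup(namespace, modelUUID string) map[leaseKey]string {
	out := make(map[leaseKey]string)
	for name, holder := range ix.groups[groupKey{namespace, modelUUID}] {
		out[leaseKey{namespace, modelUUID, name}] = holder
	}
	return out
}

func main() {
	ix := &index{}
	ix.add(leaseKey{"application-leadership", "model-1", "mysql"}, "mysql/0")
	ix.add(leaseKey{"application-leadership", "model-2", "mysql"}, "mysql/3")
	fmt.Println(ix.leaseGroup("application-leadership", "model-1"))
}
```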

func (*FSM) Leases

func (f *FSM) Leases(getLocalTime func() time.Time, keys ...lease.Key) map[lease.Key]lease.Info

Leases gets information about all the leases in the system, optionally filtered by the input lease keys.

func (*FSM) Pinned

func (f *FSM) Pinned() map[lease.Key][]string

Pinned returns all the currently known lease pins and applications requiring the pinned behaviour.

func (*FSM) Restore

func (f *FSM) Restore(reader io.ReadCloser) error

Restore is part of raft.FSM.

func (*FSM) Snapshot

func (f *FSM) Snapshot() (raft.FSMSnapshot, error)

Snapshot is part of raft.FSM.

type FSMResponse

type FSMResponse interface {
	// Error is a lease error (rather than anything to do with the
	// raft machinery).
	Error() error

	// Notify tells the target what changes occurred because of the
	// applied command.
	// On encountering an error, notify will continue until all resulting
	// claims and expiries are exhausted, before returning the error messages.
	Notify(NotifyTarget) error
}

FSMResponse defines what will be available on the return value from FSM apply calls.

type ForwardRequest

type ForwardRequest struct {
	Command       string `yaml:"command"`
	ResponseTopic string `yaml:"response-topic"`
}

ForwardRequest is a message sent over the hub to the raft forwarder (only running on the raft leader node).

type ForwardResponse

type ForwardResponse struct {
	Error *ResponseError `yaml:"error"`
}

ForwardResponse is the response sent back from the raft forwarder.

type MetricsCollector

type MetricsCollector interface {
	// Describe is part of prometheus.Collector.
	Describe(ch chan<- *prometheus.Desc)

	// Collect is part of prometheus.Collector.
	Collect(ch chan<- prometheus.Metric)
}

type NotifyTarget

type NotifyTarget interface {
	// Claimed will be called when a new lease has been claimed.
	Claimed(lease.Key, string) error

	// Expired will be called when an existing lease has expired.
	Expired(lease.Key) error
}

NotifyTarget defines methods needed to keep an external database updated with who holds leases. (In non-test code the notify target will generally be the state DB.)
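The "continue on error, report afterwards" behaviour described for Notify can be sketched as follows. `response`, `recorder` and the way error messages are joined are assumptions for illustration; only the two-method target interface follows the definition above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// leaseKey is a stand-in for lease.Key.
type leaseKey struct{ Namespace, ModelUUID, Lease string }

// notifyTarget mirrors the two methods of NotifyTarget.
type notifyTarget interface {
	Claimed(leaseKey, string) error
	Expired(leaseKey) error
}

// response is a hypothetical record of what one applied command changed.
type response struct {
	claimed  *leaseKey // nil when nothing was claimed
	claimer  string
	expired  []leaseKey
}

// notify reports every change to the target. It does not stop at the
// first failure; it collects messages and returns them at the end.
func (r *response) notify(target notifyTarget) error {
	var msgs []string
	if r.claimed != nil {
		if err := target.Claimed(*r.claimed, r.claimer); err != nil {
			msgs = append(msgs, err.Error())
		}
	}
	for _, k := range r.expired {
		if err := target.Expired(k); err != nil {
			msgs = append(msgs, err.Error())
		}
	}
	if len(msgs) > 0 {
		return errors.New(strings.Join(msgs, "; "))
	}
	return nil
}

// recorder is a test double that fails for one chosen lease name.
type recorder struct {
	failOn string
	events []string
}

func (rec *recorder) Claimed(k leaseKey, holder string) error {
	rec.events = append(rec.events, "claimed "+k.Lease+" by "+holder)
	return nil
}

func (rec *recorder) Expired(k leaseKey) error {
	rec.events = append(rec.events, "expired "+k.Lease)
	if k.Lease == rec.failOn {
		return fmt.Errorf("cannot record expiry of %s", k.Lease)
	}
	return nil
}

func main() {
	rec := &recorder{failOn: "a"}
	r := &response{expired: []leaseKey{{Lease: "a"}, {Lease: "b"}}}
	fmt.Println(r.notify(rec), rec.events)
}
```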

type OperationClientMetrics

type OperationClientMetrics struct {
	// contains filtered or unexported fields
}

func NewOperationClientMetrics

func NewOperationClientMetrics(clock clock.Clock) *OperationClientMetrics

func (*OperationClientMetrics) Collect

func (c *OperationClientMetrics) Collect(ch chan<- prometheus.Metric)

Collect is part of prometheus.Collector.

func (*OperationClientMetrics) Describe

func (c *OperationClientMetrics) Describe(ch chan<- *prometheus.Desc)

Describe is part of prometheus.Collector.

func (OperationClientMetrics) RecordOperation

func (m OperationClientMetrics) RecordOperation(operation, result string, start time.Time)

type PubsubClient

type PubsubClient struct {
	// contains filtered or unexported fields
}

func NewPubsubClient

func NewPubsubClient(config PubsubClientConfig) *PubsubClient

NewPubsubClient creates a PubSub raftlease client.

func (*PubsubClient) Request

func (c *PubsubClient) Request(ctx context.Context, command *Command) error

type PubsubClientConfig

type PubsubClientConfig struct {
	Hub            *pubsub.StructuredHub
	RequestTopic   string
	ClientMetrics  ClientMetrics
	Clock          clock.Clock
	ForwardTimeout time.Duration
}

PubsubClientConfig holds resources and settings needed to run the PubsubClient.

type ReadOnlyClock

type ReadOnlyClock interface {
	GlobalTime() time.Time
}

ReadOnlyClock describes a clock from which global time can be read.

type ReadonlyFSM

type ReadonlyFSM interface {
	ReadOnlyClock

	// Leases and LeaseGroup receive a func for retrieving time,
	// because it needs to be determined after potential lock-waiting
	// to be accurate.
	Leases(func() time.Time, ...lease.Key) map[lease.Key]lease.Info
	LeaseGroup(func() time.Time, string, string) map[lease.Key]lease.Info
	Pinned() map[lease.Key][]string
}

ReadonlyFSM defines the methods of the lease FSM the store can use - any writes must go through the hub.
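The reason for passing a time function rather than a time value can be shown with a small model: the clock is read only after the lock is held, so time spent waiting for the lock does not skew the result. `timedFSM`, its `holding` flag and `demo` are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timedFSM is a toy holder of lease expiry times.
type timedFSM struct {
	mu      sync.Mutex
	holding bool // true while mu is held; used only by the demonstration
	expiry  map[string]time.Time
}

// remaining reports how long each lease has left. The clock is read via
// getLocalTime after the lock is acquired, not before.
func (f *timedFSM) remaining(getLocalTime func() time.Time) map[string]time.Duration {
	f.mu.Lock()
	f.holding = true
	defer func() {
		f.holding = false
		f.mu.Unlock()
	}()

	now := getLocalTime()
	out := make(map[string]time.Duration, len(f.expiry))
	for name, exp := range f.expiry {
		out[name] = exp.Sub(now)
	}
	return out
}

// demo uses a fixed clock and records whether the clock was read while
// the lock was held.
func demo() (left time.Duration, readUnderLock bool) {
	base := time.Unix(1000, 0)
	f := &timedFSM{expiry: map[string]time.Time{"a": base.Add(30 * time.Second)}}
	got := f.remaining(func() time.Time {
		readUnderLock = f.holding
		return base
	})
	return got["a"], readUnderLock
}

func main() {
	left, underLock := demo()
	fmt.Println(left, underLock)
}
```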

type ResponseError

type ResponseError struct {
	Message string `yaml:"message"`
	Code    string `yaml:"code"`
}

ResponseError is used for sending error values back to the lease store via the hub.

func AsResponseError

func AsResponseError(err error) *ResponseError

AsResponseError returns a *ResponseError that can be sent back over the hub in response to a forwarded FSM command.
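The round trip between AsResponseError and RecoverError can be pictured with a small model. The singleton errors, the code strings and the nil handling below are assumptions made up for this sketch; only the Message and Code fields follow the struct above.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical singleton errors standing in for the real lease errors.
var (
	errInvalid = errors.New("invalid lease operation")
	errTimeout = errors.New("lease operation timed out")
)

// responseError mirrors the Message and Code fields of ResponseError.
type responseError struct {
	Message string
	Code    string
}

// codes maps each singleton to a wire code; the strings are invented.
var codes = map[error]string{
	errInvalid: "invalid",
	errTimeout: "timeout",
}

// asResponseError flattens an error into a value that can be serialised.
func asResponseError(err error) *responseError {
	if err == nil {
		return nil
	}
	resp := &responseError{Message: err.Error()}
	for target, code := range codes {
		if errors.Is(err, target) {
			resp.Code = code
			break
		}
	}
	return resp
}

// recoverError restores the singleton for a known code, or falls back to
// a generic error carrying only the message.
func recoverError(resp *responseError) error {
	if resp == nil {
		return nil
	}
	for target, code := range codes {
		if resp.Code == code {
			return target
		}
	}
	return errors.New(resp.Message)
}

func main() {
	wrapped := fmt.Errorf("claiming lease: %w", errTimeout)
	recovered := recoverError(asResponseError(wrapped))
	fmt.Println(recovered == errTimeout)
	fmt.Println(recoverError(&responseError{Message: "boom", Code: "unknown"}))
}
```

Recovering the singleton matters because callers on the receiving side can then compare against it directly instead of matching on message text.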

type Snapshot

type Snapshot struct {
	Version    int                           `yaml:"version"`
	Entries    map[SnapshotKey]SnapshotEntry `yaml:"entries"`
	Pinned     map[SnapshotKey][]string      `yaml:"pinned"`
	GlobalTime time.Time                     `yaml:"global-time"`
}

Snapshot defines the format of the FSM snapshot.

func (*Snapshot) Persist

func (s *Snapshot) Persist(sink raft.SnapshotSink) (err error)

Persist is part of raft.FSMSnapshot.

func (*Snapshot) Release

func (s *Snapshot) Release()

Release is part of raft.FSMSnapshot.

type SnapshotEntry

type SnapshotEntry struct {
	Holder   string        `yaml:"holder"`
	Start    time.Time     `yaml:"start"`
	Duration time.Duration `yaml:"duration"`
}

SnapshotEntry defines the format of a lease entry in a snapshot.

type SnapshotKey

type SnapshotKey struct {
	Namespace string `yaml:"namespace"`
	ModelUUID string `yaml:"model-uuid"`
	Lease     string `yaml:"lease"`
}

SnapshotKey defines the format of a lease key in a snapshot.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store manages a raft FSM and forwards writes through a pubsub hub.

func NewStore

func NewStore(config StoreConfig) *Store

NewStore returns a core/lease.Store that manages leases in Raft.

func (*Store) ClaimLease

func (s *Store) ClaimLease(key lease.Key, req lease.Request, stop <-chan struct{}) error

ClaimLease is part of lease.Store.

func (*Store) Collect

func (s *Store) Collect(ch chan<- prometheus.Metric)

Collect is part of prometheus.Collector.

func (*Store) Describe

func (s *Store) Describe(ch chan<- *prometheus.Desc)

Describe is part of prometheus.Collector.

func (*Store) ExtendLease

func (s *Store) ExtendLease(key lease.Key, req lease.Request, stop <-chan struct{}) error

ExtendLease is part of lease.Store.

func (*Store) LeaseGroup

func (s *Store) LeaseGroup(namespace, modelUUID string) map[lease.Key]lease.Info

LeaseGroup is part of lease.Store.

func (*Store) Leases

func (s *Store) Leases(keys ...lease.Key) map[lease.Key]lease.Info

Leases is part of lease.Store.

func (*Store) PinLease

func (s *Store) PinLease(key lease.Key, entity string, stop <-chan struct{}) error

PinLease is part of lease.Store.

func (*Store) Pinned

func (s *Store) Pinned() map[lease.Key][]string

Pinned is part of lease.Store.

func (*Store) RevokeLease

func (s *Store) RevokeLease(key lease.Key, holder string, stop <-chan struct{}) error

RevokeLease is part of lease.Store.

func (*Store) UnpinLease

func (s *Store) UnpinLease(key lease.Key, entity string, stop <-chan struct{}) error

UnpinLease is part of lease.Store.

type StoreConfig

type StoreConfig struct {
	FSM              ReadonlyFSM
	Client           Client
	Trapdoor         TrapdoorFunc
	Clock            clock.Clock
	MetricsCollector MetricsCollector
}

StoreConfig holds resources and settings needed to run the Store.

type TrapdoorFunc

type TrapdoorFunc func(lease.Key, string) lease.Trapdoor

TrapdoorFunc returns a trapdoor to be attached to lease details for use by clients. This is intended to hold assertions that can be added to state transactions to ensure the lease is still held when the transaction is applied.
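As an illustration of the idea only, the sketch below models a trapdoor as a function that appends a "lease still held" assertion to a pending transaction. The `trapdoor` shape, the string-based transaction and `holderAssertion` are all hypothetical; the real lease.Trapdoor type is not shown here and may look quite different.

```go
package main

import (
	"errors"
	"fmt"
)

// leaseKey is a stand-in for lease.Key.
type leaseKey struct{ Namespace, ModelUUID, Lease string }

// trapdoor is an assumed shape: it adds an assertion to a transaction,
// modelled here as a slice of human-readable operations.
type trapdoor func(txn *[]string) error

// trapdoorFunc mirrors the (key, holder) -> trapdoor signature.
type trapdoorFunc func(leaseKey, string) trapdoor

// holderAssertion returns a trapdoor asserting that holder still holds
// the lease identified by k when the transaction is applied.
func holderAssertion(k leaseKey, holder string) trapdoor {
	return func(txn *[]string) error {
		if txn == nil {
			return errors.New("no transaction to attach the assertion to")
		}
		*txn = append(*txn, fmt.Sprintf(
			"assert %s/%s/%s is held by %s",
			k.Namespace, k.ModelUUID, k.Lease, holder,
		))
		return nil
	}
}

// Compile-time check that holderAssertion fits the function type.
var _ trapdoorFunc = holderAssertion

func main() {
	txn := []string{"update unit status"}
	td := holderAssertion(leaseKey{"application-leadership", "model-1", "mysql"}, "mysql/0")
	if err := td(&txn); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, op := range txn {
		fmt.Println(op)
	}
}
```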
