sync

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: Sep 24, 2021 License: Apache-2.0, MIT Imports: 15 Imported by: 57

Documentation

Overview

The sync package contains the distributed coordination and choreography facility of Testground.

The sync service is lightweight, and uses Redis recipes to implement coordination primitives like barriers, signalling, and pubsub. Additional primitives like locks, semaphores, etc. are in scope, and may be added in the future.

Constructing sync.Clients

To use the sync service, test plan writers must create a sync.DefaultClient via the sync.NewBoundClient constructor, passing a context that governs the lifetime of the sync.DefaultClient, as well as a runtime.RunEnv to bind to. All sync operations will be automatically scoped/namespaced to the runtime.RunEnv.

Infrastructure services, such as sidecar instances, can create generic sync.Clients via the sync.NewGenericClient constructor. Such clients are not bound/constrained to a runtime.RunEnv, and instead are required to pass in runtime.RunParams in the context.Context to all operations. See WithRunParams for more info.

Recommendations for test plan writers

All constructors and methods on sync.DefaultClient have Must* versions, which panic if an error occurs. Using these methods in combination with runtime.Invoke is safe, as the runner captures panics and records them as test crashes. The resulting code will be less verbose.
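As a rough illustration of why Must* methods pair well with a panic-capturing runner, the stand-alone sketch below models the pattern using only the standard library. The names signalEntry, mustSignalEntry, and invoke are hypothetical stand-ins chosen for this example; they are not the package's or the runner's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// signalEntry is a hypothetical stand-in for a fallible sync operation.
func signalEntry(fail bool) (int64, error) {
	if fail {
		return -1, errors.New("sync service unreachable")
	}
	return 1, nil
}

// mustSignalEntry follows the Must* convention: it panics instead of
// returning an error.
func mustSignalEntry(fail bool) int64 {
	seq, err := signalEntry(fail)
	if err != nil {
		panic(err)
	}
	return seq
}

// invoke plays the role of a runner (an assumption of this sketch): it runs
// the test function and converts a panic into a recorded crash message.
func invoke(test func()) (crash string) {
	defer func() {
		if r := recover(); r != nil {
			crash = fmt.Sprint(r)
		}
	}()
	test()
	return ""
}

func main() {
	fmt.Println(invoke(func() { mustSignalEntry(false) }) == "") // true
	fmt.Println(invoke(func() { mustSignalEntry(true) }))        // sync service unreachable
}
```

The test body stays free of error plumbing, while the failure is still surfaced by whoever wraps the call.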

We have added sugar methods that compose basic primitives into frequently used katas, such as client.PublishSubscribe, client.SignalAndWait, client.PublishAndWait, etc. These katas also have Must* variations. We encourage developers to adopt them in order to streamline their code.

Garbage collection

The sync service is decentralised: it has no centralised actor, dispatcher, or coordinator that supervises the lifetime of a test. All participants in a test hit Redis directly, using its operations to implement the sync primitives. As a result, keys from past runs can accumulate.

Sync clients can participate in collaborative garbage collection by enabling background GC:

client.EnableBackgroundGC(ch) // see method godoc for info on ch

GC uses SCAN and OBJECT IDLETIME operations to find keys to purge, and its configuration is controlled by the GC* variables.

In the standard Testground architecture, only sidecar processes participate in GC.

Constants

const (
	EnvServiceHost = "SYNC_SERVICE_HOST"
	EnvServicePort = "SYNC_SERVICE_PORT"
)

Variables

var ErrNoRunParameters = fmt.Errorf("no run parameters provided")

ErrNoRunParameters is returned by the generic client when an unbound context is passed in. See WithRunParams to bind RunParams to the context.

Functions

func GetRunParams

func GetRunParams(ctx context.Context) *runtime.RunParams

GetRunParams extracts the RunParams from a context, previously set by calling WithRunParams.

func NewInmemClient added in v0.2.2

func NewInmemClient() *inmemClient

NewInmemClient creates an in-memory sync client for testing.

func WithRunParams

func WithRunParams(ctx context.Context, rp *runtime.RunParams) context.Context

WithRunParams returns a context that embeds the supplied RunParams, such that it can be passed to a GenericClient.
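The general technique of carrying run parameters inside a context.Context can be sketched with the standard library alone. Everything below is an assumption made for illustration: runParams is a trimmed-down stand-in for runtime.RunParams, and withRunParams, getRunParams, operation, and scopeFor are hypothetical helpers, not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runParams is a hypothetical, trimmed-down stand-in for runtime.RunParams.
type runParams struct {
	TestPlan string
	TestRun  string
}

// ctxKey is an unexported key type, so other packages cannot collide with it.
type ctxKey struct{}

var errNoRunParameters = errors.New("no run parameters provided")

// withRunParams returns a context that embeds rp.
func withRunParams(ctx context.Context, rp *runParams) context.Context {
	return context.WithValue(ctx, ctxKey{}, rp)
}

// getRunParams extracts the embedded parameters, or nil if none were bound.
func getRunParams(ctx context.Context) *runParams {
	rp, _ := ctx.Value(ctxKey{}).(*runParams)
	return rp
}

// operation models a generic-client call that requires bound parameters.
func operation(ctx context.Context) (string, error) {
	rp := getRunParams(ctx)
	if rp == nil {
		return "", errNoRunParameters
	}
	return rp.TestPlan + ":" + rp.TestRun, nil
}

// scopeFor runs operation on a fresh context, binding rp only when non-nil.
func scopeFor(rp *runParams) (string, error) {
	ctx := context.Background()
	if rp != nil {
		ctx = withRunParams(ctx, rp)
	}
	return operation(ctx)
}

func main() {
	fmt.Println(scopeFor(nil))
	fmt.Println(scopeFor(&runParams{TestPlan: "plan", TestRun: "run-1"}))
}
```

An unbound context yields the sentinel error, which is the behaviour ErrNoRunParameters describes for the generic client.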

Types

type Barrier

type Barrier struct {
	C chan error
	// contains filtered or unexported fields
}

Barrier represents a barrier over a State. A Barrier is a synchronisation checkpoint that will fire once the `target` number of entries on that state have been registered.

type Client

type Client interface {
	io.Closer

	Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error)
	Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error)
	PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)
	PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)

	Barrier(ctx context.Context, state State, target int) (*Barrier, error)
	SignalEntry(ctx context.Context, state State) (after int64, err error)
	SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)

	MustBarrier(ctx context.Context, state State, target int) *Barrier
	MustSignalEntry(ctx context.Context, state State) int64
	MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) *Subscription
	MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)

	MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)
	MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
	MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)

	SignalEvent(context.Context, *runtime.Event) error
}

type DefaultClient added in v0.2.3

type DefaultClient struct {
	// contains filtered or unexported fields
}

func MustBoundClient

func MustBoundClient(ctx context.Context, runenv *runtime.RunEnv) *DefaultClient

MustBoundClient creates a new bound client by calling NewBoundClient, and panicking if it errors.

func MustGenericClient

func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *DefaultClient

MustGenericClient creates a new generic client by calling NewGenericClient, and panicking if it errors.

func NewBoundClient

func NewBoundClient(ctx context.Context, runenv *runtime.RunEnv) (*DefaultClient, error)

NewBoundClient returns a new sync DefaultClient that is bound to the provided RunEnv. All operations will be automatically scoped to the keyspace of that run.

The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().

For test plans, a suitable context to pass here is the background context.

func NewGenericClient

func NewGenericClient(ctx context.Context, log *zap.SugaredLogger) (*DefaultClient, error)

NewGenericClient returns a new sync DefaultClient that is bound to no RunEnv. It is intended to be used by testground services like the sidecar.

All operations expect to find, inside the supplied context.Context, the RunParams of the run that scopes their actions. Call WithRunParams to bind the appropriate RunParams.

The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().

A suitable context to pass here is the background context of the main process.

func (*DefaultClient) Barrier added in v0.2.3

func (c *DefaultClient) Barrier(ctx context.Context, state State, target int) (*Barrier, error)

Barrier sets a barrier on the supplied State that fires when it reaches its target value (or higher).

The caller should monitor the channel C returned inside the Barrier object. It fires when the barrier reaches its target, is cancelled, or fails. If the barrier is satisfied, the value sent will be nil. C must not be closed by the caller.

When the context fires, the context's error will be propagated instead. The same will occur if the DefaultClient's context fires.

It is safe to use a non-cancellable context here, like the background context. No cancellation is needed unless you want to stop the process early.
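The barrier semantics described above (nil on success, the context's error on cancellation) can be modelled in memory. This is only a sketch under stated assumptions: counter, barrier, and waitFor are hypothetical names, the counter lives in process memory rather than in Redis, and polling on a ticker is a simplification chosen for brevity.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// counter is a hypothetical in-memory stand-in for a state counter.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) signalEntry() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
	return c.n
}

func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// barrier returns a channel that receives nil once the counter reaches
// target (or higher), or the context's error if ctx fires first.
func barrier(ctx context.Context, c *counter, target int) <-chan error {
	ch := make(chan error, 1) // buffered, so the goroutine never blocks on send
	go func() {
		tick := time.NewTicker(time.Millisecond)
		defer tick.Stop()
		for {
			if c.value() >= target {
				ch <- nil
				return
			}
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			case <-tick.C:
			}
		}
	}()
	return ch
}

// waitFor signals `entries` times, then waits up to timeoutMs milliseconds.
func waitFor(entries, target, timeoutMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	c := &counter{}
	ch := barrier(ctx, c, target)
	for i := 0; i < entries; i++ {
		c.signalEntry()
	}
	return <-ch
}

func main() {
	fmt.Println(waitFor(3, 3, 1000)) // barrier satisfied
	fmt.Println(waitFor(1, 3, 20))   // context fires first
}
```

The caller only ever reads from the returned channel, which matches the rule that C must not be closed by the caller.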

func (*DefaultClient) Close added in v0.2.3

func (c *DefaultClient) Close() error

Close closes this client, cancels ongoing operations, and releases resources.

func (DefaultClient) MustBarrier added in v0.2.3

func (c DefaultClient) MustBarrier(ctx context.Context, state State, required int) *Barrier

MustBarrier calls Barrier, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublish added in v0.2.3

func (c DefaultClient) MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)

MustPublish calls Publish, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublishAndWait added in v0.2.3

func (c DefaultClient) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)

MustPublishAndWait calls PublishAndWait, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublishSubscribe added in v0.2.3

func (c DefaultClient) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)

MustPublishSubscribe calls PublishSubscribe, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustSignalAndWait added in v0.2.3

func (c DefaultClient) MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)

MustSignalAndWait calls SignalAndWait, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustSignalEntry added in v0.2.3

func (c DefaultClient) MustSignalEntry(ctx context.Context, state State) (current int64)

MustSignalEntry calls SignalEntry, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustSubscribe added in v0.2.3

func (c DefaultClient) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)

MustSubscribe calls Subscribe, panicking if it errors.

Suitable for shorthanding in test plans.

func (*DefaultClient) Publish added in v0.2.3

func (c *DefaultClient) Publish(ctx context.Context, topic *Topic, payload interface{}) (int64, error)

Publish publishes an item on the supplied topic. The payload type must match the payload type on the Topic; otherwise Publish will error.

This method returns synchronously once the item has been published successfully. It returns the sequence number of the new item in the ordered topic, starting with 1 for the first item, or an error if one occurred.

If error is non-nil, the sequence number must be disregarded.
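The two rules above, a payload type check and 1-based sequence numbers, can be illustrated with an in-memory model. The topic type and its publish method below are hypothetical and kept deliberately simple (exact type equality, a slice instead of Redis); the real client may apply different matching rules.

```go
package main

import (
	"fmt"
	"reflect"
)

// topic is a hypothetical in-memory stand-in for an ordered, typed topic.
type topic struct {
	typ   reflect.Type
	items []interface{}
}

// publish appends payload if its type matches the topic type and returns
// the 1-based sequence number of the new item. On error, the returned
// sequence number is meaningless and must be disregarded.
func (t *topic) publish(payload interface{}) (int64, error) {
	if reflect.TypeOf(payload) != t.typ {
		return -1, fmt.Errorf("invalid payload type; expected %s, got %T", t.typ, payload)
	}
	t.items = append(t.items, payload)
	return int64(len(t.items)), nil
}

// newStringTopic builds a topic whose payload type is string.
func newStringTopic() *topic {
	return &topic{typ: reflect.TypeOf("")}
}

func main() {
	tp := newStringTopic()
	fmt.Println(tp.publish("a")) // 1 <nil>
	fmt.Println(tp.publish("b")) // 2 <nil>
	fmt.Println(tp.publish(42))  // -1 invalid payload type; expected string, got int
}
```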

func (DefaultClient) PublishAndWait added in v0.2.3

func (c DefaultClient) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)

PublishAndWait composes Publish and a Barrier. It first publishes the provided payload to the specified topic, then waits for a barrier on the supplied state to reach the indicated target.

If any operation fails, PublishAndWait short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but the Barrier fails, the seq number will be greater than zero.

func (DefaultClient) PublishSubscribe added in v0.2.3

func (c DefaultClient) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)

PublishSubscribe publishes the payload on the supplied Topic, then subscribes to it, sending payloads to the supplied channel.

If any operation fails, PublishSubscribe short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but Subscribe fails, the seq number will be greater than zero, but the returned Subscription will be nil, and the error, non-nil.

func (DefaultClient) SignalAndWait added in v0.2.3

func (c DefaultClient) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)

SignalAndWait composes SignalEntry and Barrier, signalling entry on the supplied state, and then awaiting until the required value has been reached.

The returned error will be nil if the barrier was met successfully, or non-nil if the context expired or some other error occurred.

func (*DefaultClient) SignalEntry added in v0.2.3

func (c *DefaultClient) SignalEntry(ctx context.Context, state State) (int64, error)

SignalEntry increments the state counter by one, returning the new value of the counter, or an error if the operation fails.

func (*DefaultClient) SignalEvent added in v0.2.6

func (c *DefaultClient) SignalEvent(ctx context.Context, event *runtime.Event) (err error)

SignalEvent emits an event attached to a certain test plan.

func (*DefaultClient) Subscribe added in v0.2.3

func (c *DefaultClient) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription, err error)

Subscribe subscribes to a topic, consuming ordered, typed elements from index 0, and sending them to channel ch.

The supplied channel must be buffered, and its type must be a value or pointer type matching the topic type. If these conditions are unmet, this method will error immediately.

The caller must consume from this channel promptly; failure to do so will backpressure the DefaultClient's subscription event loop.
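The channel requirements stated above (buffered, element type equal to the topic type or a pointer to it) can be checked with reflection. The following is a sketch of such a check, not the client's actual validation code; checkChan, checkPeerChan, and the peer type are hypothetical, and the send-direction check is an extra assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// peer is a hypothetical payload type used only for this example.
type peer struct{ ID string }

// checkChan validates a subscriber channel against a topic's element type.
func checkChan(ch interface{}, typ reflect.Type) error {
	v := reflect.ValueOf(ch)
	if v.Kind() != reflect.Chan {
		return errors.New("value is not a channel")
	}
	if v.Type().ChanDir()&reflect.SendDir == 0 {
		return errors.New("channel cannot be sent to")
	}
	if v.Cap() == 0 {
		return errors.New("channel is not buffered")
	}
	elem := v.Type().Elem()
	// Accept either the value type or a pointer to it.
	if elem != typ && !(elem.Kind() == reflect.Ptr && elem.Elem() == typ) {
		return fmt.Errorf("channel element type %s does not match topic type %s", elem, typ)
	}
	return nil
}

// checkPeerChan validates ch against a topic of peer values.
func checkPeerChan(ch interface{}) error {
	return checkChan(ch, reflect.TypeOf(peer{}))
}

func main() {
	fmt.Println(checkPeerChan(make(chan peer, 1)))  // value type, buffered
	fmt.Println(checkPeerChan(make(chan *peer, 1))) // pointer type, buffered
	fmt.Println(checkPeerChan(make(chan peer)))     // unbuffered
	fmt.Println(checkPeerChan(make(chan int, 1)))   // wrong element type
}
```

Failing fast on these conditions keeps a mistyped or unbuffered channel from stalling delivery later.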

func (*DefaultClient) SubscribeEvents added in v0.3.0

func (c *DefaultClient) SubscribeEvents(ctx context.Context, rp *runtime.RunParams) (chan *runtime.Event, error)

SubscribeEvents monitors the events sent by a specific test plan. Testground uses this function to monitor all events emitted by test plans, in particular terminal events such as SuccessEvent, FailureEvent, and CrashEvent.

type State

type State string

State represents a state in a distributed state machine, identified by a unique string within the test case.

func (State) Key

func (s State) Key(rp *runtime.RunParams) string

Key gets the Redis key for this State, contextualized to a set of RunParams.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents a receive channel for data being published in a Topic.

func (*Subscription) Done

func (s *Subscription) Done() <-chan error

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a meeting place for test instances to exchange arbitrary data.

func NewTopic

func NewTopic(name string, typ interface{}) *Topic

NewTopic constructs a Topic with the provided name and the type of the supplied value, derived via reflect.TypeOf unless the supplied value is already a reflect.Type. This method does not retain the actual value from which the type is derived.
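The "value or reflect.Type" rule can be expressed in a few lines of reflection. The helper typeOf and the peer type below are hypothetical names for this sketch; it shows one plausible way to derive the type, not necessarily how NewTopic does it.

```go
package main

import (
	"fmt"
	"reflect"
)

// peer is a hypothetical payload type used only for this example.
type peer struct{ ID string }

// typeOf returns v itself when it is already a reflect.Type, and
// reflect.TypeOf(v) otherwise. Only the type is kept, never the value.
func typeOf(v interface{}) reflect.Type {
	if t, ok := v.(reflect.Type); ok {
		return t
	}
	return reflect.TypeOf(v)
}

func main() {
	fromValue := typeOf(peer{ID: "ignored"})
	fromType := typeOf(reflect.TypeOf(peer{}))
	fmt.Println(fromValue == fromType) // true
	fmt.Println(fromValue.Kind())      // struct
}
```

Without the type assertion, passing a reflect.Type would yield the type of the reflect.Type implementation itself rather than the intended payload type.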

func (Topic) Key

func (t Topic) Key(rp *runtime.RunParams) string

Key gets the key for this Topic, contextualized to a set of RunParams.
