Documentation ¶
Overview ¶
The sync package contains the distributed coordination and choreography facility of Testground.
The sync service is lightweight, and uses Redis recipes to implement coordination primitives like barriers, signalling, and pubsub. Additional primitives like locks, semaphores, etc. are in scope, and may be added in the future.
Constructing sync.Clients ¶
To use the sync service, test plan writers must create a sync.DefaultClient via the sync.NewBoundClient constructor, passing a context that governs the lifetime of the sync.DefaultClient, as well as a runtime.RunEnv to bind to. All sync operations will be automatically scoped/namespaced to the runtime.RunEnv.
Infrastructure services, such as sidecar instances, can create generic sync.Clients via the sync.NewGenericClient constructor. Such clients are not bound/constrained to a runtime.RunEnv, and instead are required to pass in runtime.RunParams in the context.Context to all operations. See WithRunParams for more info.
Recommendations for test plan writers ¶
All constructors and methods on sync.DefaultClient have Must* versions, which panic if an error occurs. Using these methods in combination with runtime.Invoke is safe, as the runner captures panics and records them as test crashes. The resulting code will be less pedantic.
We have added sugar methods that compose basic primitives into frequently used katas, such as client.PublishSubscribe, client.SignalAndWait, client.PublishAndWait, etc. These katas also have Must* variations. We encourage developers to adopt them in order to streamline their code.
Garbage collection ¶
The sync service is decentralised: it has no centralised actor, dispatcher, or coordinator that supervises the lifetime of a test. All participants in a test hit Redis directly, using its operations to implement the sync primitives. As a result, keys from past runs can accumulate.
Sync clients can participate in collaborative garbage collection by enabling background GC:
client.EnableBackgroundGC(ch) // see method godoc for info on ch
GC uses SCAN and OBJECT IDLETIME operations to find keys to purge, and its configuration is controlled by the GC* variables.
In the standard testground architecture, only sidecar processes are participate in GC:
Index ¶
- Constants
- Variables
- func GetRunParams(ctx context.Context) *runtime.RunParams
- func NewInmemClient() *inmemClient
- func WithRunParams(ctx context.Context, rp *runtime.RunParams) context.Context
- type Barrier
- type Client
- type DefaultClient
- func MustBoundClient(ctx context.Context, runenv *runtime.RunEnv) *DefaultClient
- func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *DefaultClient
- func NewBoundClient(ctx context.Context, runenv *runtime.RunEnv) (*DefaultClient, error)
- func NewGenericClient(ctx context.Context, log *zap.SugaredLogger) (*DefaultClient, error)
- func (c *DefaultClient) Barrier(ctx context.Context, state State, target int) (*Barrier, error)
- func (c *DefaultClient) Close() error
- func (c DefaultClient) MustBarrier(ctx context.Context, state State, required int) *Barrier
- func (c DefaultClient) MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)
- func (c DefaultClient) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, ...) (seq int64)
- func (c DefaultClient) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
- func (c DefaultClient) MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)
- func (c DefaultClient) MustSignalEntry(ctx context.Context, state State) (current int64)
- func (c DefaultClient) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)
- func (c *DefaultClient) Publish(ctx context.Context, topic *Topic, payload interface{}) (int64, error)
- func (c DefaultClient) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, ...) (seq int64, err error)
- func (c DefaultClient) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)
- func (c DefaultClient) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)
- func (c *DefaultClient) SignalEntry(ctx context.Context, state State) (int64, error)
- func (c *DefaultClient) SignalEvent(ctx context.Context, event *runtime.Event) (err error)
- func (c *DefaultClient) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription, err error)
- func (c *DefaultClient) SubscribeEvents(ctx context.Context, rp *runtime.RunParams) (chan *runtime.Event, error)
- type State
- type Subscription
- type Topic
Constants ¶
const ( EnvServiceHost = "SYNC_SERVICE_HOST" EnvServicePort = "SYNC_SERVICE_PORT" )
Variables ¶
var ErrNoRunParameters = fmt.Errorf("no run parameters provided")
ErrNoRunParameters is returned by the generic client when an unbound context is passed in. See WithRunParams to bind RunParams to the context.
Functions ¶
func GetRunParams ¶
GetRunParams extracts the RunParams from a context, previously set by calling WithRunParams.
func NewInmemClient ¶ added in v0.2.2
func NewInmemClient() *inmemClient
NewInmemClient creates an in-memory sync client for testing.
Types ¶
type Barrier ¶
type Barrier struct { C chan error // contains filtered or unexported fields }
Barrier represents a barrier over a State. A Barrier is a synchronisation checkpoint that will fire once the `target` number of entries on that state have been registered.
type Client ¶
type Client interface { io.Closer Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error) Barrier(ctx context.Context, state State, target int) (*Barrier, error) SignalEntry(ctx context.Context, state State) (after int64, err error) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error) MustBarrier(ctx context.Context, state State, target int) *Barrier MustSignalEntry(ctx context.Context, state State) int64 MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) *Subscription MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription) MustSignalAndWait(ctx context.Context, state State, target int) (seq int64) SignalEvent(context.Context, *runtime.Event) error }
type DefaultClient ¶ added in v0.2.3
type DefaultClient struct {
// contains filtered or unexported fields
}
func MustBoundClient ¶
func MustBoundClient(ctx context.Context, runenv *runtime.RunEnv) *DefaultClient
MustBoundClient creates a new bound client by calling NewBoundClient, and panicking if it errors.
func MustGenericClient ¶
func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *DefaultClient
MustGenericClient creates a new generic client by calling NewGenericClient, and panicking if it errors.
func NewBoundClient ¶
NewBoundClient returns a new sync DefaultClient that is bound to the provided RunEnv. All operations will be automatically scoped to the keyspace of that run.
The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().
For test plans, a suitable context to pass here is the background context.
func NewGenericClient ¶
func NewGenericClient(ctx context.Context, log *zap.SugaredLogger) (*DefaultClient, error)
NewGenericClient returns a new sync DefaultClient that is bound to no RunEnv. It is intended to be used by testground services like the sidecar.
All operations expect to find the RunParams of the run to scope its actions inside the supplied context.Context. Call WithRunParams to bind the appropriate RunParams.
The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().
A suitable context to pass here is the background context of the main process.
func (*DefaultClient) Barrier ¶ added in v0.2.3
Barrier sets a barrier on the supplied State that fires when it reaches its target value (or higher).
The caller should monitor the channel C returned inside the Barrier object. It fires when the barrier reaches its target, is cancelled, or fails. If the barrier is satisfied, the value sent will be nil. C must not be closed by the caller.
When the context fires, the context's error will be propagated instead. The same will occur if the DefaultClient's context fires.
It is safe to use a non-cancellable context here, like the background context. No cancellation is needed unless you want to stop the process early.
func (*DefaultClient) Close ¶ added in v0.2.3
func (c *DefaultClient) Close() error
Close closes this client, cancels ongoing operations, and releases resources.
func (DefaultClient) MustBarrier ¶ added in v0.2.3
MustBarrier calls Barrier, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustPublish ¶ added in v0.2.3
func (c DefaultClient) MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)
MustPublish calls Publish, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustPublishAndWait ¶ added in v0.2.3
func (c DefaultClient) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)
MustPublishAndWait calls PublishAndWait, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustPublishSubscribe ¶ added in v0.2.3
func (c DefaultClient) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
MustPublishSubscribe calls PublishSubscribe, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustSignalAndWait ¶ added in v0.2.3
MustSignalAndWait calls SignalAndWait, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustSignalEntry ¶ added in v0.2.3
MustSignalEntry calls SignalEntry, panicking if it errors.
Suitable for shorthanding in test plans.
func (DefaultClient) MustSubscribe ¶ added in v0.2.3
func (c DefaultClient) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)
MustSubscribe calls Subscribe, panicking if it errors.
Suitable for shorthanding in test plans.
func (*DefaultClient) Publish ¶ added in v0.2.3
func (c *DefaultClient) Publish(ctx context.Context, topic *Topic, payload interface{}) (int64, error)
Publish publishes an item on the supplied topic. The payload type must match the payload type on the Topic; otherwise Publish will error.
This method returns synchronously, once the item has been published successfully, returning the sequence number of the new item in the ordered topic, or an error if one occurred, starting with 1 (for the first item).
If error is non-nil, the sequence number must be disregarded.
func (DefaultClient) PublishAndWait ¶ added in v0.2.3
func (c DefaultClient) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)
PublishAndWait composes Publish and a Barrier. It first publishes the provided payload to the specified topic, then awaits for a barrier on the supplied state to reach the indicated target.
If any operation fails, PublishAndWait short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but the Barrier fails, the seq number will be greater than zero.
func (DefaultClient) PublishSubscribe ¶ added in v0.2.3
func (c DefaultClient) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)
PublishSubscribe publishes the payload on the supplied Topic, then subscribes to it, sending paylods to the supplied channel.
If any operation fails, PublishSubscribe short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but Subscribe fails, the seq number will be greater than zero, but the returned Subscription will be nil, and the error, non-nil.
func (DefaultClient) SignalAndWait ¶ added in v0.2.3
func (c DefaultClient) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)
SignalAndWait composes SignalEntry and Barrier, signalling entry on the supplied state, and then awaiting until the required value has been reached.
The returned error will be nil if the barrier was met successfully, or non-nil if the context expired, or some other error ocurred.
func (*DefaultClient) SignalEntry ¶ added in v0.2.3
SignalEntry increments the state counter by one, returning the value of the new value of the counter, or an error if the operation fails.
func (*DefaultClient) SignalEvent ¶ added in v0.2.6
SignalEvent emits an event attached to a certain test plan.
func (*DefaultClient) Subscribe ¶ added in v0.2.3
func (c *DefaultClient) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription, err error)
Subscribe subscribes to a topic, consuming ordered, typed elements from index 0, and sending them to channel ch.
The supplied channel must be buffered, and its type must be a value or pointer type matching the topic type. If these conditions are unmet, this method will error immediately.
The caller must consume from this channel promptly; failure to do so will backpressure the DefaultClient's subscription event loop.
func (*DefaultClient) SubscribeEvents ¶ added in v0.3.0
func (c *DefaultClient) SubscribeEvents(ctx context.Context, rp *runtime.RunParams) (chan *runtime.Event, error)
SubscribeEvents monitors the events sent by a specific test plan. This function is used by Testground to monitor all emitted events by the testplans, in particular the terminal events, such as SuccessEvent, FailureEvent and CrashEvent.
type State ¶
type State string
State represents a state in a distributed state machine, identified by a unique string within the test case.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription represents a receive channel for data being published in a Topic.
func (*Subscription) Done ¶
func (s *Subscription) Done() <-chan error
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic represents a meeting place for test instances to exchange arbitrary data.