subscription

package
v0.1.0-alpha.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2021 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package subscription contains Event Subscription implementations to listens and process Events coming from an Event Store, such as running Projections.

Choose the Subscription type that is suited to your Event processor. Catch-up Subscriptions are the most commonly used type of Subscription, especially for Projections or Process Managers.

Volatile Subscriptions might be used for volatile Projections, e.g. when process restarts should erase the previous Projection value, typical for instance summaries or metrics recording, for example.

Index

Constants

View Source
const (
	DefaultPullCatchUpBufferSize = 48
	DefaultPullInterval          = 100 * time.Millisecond
	DefaultMaxPullInterval       = 1 * time.Second
)

Default values used by a CatchUp subscription.

Variables

This section is empty.

Functions

This section is empty.

Types

type CatchUp

type CatchUp struct {
	SubscriptionName string
	Target           stream.Target
	EventStore       eventstore.Streamer
	Checkpointer     checkpoint.Checkpointer
	Logger           logger.Logger

	// PullEvery is the minimum interval between each streaming call to the Event Store.
	//
	// Defaults to DefaultPullInterval if unspecified or negative value
	// has been provided.
	PullEvery time.Duration

	// MaxInterval is the maximum interval between each streaming call to the Event Store.
	// Use this value to ensure a specific eventual consistency window.
	//
	// Defaults to DefaultMaxPullInterval if unspecified or negative value
	// has been provided.
	MaxInterval time.Duration

	// BufferSize is the size of buffered channels used as EventStreams
	// by the Subscription when receiving Events from the Event Store.
	//
	// Defaults to DefaultPullCatchUpBufferSize if unspecified or a negative
	// value has been provided.
	BufferSize int
}

CatchUp represents a catch-up subscription that uses a Streamer Event Store to process messages, by "pulling" new Events, compared to a CatchUp subscription, which uses a combination of streaming (pulling) and subscribing to updates.

func (*CatchUp) Checkpoint

func (s *CatchUp) Checkpoint(ctx context.Context, event eventstore.Event) error

Checkpoint uses the Subscription Checkpointer instance to save the Global Sequence Number of the Event specified.

func (*CatchUp) Name

func (s *CatchUp) Name() string

Name is the name of the subscription.

func (*CatchUp) Start

func (s *CatchUp) Start(ctx context.Context, eventStream eventstore.EventStream) error

Start starts sending messages on the provided EventStream channel by calling the Event Store from where it last left off.

type Subscription

type Subscription interface {
	Name() string
	Start(ctx context.Context, eventStream eventstore.EventStream) error
	Checkpoint(ctx context.Context, event eventstore.Event) error
}

Subscription represents a named subscription to Events coming from an Event Stream, usually opened from an Event Store.

Starting a Subscription will open an Event Stream and start sinking messages on the provided channel.

Start should be implemented as a synchronous method, which should be stopped only when either the underlying Event Store call fails, or when the context is explicitly canceled.

Checkpoint should be implemented as a synchronous method, and should be called when an event has finished processing successfully. Implementations should forward the relevant event data to a Checkpointer to persist current state of the Subscription.

type Volatile

type Volatile struct {
	SubscriptionName string
	Target           stream.Target
	Logger           logger.Logger
	EventStore       interface {
		eventstore.Streamer
		eventstore.SequenceNumberGetter
	}

	// PullEvery is the minimum interval between each streaming call to the Event Store.
	//
	// Defaults to DefaultPullInterval if unspecified or negative value
	// has been provided.
	PullEvery time.Duration

	// MaxInterval is the maximum interval between each streaming call to the Event Store.
	// Use this value to ensure a specific eventual consistency window.
	//
	// Defaults to DefaultMaxPullInterval if unspecified or negative value
	// has been provided.
	MaxInterval time.Duration

	// BufferSize is the size of buffered channels used as EventStreams
	// by the Subscription when receiving Events from the Event Store.
	//
	// Defaults to DefaultPullCatchUpBufferSize if unspecified or a negative
	// value has been provided.
	BufferSize int
}

Volatile is a Subscription type that does not keep state of the last Event processed or received, nor survives the Subscription checkpoint between restarts.

Use this Subscription type for volatile processes, such as projecting realtime metrics, or when you're only interested in newer events committed to the Event Store.

func (*Volatile) Checkpoint

func (*Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error

Checkpoint is a no-op operation, since the transient nature of the Subscription does not require to persist its current state.

func (*Volatile) Name

func (v *Volatile) Name() string

Name is the name of the subscription.

func (*Volatile) Start

func (v *Volatile) Start(ctx context.Context, es eventstore.EventStream) error

Start starts the Subscription by opening a subscribing Event Stream using the subscription's Subscriber instance.

Directories

Path Synopsis
Package checkpoint expose the Checkpointer interface, used to checkpoint, or save, the current progress of a Subscription, so that it might survive application restarts without reprocessing Events.
Package checkpoint expose the Checkpointer interface, used to checkpoint, or save, the current progress of a Subscription, so that it might survive application restarts without reprocessing Events.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL