snapshot

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnimplemented = errors.New("aggregate does not implement (Un)Marshaler, encoding.Binary(Un)Marshaler or encoding.Text(Un)Marshaler")

ErrUnimplemented is returned when trying to marshal or unmarshal a snapshot into an aggregate that does not implement one of the supported marshalers.

View Source
var ForEvery = ForEach

ForEvery is an alias for ForEach.

Deprecated: Use ForEach instead.

Functions

func ForEach

func ForEach(
	ctx context.Context,
	snapFn func(Snapshot),
	errFn func(error),
	str <-chan Snapshot,
	errs ...<-chan error,
)

ForEach iterates over the given Snapshot and error channels and for every Snapshot s calls snapFn(s) and for every error e calls errorFn(e) until all channels are closed or ctx is canceled.

func Marshal

func Marshal(a interface{}) ([]byte, error)

Marshal encodes the given aggregate into a byte slice. If a implements Marshaler, a.MarshalSnapshot() is returned. If a implements encoding.BinaryMarshaler, a.MarshalBinary() is returned and if a implements encoding.TextMarshaler, a.MarshalText is returned. If a implements none of these interfaces, Marshal uses encoding/gob to marshal the snapshot.

func Unmarshal

func Unmarshal(s Snapshot, a Target) error

Unmarshal decodes the given snapshot into the given aggregate. If a implements Unmarshaler, a.UnmarshalSnapshot() is returned. If a implements encoding.BinaryMarshaler, a.UnmarshalBinary() is returned and if a implements encoding.TextUnmarshaler, a.UnmarshalText() is returned. If a implements none of these interfaces, encoding/gob is used to unmarshal the snapshot.

func Walk

func Walk(
	ctx context.Context,
	walkFn func(Snapshot) error,
	snaps <-chan Snapshot,
	errs ...<-chan error,
) error

Walk retrieves from the given Snapshot channel until it and the error channels are closed, ctx is canceled or any of the provided error channels receives an error. For every Snapshot s that is received from the Snapshot channel, walkFn(s) is called. Should ctx be canceled before the Snapshot channel is closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Example:

var store snapshot.Store
snaps, errs, err := store.Query(context.TODO(), query.New())
// handle err
err := snapshot.Walk(context.TODO(), func(s snapshot.Snapshot) {
	log.Println(fmt.Sprintf("Received Snapshot: %v", s))
	return nil
}, snaps, errs)
// handle err

Types

type Marshaler

type Marshaler interface {
	MarshalSnapshot() ([]byte, error)
}

A Marshaler can encode itself into bytes. Aggregates must implement Marshaler & Unmarshaler for Snapshots to work.

Example using encoding/gob:

type foo struct {
	aggregate.Aggregate
	state
}

type state struct {
	Name string
	Age uint8
}

func (f *foo) MarshalSnapshot() ([]byte, error) {
	var buf bytes.Buffer
	err := gob.NewEncoder(&buf).Encode(f.state)
	return buf.Bytes(), err
}

func (f *foo) UnmarshalSnapshot(p []byte) error {
	return gob.NewDecoder(bytes.NewReader(p)).Decode(&f.state)
}

type Option

type Option func(*snapshot)

Option is an option for creating a snapshot.

func Data

func Data(b []byte) Option

Data returns an Option that overrides the encoded data of a snapshot.

func Time

func Time(t time.Time) Option

Time returns an Option that sets the Time of a snapshot.

type Query

type Query interface {
	aggregate.Query

	Times() time.Constraints
}

Query is a query for snapshots.

type Schedule

type Schedule interface {
	// Test returns true if the given Aggregate should be snapshotted.
	Test(aggregate.Aggregate) bool
}

A Schedule determines if an Aggregate is scheduled to be snapshotted.

func Every

func Every(n int) Schedule

Every returns a Schedule that instructs to make Snapshots of an Aggregate every nth Event of that Aggregate.

type Snapshot

type Snapshot interface {
	// AggregateName returns the name of the aggregate.
	AggregateName() string

	// AggregateID returns the UUID of the aggregate.
	AggregateID() uuid.UUID

	// AggregateVersion returns the version of the aggregate at the time of the snapshot.
	AggregateVersion() int

	// Time returns the time of the snapshot.
	Time() time.Time

	// State returns the encoded state of the aggregate at the time of the snapshot.
	State() []byte
}

Snapshot is a snapshot of an Aggregate.

func Drain

func Drain(
	ctx context.Context,
	snaps <-chan Snapshot,
	errs ...<-chan error,
) ([]Snapshot, error)

Drain drains the given Snapshot channel and returns its Snapshots.

Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Snapshots and that error are returned. Similarly, when ctx is canceled, the drained Snapshots and ctx.Err() are returned.

Drain returns when the provided Snapshot channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.

func New

func New(a aggregate.Aggregate, opts ...Option) (Snapshot, error)

New creates and returns a snapshot of the given aggregate.

func Sort

func Sort(snaps []Snapshot, s aggregate.Sorting, dir aggregate.SortDirection) []Snapshot

Sort sorts Snapshot and returns the sorted Snapshots.

func SortMulti

func SortMulti(snaps []Snapshot, sorts ...aggregate.SortOptions) []Snapshot

SortMulti sorts Snapshots by multiple fields and returns the sorted aggregates.

type Store

type Store interface {
	// Save saves the given Snapshot into the Store.
	Save(context.Context, Snapshot) error

	// Latest returns the latest Snapshot for the Aggregate with the given name
	// and UUID.
	Latest(context.Context, string, uuid.UUID) (Snapshot, error)

	// Version returns the Snapshot with the given version for the Aggregate
	// with the given name and UUID. Implementations should return an error if
	// the specified Snapshot does not exist in the Store.
	Version(context.Context, string, uuid.UUID, int) (Snapshot, error)

	// Limit returns the latest Snapshot that has a version equal to or lower
	// than the given version. Implementations should return an error if no
	// such Snapshot can be found.
	Limit(context.Context, string, uuid.UUID, int) (Snapshot, error)

	// Query queries the Store for Snapshots that fit the given Query and
	// returns a channel of Snapshots and a channel of errors.
	//
	// Example:
	//
	//	var store snapshot.Store
	//	snaps, errs, err := store.Query(context.TODO(), query.New())
	//	// handle err
	//	err := snapshot.Walk(context.TODO(), func(snap snapshot.Snapshot) {
	//		log.Println(fmt.Sprintf("Queried Snapshot: %v", snap))
	//		foo := newFoo(snap.AggregateID())
	//		err := snapshot.Unmarshal(snap, foo)
	//		// handle err
	//	}, snaps, errs)
	//	// handle err
	Query(context.Context, Query) (<-chan Snapshot, <-chan error, error)

	// Delete deletes a Snapshot from the Store.
	Delete(context.Context, Snapshot) error
}

Store is a database for aggregate snapshots.

type Target

type Target interface {
	SetVersion(int)
}

Target is a snapshot target. This should be an aggregate that implements the SetVersion function.

type Unmarshaler

type Unmarshaler interface {
	UnmarshalSnapshot([]byte) error
}

An Unmarshaler can decode itself from bytes.

Directories

Path Synopsis
Package mock_snapshot is a generated GoMock package.
Package mock_snapshot is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL