Documentation ¶
Index ¶
- Variables
- func ForEach(ctx context.Context, snapFn func(Snapshot), errFn func(error), ...)
- func Marshal(a interface{}) ([]byte, error)
- func Unmarshal(s Snapshot, a Target) error
- func Walk(ctx context.Context, walkFn func(Snapshot) error, snaps <-chan Snapshot, ...) error
- type Marshaler
- type Option
- type Query
- type Schedule
- type Snapshot
- func Drain(ctx context.Context, snaps <-chan Snapshot, errs ...<-chan error) ([]Snapshot, error)
- func New(a aggregate.Aggregate, opts ...Option) (Snapshot, error)
- func Sort(snaps []Snapshot, s aggregate.Sorting, dir aggregate.SortDirection) []Snapshot
- func SortMulti(snaps []Snapshot, sorts ...aggregate.SortOptions) []Snapshot
- type Store
- type Target
- type Unmarshaler
Constants ¶
This section is empty.
Variables ¶
var ErrUnimplemented = errors.New("aggregate does not implement (Un)Marshaler, encoding.Binary(Un)Marshaler or encoding.Text(Un)Marshaler")
ErrUnimplemented is returned when trying to marshal or unmarshal a snapshot into an aggregate that does not implement one of the supported marshalers.
var ForEvery = ForEach
ForEvery is an alias for ForEach.
Deprecated: Use ForEach instead.
Functions ¶
func ForEach ¶
func ForEach(
	ctx context.Context,
	snapFn func(Snapshot),
	errFn func(error),
	str <-chan Snapshot,
	errs ...<-chan error,
)
ForEach iterates over the given Snapshot and error channels. It calls snapFn(s) for every Snapshot s and errFn(e) for every error e until all channels are closed or ctx is canceled.
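The loop described above can be sketched with a select statement. This is a minimal illustration and not the package's implementation: the Snapshot struct, forEach and demo are hypothetical stand-ins so the example compiles on its own, and forEach takes a single error channel instead of a variadic list.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Snapshot is a stand-in for snapshot.Snapshot (assumption: the real type
// is an interface; a struct keeps this sketch short).
type Snapshot struct{ Version int }

// forEach is a hypothetical re-implementation of the documented behavior:
// call snapFn for every snapshot and errFn for every error until both
// channels are closed or ctx is canceled.
func forEach(
	ctx context.Context,
	snapFn func(Snapshot),
	errFn func(error),
	snaps <-chan Snapshot,
	errs <-chan error,
) {
	for snaps != nil || errs != nil {
		select {
		case <-ctx.Done():
			return
		case s, ok := <-snaps:
			if !ok {
				snaps = nil // a nil channel is never selected again
				continue
			}
			snapFn(s)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			errFn(err)
		}
	}
}

// demo feeds two snapshots and one error through forEach and counts the calls.
func demo() (snapCount, errCount int) {
	snaps := make(chan Snapshot, 2)
	errs := make(chan error, 1)
	snaps <- Snapshot{Version: 1}
	snaps <- Snapshot{Version: 2}
	errs <- errors.New("store unavailable")
	close(snaps)
	close(errs)

	forEach(context.Background(),
		func(Snapshot) { snapCount++ },
		func(error) { errCount++ },
		snaps, errs)
	return snapCount, errCount
}

func main() {
	s, e := demo()
	fmt.Println(s, e)
}
```

Setting a closed channel to nil removes it from the select, so the loop ends only once every channel has been drained.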
func Marshal ¶
Marshal encodes the given aggregate into a byte slice. If a implements Marshaler, the result of a.MarshalSnapshot() is returned. If a implements encoding.BinaryMarshaler, the result of a.MarshalBinary() is returned, and if a implements encoding.TextMarshaler, the result of a.MarshalText() is returned. If a implements none of these interfaces, Marshal uses encoding/gob to marshal the snapshot.
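The fallback chain can be pictured as a type switch. The sketch below is an illustration only, not the package's code: it assumes the interfaces are checked in the order the text lists them, redeclares Marshaler locally with the MarshalSnapshot method used in this package's own example, and the marshal, encode, custom, textOnly and plain names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding"
	"encoding/gob"
	"fmt"
)

// Marshaler is redeclared here so the sketch compiles on its own.
type Marshaler interface {
	MarshalSnapshot() ([]byte, error)
}

// marshal is a hypothetical illustration of the documented fallback chain.
// Assumption: the interfaces are tried in the order they are listed.
func marshal(a interface{}) ([]byte, error) {
	switch m := a.(type) {
	case Marshaler:
		return m.MarshalSnapshot()
	case encoding.BinaryMarshaler:
		return m.MarshalBinary()
	case encoding.TextMarshaler:
		return m.MarshalText()
	}
	// None of the interfaces is implemented: fall back to encoding/gob.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(a); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// custom implements both Marshaler and encoding.TextMarshaler.
type custom struct{ Name string }

func (c custom) MarshalSnapshot() ([]byte, error) { return []byte("snapshot:" + c.Name), nil }
func (c custom) MarshalText() ([]byte, error)     { return []byte("text:" + c.Name), nil }

// textOnly implements only encoding.TextMarshaler.
type textOnly struct{ Name string }

func (t textOnly) MarshalText() ([]byte, error) { return []byte("text:" + t.Name), nil }

// plain implements none of the interfaces and is encoded with gob.
type plain struct{ Name string }

// encode returns the marshaled bytes as a string for easy printing.
func encode(a interface{}) string {
	b, err := marshal(a)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(custom{Name: "foo"}))   // MarshalSnapshot is used
	fmt.Println(encode(textOnly{Name: "bar"})) // MarshalText is used
	b, err := marshal(plain{Name: "baz"})
	fmt.Println(len(b) > 0, err) // gob fallback produces bytes
}
```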
func Unmarshal ¶
Unmarshal decodes the given snapshot into the given aggregate. If a implements Unmarshaler, the result of a.UnmarshalSnapshot() is returned. If a implements encoding.BinaryUnmarshaler, the result of a.UnmarshalBinary() is returned, and if a implements encoding.TextUnmarshaler, the result of a.UnmarshalText() is returned. If a implements none of these interfaces, encoding/gob is used to unmarshal the snapshot.
func Walk ¶
func Walk(
	ctx context.Context,
	walkFn func(Snapshot) error,
	snaps <-chan Snapshot,
	errs ...<-chan error,
) error
Walk receives from the given Snapshot channel until it and the error channels are closed, ctx is canceled, or any of the provided error channels receives an error. For every Snapshot s that is received from the Snapshot channel, walkFn(s) is called. Should ctx be canceled before the Snapshot channel is closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.
Example:
var store snapshot.Store
snaps, errs, err := store.Query(context.TODO(), query.New())
// handle err
err = snapshot.Walk(context.TODO(), func(s snapshot.Snapshot) error {
	log.Printf("Received Snapshot: %v", s)
	return nil
}, snaps, errs)
// handle err
Types ¶
type Marshaler ¶
A Marshaler can encode itself into bytes. Aggregates must implement Marshaler & Unmarshaler for Snapshots to work.
Example using encoding/gob:
type foo struct {
	aggregate.Aggregate
	state
}

type state struct {
	Name string
	Age  uint8
}

func (f *foo) MarshalSnapshot() ([]byte, error) {
	var buf bytes.Buffer
	err := gob.NewEncoder(&buf).Encode(f.state)
	return buf.Bytes(), err
}

func (f *foo) UnmarshalSnapshot(p []byte) error {
	return gob.NewDecoder(bytes.NewReader(p)).Decode(&f.state)
}
type Option ¶
type Option func(*snapshot)
Option is an option for creating a snapshot.
type Query ¶
type Query interface {
	aggregate.Query

	Times() time.Constraints
}
Query is a query for snapshots.
type Schedule ¶
type Schedule interface {
	// Test returns true if the given Aggregate should be snapshotted.
	Test(aggregate.Aggregate) bool
}
A Schedule determines if an Aggregate is scheduled to be snapshotted.
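As an illustration, a schedule could request a snapshot every n versions. The sketch below is hypothetical: versioner stands in for aggregate.Aggregate (assuming the real interface exposes the aggregate version in some form), and everyN, fakeAggregate and dueVersions are names invented for this example.

```go
package main

import "fmt"

// versioner is a stand-in for aggregate.Aggregate (assumption: only the
// version is needed to decide whether a snapshot is due).
type versioner interface {
	AggregateVersion() int
}

// everyN is a hypothetical schedule that fires on every n-th version.
type everyN struct{ n int }

// Test reports whether the aggregate's version is a positive multiple of n.
func (s everyN) Test(a versioner) bool {
	v := a.AggregateVersion()
	return s.n > 0 && v > 0 && v%s.n == 0
}

// fakeAggregate is a test double whose value is its version.
type fakeAggregate int

func (f fakeAggregate) AggregateVersion() int { return int(f) }

// dueVersions lists the versions from 1 to upTo for which s requests a snapshot.
func dueVersions(s everyN, upTo int) []int {
	var due []int
	for v := 1; v <= upTo; v++ {
		if s.Test(fakeAggregate(v)) {
			due = append(due, v)
		}
	}
	return due
}

func main() {
	fmt.Println(dueVersions(everyN{n: 3}, 10)) // versions 3, 6 and 9 are due
}
```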
type Snapshot ¶
type Snapshot interface {
	// AggregateName returns the name of the aggregate.
	AggregateName() string

	// AggregateID returns the UUID of the aggregate.
	AggregateID() uuid.UUID

	// AggregateVersion returns the version of the aggregate at the time of the snapshot.
	AggregateVersion() int

	// Time returns the time of the snapshot.
	Time() time.Time

	// State returns the encoded state of the aggregate at the time of the snapshot.
	State() []byte
}
Snapshot is a snapshot of an Aggregate.
func Drain ¶
Drain drains the given Snapshot channel and returns its Snapshots.
Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Snapshots and that error are returned. Similarly, when ctx is canceled, the drained Snapshots and ctx.Err() are returned.
Drain returns as soon as the provided Snapshot channel is closed or an error is received from an error channel; it does not wait for the error channels to be closed.
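The return conditions described above can be sketched as follows. This is a hypothetical stand-alone illustration, not the package's implementation: Snapshot is a local stand-in struct, and drain accepts a single error channel (or nil) instead of a variadic list.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Snapshot is a stand-in for snapshot.Snapshot.
type Snapshot struct{ Version int }

// drain is a hypothetical sketch of the documented behavior: collect
// snapshots until the snapshot channel closes, an error arrives, or ctx
// is canceled, returning whatever was collected so far.
func drain(ctx context.Context, snaps <-chan Snapshot, errs <-chan error) ([]Snapshot, error) {
	var out []Snapshot
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed without error: stop selecting on it
				continue
			}
			return out, err
		case s, ok := <-snaps:
			if !ok {
				return out, nil // do not wait for errs to close
			}
			out = append(out, s)
		}
	}
}

// drainAll drains three buffered snapshots with no error channel.
func drainAll() (int, error) {
	snaps := make(chan Snapshot, 3)
	for v := 1; v <= 3; v++ {
		snaps <- Snapshot{Version: v}
	}
	close(snaps)
	got, err := drain(context.Background(), snaps, nil)
	return len(got), err
}

// drainUntilError sends two snapshots and then an error over unbuffered
// channels, so the order in which drain observes them is fixed.
func drainUntilError() (int, error) {
	snaps := make(chan Snapshot)
	errs := make(chan error)
	go func() {
		defer close(snaps)
		defer close(errs)
		snaps <- Snapshot{Version: 1}
		snaps <- Snapshot{Version: 2}
		errs <- errors.New("boom")
	}()
	got, err := drain(context.Background(), snaps, errs)
	return len(got), err
}

func main() {
	fmt.Println(drainAll())
	fmt.Println(drainUntilError())
}
```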
type Store ¶
type Store interface {
	// Save saves the given Snapshot into the Store.
	Save(context.Context, Snapshot) error

	// Latest returns the latest Snapshot for the Aggregate with the given name
	// and UUID.
	Latest(context.Context, string, uuid.UUID) (Snapshot, error)

	// Version returns the Snapshot with the given version for the Aggregate
	// with the given name and UUID. Implementations should return an error if
	// the specified Snapshot does not exist in the Store.
	Version(context.Context, string, uuid.UUID, int) (Snapshot, error)

	// Limit returns the latest Snapshot that has a version equal to or lower
	// than the given version. Implementations should return an error if no
	// such Snapshot can be found.
	Limit(context.Context, string, uuid.UUID, int) (Snapshot, error)

	// Query queries the Store for Snapshots that fit the given Query and
	// returns a channel of Snapshots and a channel of errors.
	//
	// Example:
	//
	//	var store snapshot.Store
	//	snaps, errs, err := store.Query(context.TODO(), query.New())
	//	// handle err
	//	err = snapshot.Walk(context.TODO(), func(snap snapshot.Snapshot) error {
	//		log.Printf("Queried Snapshot: %v", snap)
	//		foo := newFoo(snap.AggregateID())
	//		return snapshot.Unmarshal(snap, foo)
	//	}, snaps, errs)
	//	// handle err
	Query(context.Context, Query) (<-chan Snapshot, <-chan error, error)

	// Delete deletes a Snapshot from the Store.
	Delete(context.Context, Snapshot) error
}
Store is a database for aggregate snapshots.
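The difference between Latest and Limit is easiest to see on a small data set. The sketch below is hypothetical and not taken from any Store implementation: it works on an in-memory slice holding the snapshots of a single aggregate, assumes that "latest" means the highest version, and the snap, newest, latest, limit and errNotFound names are invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// snap is a stand-in for snapshot.Snapshot that carries only a version.
type snap struct{ Version int }

// errNotFound is a hypothetical error for a missing snapshot.
var errNotFound = errors.New("snapshot not found")

// newest returns the highest-versioned snapshot accepted by keep.
func newest(snaps []snap, keep func(snap) bool) (snap, error) {
	var best snap
	found := false
	for _, s := range snaps {
		if keep(s) && (!found || s.Version > best.Version) {
			best, found = s, true
		}
	}
	if !found {
		return snap{}, errNotFound
	}
	return best, nil
}

// latest mirrors Store.Latest for one aggregate (assumption: latest means
// highest version).
func latest(snaps []snap) (snap, error) {
	return newest(snaps, func(snap) bool { return true })
}

// limit mirrors Store.Limit: the newest snapshot whose version is equal
// to or lower than maxVersion.
func limit(snaps []snap, maxVersion int) (snap, error) {
	return newest(snaps, func(s snap) bool { return s.Version <= maxVersion })
}

func main() {
	stored := []snap{{Version: 5}, {Version: 15}, {Version: 10}}

	l, _ := latest(stored)
	fmt.Println(l.Version) // highest stored version

	s, _ := limit(stored, 12)
	fmt.Println(s.Version) // newest version not above 12

	_, err := limit(stored, 4)
	fmt.Println(err) // no snapshot at or below version 4
}
```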
type Target ¶
type Target interface {
	SetVersion(int)
}
Target is the target of a snapshot. It should be an aggregate that implements the SetVersion method.
type Unmarshaler ¶
An Unmarshaler can decode itself from bytes.