Documentation
¶
Overview ¶
Example ¶
package main import ( "context" "fmt" "time" ) const ( Packed = "packed" PickedUp = "picked-up" Shipped = "shipped" ) type ShipmentPacked struct{ EventSkeleton } type ShipmentPickedUp struct{ EventSkeleton } type ShipmentShipped struct{ EventSkeleton } type Shipment struct { AggregateRootBase Status string } func (s *Shipment) Handle(ctx context.Context, v interface{}) error { var e Event switch cmd := v.(type) { case PackShipment: e = &ShipmentPacked{EventSkeleton{ At: time.Now(), ID: cmd.AggregateID(), }} case PickupShipment: e = &ShipmentPickedUp{EventSkeleton{ At: time.Now(), ID: cmd.AggregateID(), }} case ShipShipment: e = &ShipmentShipped{EventSkeleton{ At: time.Now(), ID: cmd.AggregateID(), }} default: return fmt.Errorf("unexpected command %T", cmd) } return s.Apply(s, e, true) } func (s *Shipment) On(e Event) error { switch v := e.(type) { case *ShipmentPacked: s.ID = v.AggregateID() s.Status = Packed case *ShipmentPickedUp: s.ID = v.AggregateID() s.Status = PickedUp case *ShipmentShipped: s.ID = v.AggregateID() s.Status = Shipped default: return fmt.Errorf("unexpected event %T", e) } return nil } type PackShipment struct{ Command } type PickupShipment struct{ Command } type ShipShipment struct{ Command } func (PackShipment) New() bool { return true } func main() { marshaler := new(JsonEventMarshaler) marshaler.Bind(ShipmentPacked{}, ShipmentPickedUp{}, ShipmentShipped{}) var ( ctx = context.Background() shipment = &Shipment{} repo = NewRepository(shipment, WithMarshaler(marshaler)) bus = NewCommandBus(repo) ) bus.Send(ctx, PackShipment{Command{ID: "abc123"}}) aggregate, _ := repo.GetByID(ctx, "abc123") shipment, _ = aggregate.(*Shipment) fmt.Println(shipment.Status) fmt.Println(shipment.GetVersion()) // Output: packed // Output: 0 bus.Send(ctx, PickupShipment{Command{ID: shipment.AggregateRootID()}}) aggregate, _ = repo.GetByID(ctx, shipment.AggregateRootID()) shipment, _ = aggregate.(*Shipment) fmt.Println(shipment.Status) 
fmt.Println(shipment.GetVersion()) // Output: picked-up
Output: shipped Output: 2
Index ¶
- Variables
- func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender
- type AggregateCommand
- type AggregateHandler
- type AggregateRepository
- func (r *AggregateRepository) GetByID(ctx context.Context, aggrID string) (AggregateRoot, error)
- func (r *AggregateRepository) LoadFromSnap(ctx context.Context, aggrID string) (SnapshottingBehaviour, error)
- func (r *AggregateRepository) New() interface{}
- func (r *AggregateRepository) Save(ctx context.Context, aggr AggregateRoot) error
- type AggregateRoot
- type AggregateRootBase
- func (aggr *AggregateRootBase) AggregateRootID() string
- func (aggr *AggregateRootBase) Apply(aggregate AggregateRoot, e Event, isNew bool) error
- func (aggr *AggregateRootBase) CommitEvents()
- func (aggr *AggregateRootBase) GetUncommitedEvents() []Event
- func (aggr *AggregateRootBase) GetVersion() int
- func (aggr *AggregateRootBase) LoadFromHistory(aggregate AggregateRoot, history []Event) error
- func (i *AggregateRootBase) StreamSize() int
- type AggregateRootRepository
- type AggregateRootSnapshotRepository
- type Command
- type Constructor
- type EpochMillis
- type Event
- type EventMarshaler
- type EventModel
- type EventSkeleton
- type EventStore
- type History
- type JsonEventMarshaler
- type JsonSnapshotMarshaler
- type Option
- type Snapshot
- type SnapshotMarshaler
- type SnapshotModel
- type SnapshotRepository
- type SnapshotSkeleton
- type SnapshotStore
- type SnapshottingBehaviour
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrSnapNotFound = errors.New("snapshot not found")
Functions ¶
func NewCommandBus ¶
func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender
Types ¶
type AggregateCommand ¶
type AggregateCommand interface {
AggregateID() string
}
AggregateCommand ensures that a command carries an aggregate ID, which can be helpful for replaying the aggregate's events from the event store to build the aggregate's current state.
type AggregateHandler ¶
type AggregateRepository ¶
type AggregateRepository struct { Marshaler EventMarshaler // contains filtered or unexported fields }
func (*AggregateRepository) GetByID ¶
func (r *AggregateRepository) GetByID(ctx context.Context, aggrID string) (AggregateRoot, error)
func (*AggregateRepository) LoadFromSnap ¶
func (r *AggregateRepository) LoadFromSnap(ctx context.Context, aggrID string) (SnapshottingBehaviour, error)
func (*AggregateRepository) New ¶
func (r *AggregateRepository) New() interface{}
func (*AggregateRepository) Save ¶
func (r *AggregateRepository) Save(ctx context.Context, aggr AggregateRoot) error
type AggregateRoot ¶
type AggregateRootBase ¶
func (*AggregateRootBase) AggregateRootID ¶
func (aggr *AggregateRootBase) AggregateRootID() string
func (*AggregateRootBase) Apply ¶
func (aggr *AggregateRootBase) Apply(aggregate AggregateRoot, e Event, isNew bool) error
func (*AggregateRootBase) CommitEvents ¶
func (aggr *AggregateRootBase) CommitEvents()
func (*AggregateRootBase) GetUncommitedEvents ¶
func (aggr *AggregateRootBase) GetUncommitedEvents() []Event
func (*AggregateRootBase) GetVersion ¶
func (aggr *AggregateRootBase) GetVersion() int
func (*AggregateRootBase) LoadFromHistory ¶
func (aggr *AggregateRootBase) LoadFromHistory(aggregate AggregateRoot, history []Event) error
func (*AggregateRootBase) StreamSize ¶
func (i *AggregateRootBase) StreamSize() int
type AggregateRootRepository ¶
type AggregateRootRepository interface { New() interface{} Save(ctx context.Context, agr AggregateRoot) error GetByID(ctx context.Context, aggregateRootID string) (AggregateRoot, error) }
func NewRepository ¶
func NewRepository(aggr AggregateRoot, opts ...Option) AggregateRootRepository
type AggregateRootSnapshotRepository ¶
type AggregateRootSnapshotRepository interface { Save(ctx context.Context, snap Snapshot) error GetByID(ctx context.Context, aggregateRootID string, version int) (Snapshot, error) }
func NewSnapRepository ¶
func NewSnapRepository(store SnapshotStore, marshaler SnapshotMarshaler) AggregateRootSnapshotRepository
type Command ¶
type Command struct {
ID string
}
Command is a model for AggregateCommand, used to avoid boilerplate code in commands implementing AggregateCommand.
func (Command) AggregateID ¶
type Constructor ¶
type Constructor interface {
New() bool
}
Constructor: a command implementing this interface can be used to construct a new aggregate.
type EpochMillis ¶
type EpochMillis int64
func Now ¶
func Now() EpochMillis
func Time ¶
func Time(t time.Time) EpochMillis
func (EpochMillis) Int64 ¶
func (e EpochMillis) Int64() int64
func (EpochMillis) String ¶
func (e EpochMillis) String() string
func (EpochMillis) Time ¶
func (e EpochMillis) Time() time.Time
type EventMarshaler ¶
type EventMarshaler interface { Bind(e ...Event) error Marshal(e Event) (EventModel, error) Unmarshal(e EventModel) (Event, error) }
type EventModel ¶
type EventModel struct { // Version is the event version the Data represents Version int // At indicates when the event happened; provided as a utility for the store At EpochMillis // Data contains the Serializer encoded version of the data Data []byte }
EventModel provides the shape of the records to be saved to the db
type EventSkeleton ¶
type EventSkeleton struct { // ID contains the AggregateID ID string // Version holds the event version Version int // At contains the event time At time.Time }
EventSkeleton provides a default implementation of an Event that is suitable for being embedded
func (EventSkeleton) AggregateID ¶
func (m EventSkeleton) AggregateID() string
AggregateID implements part of the Event interface
func (EventSkeleton) EventAt ¶
func (m EventSkeleton) EventAt() time.Time
EventAt implements part of the Event interface
func (EventSkeleton) EventVersion ¶
func (m EventSkeleton) EventVersion() int
EventVersion implements part of the Event interface
type EventStore ¶
type EventStore interface { SaveEvents(ctx context.Context, aggrID string, models History, version int) error GetEventsForAggregate(ctx context.Context, aggrID string, version int) (History, error) }
func NewInmemEventStore ¶
func NewInmemEventStore() EventStore
type History ¶
type History []EventModel
type JsonEventMarshaler ¶
type JsonEventMarshaler struct {
// contains filtered or unexported fields
}
func (*JsonEventMarshaler) Bind ¶
func (m *JsonEventMarshaler) Bind(events ...Event) error
func (*JsonEventMarshaler) Marshal ¶
func (m *JsonEventMarshaler) Marshal(e Event) (EventModel, error)
func (*JsonEventMarshaler) Unmarshal ¶
func (m *JsonEventMarshaler) Unmarshal(model EventModel) (Event, error)
type JsonSnapshotMarshaler ¶
type JsonSnapshotMarshaler struct {
// contains filtered or unexported fields
}
func (*JsonSnapshotMarshaler) Bind ¶
func (m *JsonSnapshotMarshaler) Bind(states ...interface{}) error
func (*JsonSnapshotMarshaler) Marshal ¶
func (m *JsonSnapshotMarshaler) Marshal(s Snapshot) (SnapshotModel, error)
func (*JsonSnapshotMarshaler) Unmarshal ¶
func (m *JsonSnapshotMarshaler) Unmarshal(model SnapshotModel) (Snapshot, error)
type Option ¶
type Option func(r *AggregateRepository)
func WithDefaultSnapRepository ¶
func WithDefaultSnapRepository(states ...interface{}) Option
func WithEventStore ¶
func WithEventStore(s EventStore) Option
func WithMarshaler ¶
func WithMarshaler(m EventMarshaler) Option
func WithSnapRepository ¶
func WithSnapRepository(s SnapshotStore, m SnapshotMarshaler) Option
type SnapshotMarshaler ¶
type SnapshotMarshaler interface { Bind(v ...interface{}) error Marshal(s Snapshot) (SnapshotModel, error) Unmarshal(m SnapshotModel) (Snapshot, error) }
type SnapshotModel ¶
type SnapshotRepository ¶
type SnapshotRepository struct {
// contains filtered or unexported fields
}
type SnapshotSkeleton ¶
func (SnapshotSkeleton) AggregateRootID ¶
func (s SnapshotSkeleton) AggregateRootID() string
func (SnapshotSkeleton) CurrentVersion ¶
func (s SnapshotSkeleton) CurrentVersion() int
func (SnapshotSkeleton) GetState ¶
func (s SnapshotSkeleton) GetState() interface{}
type SnapshotStore ¶
type SnapshotStore interface { SaveSnapshot(ctx context.Context, agrID string, model SnapshotModel, version int) error GetSnapshotForAggregate(ctx context.Context, agrID string, version int) (SnapshotModel, error) }
func NewInmemSnapStore ¶
func NewInmemSnapStore() SnapshotStore
type SnapshottingBehaviour ¶
type SnapshottingBehaviour interface { AggregateRoot SnapshotInterval() int GetState() interface{} ApplyState(s Snapshot) SnapshottingEnable() bool }