eventstore

Version: v2.0.0
Published: Nov 1, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

Eventstore

eventstore is a library where I try out new stuff related to an eventstore as a single point of truth.

At the time of writing, it is not meant to evolve into a service. If you want to use it as a framework in your app, I'm happy to share my thoughts with you.

My initial intention for this repo is to showcase ideas I developed during days, months, or even years of working with eventsourcing and CQRS on the groundbreaking IAM called ZITADEL.

The current development relates to the subject-based messaging pattern of NATS. I enjoy reading what these people are doing.

Why?

I love eventsourcing. It helps me a lot during engineering processes because it is crystal clear that everything I decide now won't change. I can change the future but I can't change the past. Happily, thanks to GDPR, the internet has to forget my data. With this in mind, as an application developer it must be possible to manipulate or forget the past without losing the activity stream.

It also makes development hard because you have to think about what you do before you start doing it. The definition of an event can evolve over time, but you cannot enrich an event with information after it happened.

Ideas

Some ideas which probably will be implemented. The list is unordered and some points might never be implemented.

  • allow multiple filters in eventstore.Filter
    • storage: provide an Optimize method to simplify queries
    • maybe two layers of optimization would be more useful: the first in eventstore to collect filters, and the second in storage, optimized for its internal data structures.
  • additional storage types
    • sql (crdb) storage
    • file storage
  • memory: optimize tree
    • self balanced
    • check out different tree styles
  • testing suite
  • fuzz testing with go1.18
  • Think of an option to register event types to return the concrete type instead of the Event-struct (Event would change to interface)
  • Subscriber: add the possibility to listen to message queues
    • Pub/Sub (NATS, ...)
    • (Web-)hook
  • Publisher: add the possibility to push events
    • Third party tools (NATS, ...)
    • Specifications (MQTT, ...)
    • No dependencies
      • Webhooks
      • gRPC streams

Documentation

Index

Constants

This section is empty.

Variables

var (
	SingleToken = Subject(singleToken{})
	MultiToken  = Subject(multiToken{})
)
var (
	ErrSequenceNotMatched = errors.New("sequence of aggregate did not match")
)

Functions

func FilterBenchTests

func FilterBenchTests(ctx context.Context, b *testing.B, store TestEventstore)

func FilterComplianceTests

func FilterComplianceTests(ctx context.Context, t *testing.T, store TestEventstore)

func PushComplianceTests

func PushComplianceTests(ctx context.Context, t *testing.T, store TestEventstore)

func PushParallelOnDifferentAggregates

func PushParallelOnDifferentAggregates(ctx context.Context, b *testing.B, store TestEventstore)

func PushParallelOnSameAggregate

func PushParallelOnSameAggregate(ctx context.Context, b *testing.B, store TestEventstore)

Types

type Action

type Action interface {
	// Action represents the change of an object
	//
	// most likely the elements of [Aggregate()] will be the first elements of
	// the [Action]
	// e.g. user A was added: {"users", "A", "added"}
	Action() TextSubjects
	// Revision is an upcounting number which represents the version of the schema of the payload
	// the revision must change as soon as the logic to create the payload or schema of the payload changes
	Revision() uint16
}

Action describes the base data of Commands and Events.
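
A minimal sketch of a type that satisfies Action, following the {"users", "A", "added"} example from the interface comment. The types TextSubject, TextSubjects and Action are local copies of the documented declarations so that the snippet compiles on its own; userAdded is a hypothetical name that is not part of the package.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch is self-contained.
type TextSubject string
type TextSubjects []TextSubject

type Action interface {
	Action() TextSubjects
	Revision() uint16
}

// userAdded is a hypothetical action describing "user <userID> was added".
type userAdded struct {
	userID string
}

// Action starts with the aggregate subjects ("users", <userID>) followed by
// the name of the change.
func (a userAdded) Action() TextSubjects {
	return TextSubjects{"users", TextSubject(a.userID), "added"}
}

// Revision must be increased whenever the payload schema changes.
func (a userAdded) Revision() uint16 { return 1 }

// Compile-time check that userAdded satisfies Action.
var _ Action = userAdded{}

func main() {
	a := userAdded{userID: "A"}
	fmt.Println(a.Action(), a.Revision())
}
```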

type Aggregate

type Aggregate interface {
	// ID is the unique identifier of the stream
	ID() TextSubjects
	// Commands is the list of write intents
	Commands() []Command
	// CurrentSequence returns the current sequence of the aggregate
	// If the aggregate doesn't care about the current sequence it returns nil
	// If it's the first command return 0
	// If it's the nth command return the specific sequence
	CurrentSequence() *uint32
}

Aggregate represents the stream the events are written to. If the aggregate implements [AggregatePredefinedSequence], the current sequence of the aggregate is verified against the storage.
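
The three documented cases of CurrentSequence (nil, 0, and n) can be read as an optimistic concurrency check. The sketch below shows one way a storage might perform that check; checkSequence is a hypothetical helper and not part of the package, and ErrSequenceNotMatched is a local copy of the documented variable. How the real storages compare sequences is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented error so the sketch is self-contained.
var ErrSequenceNotMatched = errors.New("sequence of aggregate did not match")

// checkSequence is a hypothetical helper. expected is the value returned by
// Aggregate.CurrentSequence, stored is the last sequence the storage knows
// for that aggregate.
func checkSequence(expected *uint32, stored uint32) error {
	if expected == nil {
		// The aggregate does not care about its current sequence.
		return nil
	}
	if *expected != stored {
		return ErrSequenceNotMatched
	}
	return nil
}

func main() {
	first := uint32(0)

	// No check requested, the stored sequence is irrelevant.
	fmt.Println(checkSequence(nil, 7))

	// First command on an empty stream.
	fmt.Println(checkSequence(&first, 0))

	// The stream advanced in the meantime; callers can detect this case
	// with errors.Is and reload the aggregate before retrying.
	err := checkSequence(&first, 3)
	fmt.Println(errors.Is(err, ErrSequenceNotMatched))
}
```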

type Command

type Command interface {
	Action
	// Payload returns the payload of the event. It represents the fields changed by the event
	// valid types are:
	// - nil (no payload),
	// - struct which can be marshalled
	// - pointer to struct which can be marshalled
	Payload() any

	SetSequence(sequence uint32)
	SetCreationDate(creationDate time.Time)
}

Command represents a change to be made
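
A possible Command implementation, sketched under assumptions: addUser and userPayload are hypothetical names, the interfaces are local copies of the documented declarations, and the calls to SetSequence and SetCreationDate in main only simulate what a store is documented to do after a successful Push.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented types so the sketch is self-contained.
type TextSubject string
type TextSubjects []TextSubject

type Action interface {
	Action() TextSubjects
	Revision() uint16
}

type Command interface {
	Action
	Payload() any
	SetSequence(sequence uint32)
	SetCreationDate(creationDate time.Time)
}

// userPayload holds the fields changed by the hypothetical addUser command.
type userPayload struct {
	Username string
}

// addUser is a hypothetical command that adds a user.
type addUser struct {
	id      string
	payload userPayload

	// Metadata filled in by the store after the command was pushed.
	sequence     uint32
	creationDate time.Time
}

func (c *addUser) Action() TextSubjects {
	return TextSubjects{"users", TextSubject(c.id), "added"}
}

func (c *addUser) Revision() uint16 { return 1 }

// Payload returns a pointer to a struct, one of the documented valid types.
func (c *addUser) Payload() any { return &c.payload }

func (c *addUser) SetSequence(sequence uint32) { c.sequence = sequence }

func (c *addUser) SetCreationDate(creationDate time.Time) { c.creationDate = creationDate }

// Compile-time check that *addUser satisfies Command.
var _ Command = (*addUser)(nil)

func main() {
	cmd := &addUser{id: "A", payload: userPayload{Username: "gopher"}}

	// Simulates the metadata a store would set on the command.
	cmd.SetSequence(1)
	cmd.SetCreationDate(time.Now())

	fmt.Println(cmd.Action(), cmd.sequence, cmd.creationDate.IsZero())
}
```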

type CreatedAtFilter

type CreatedAtFilter struct {
	From time.Time
	To   time.Time
}

type Event

type Event interface {
	Action
	// Aggregate represents the object the command belongs to
	// and is used to generate the `Sequence` of the [Event]
	// e.g. user A: {"users", "A"}
	Aggregate() TextSubjects
	// Sequence represents the position of the event inside a specific subject
	Sequence() uint32
	// CreationDate is the timestamp the event was stored to the eventstore
	CreationDate() time.Time
	// UnmarshalPayload maps the stored payload into the given object
	// object must be of type *struct
	UnmarshalPayload(object any) error
}

Event is the abstraction used when a user wants events to be mapped by the eventstore.
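
To illustrate the UnmarshalPayload contract (the target must be a pointer to a struct), here is a sketch that assumes payloads are stored as JSON. That encoding is an assumption made for the example; the documentation does not state how the storages serialize payloads. storedEvent and userAddedPayload are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// storedEvent is a hypothetical event representation that keeps the payload
// as raw JSON. Only the payload handling of the Event interface is shown.
type storedEvent struct {
	payload []byte
}

// UnmarshalPayload maps the stored payload into object, which must be a
// pointer to a struct. An empty payload leaves object untouched.
func (e *storedEvent) UnmarshalPayload(object any) error {
	if len(e.payload) == 0 {
		return nil
	}
	return json.Unmarshal(e.payload, object)
}

// userAddedPayload is the hypothetical schema of a "user added" event.
type userAddedPayload struct {
	Username string `json:"username"`
}

func main() {
	e := &storedEvent{payload: []byte(`{"username":"gopher"}`)}

	var payload userAddedPayload
	if err := e.UnmarshalPayload(&payload); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(payload.Username)
}
```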

type Eventstore

type Eventstore interface {
	// Ready checks if the storage is available
	Ready(ctx context.Context) error
	// Push stores the commands and sets the resulting metadata on the command
	// the commands should be stored in a single transaction
	// if the current sequence of an [AggregatePredefinedSequence] does not match
	// [ErrSequenceNotMatched] is returned
	Push(ctx context.Context, aggregates ...Aggregate) error
	// Filter applies the events matching the subjects on the reducer
	Filter(ctx context.Context, filter *Filter, reducer Reducer) error
}

Eventstore abstracts all functions needed to store events and to filter the stored events.

type Filter

type Filter struct {
	// Queries are queries on subjects
	Queries []*FilterQuery
	// Limit represents the maximum events returned
	Limit uint64
}

Filter represents a query

type FilterQuery

type FilterQuery struct {
	// Sequence limits the sequences for this query
	Sequence SequenceFilter
	// CreatedAt filters by the time an event was created
	CreatedAt CreatedAtFilter
	// Subjects are matched against the action of an event (the event type)
	Subjects []Subject
}
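
A sketch of how a Filter with a single FilterQuery could be assembled. All types are local copies of the documented declarations. Two things are assumptions: the marker method on Subject (the real method set is unexported) and the idea that TextSubject satisfies Subject. userEventsSince is a hypothetical helper, and how zero values in SequenceFilter and CreatedAtFilter are interpreted is not stated in the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// Subject mirrors the documented interface; isSubject is a hypothetical
// marker method because the real methods are unexported.
type Subject interface{ isSubject() }

type TextSubject string
type multiToken struct{}

func (TextSubject) isSubject() {}
func (multiToken) isSubject()  {}

var MultiToken = Subject(multiToken{})

type SequenceFilter struct{ From, To uint32 }
type CreatedAtFilter struct{ From, To time.Time }

type FilterQuery struct {
	Sequence  SequenceFilter
	CreatedAt CreatedAtFilter
	Subjects  []Subject
}

type Filter struct {
	Queries []*FilterQuery
	Limit   uint64
}

// userEventsSince is a hypothetical helper that selects every event of one
// user starting at the given sequence, returning at most limit events.
func userEventsSince(userID string, from uint32, limit uint64) *Filter {
	return &Filter{
		Queries: []*FilterQuery{{
			Sequence: SequenceFilter{From: from},
			Subjects: []Subject{TextSubject("users"), TextSubject(userID), MultiToken},
		}},
		Limit: limit,
	}
}

func main() {
	f := userEventsSince("A", 5, 100)
	fmt.Println(len(f.Queries), f.Queries[0].Sequence.From, f.Limit)
}
```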

type Reducer

type Reducer interface {
	// Reduce maps events to a model
	Reduce(events ...Event) error
}

Reducer represents a model
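
As an illustration of a Reducer, the following sketch folds events into a simple read model that counts users. userCount and fakeEvent are hypothetical: the first is an example model, the second a test double that stands in for events returned by a store. The action names "added" and "removed" are assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented types so the sketch is self-contained.
type TextSubject string
type TextSubjects []TextSubject

type Action interface {
	Action() TextSubjects
	Revision() uint16
}

type Event interface {
	Action
	Aggregate() TextSubjects
	Sequence() uint32
	CreationDate() time.Time
	UnmarshalPayload(object any) error
}

type Reducer interface {
	Reduce(events ...Event) error
}

// userCount is a hypothetical read model: the number of existing users.
type userCount struct {
	count int
}

// Reduce inspects the last element of each action to decide how the model
// changes. Events with unknown or empty actions are ignored.
func (m *userCount) Reduce(events ...Event) error {
	for _, e := range events {
		action := e.Action()
		if len(action) == 0 {
			continue
		}
		switch action[len(action)-1] {
		case "added":
			m.count++
		case "removed":
			m.count--
		}
	}
	return nil
}

var _ Reducer = (*userCount)(nil)

// fakeEvent is a test double that only carries an action.
type fakeEvent struct{ action TextSubjects }

func (e fakeEvent) Action() TextSubjects { return e.action }
func (e fakeEvent) Revision() uint16     { return 1 }
func (e fakeEvent) Aggregate() TextSubjects {
	if len(e.action) == 0 {
		return nil
	}
	return e.action[:len(e.action)-1]
}
func (e fakeEvent) Sequence() uint32           { return 0 }
func (e fakeEvent) CreationDate() time.Time    { return time.Time{} }
func (e fakeEvent) UnmarshalPayload(any) error { return nil }

func main() {
	m := new(userCount)
	err := m.Reduce(
		fakeEvent{TextSubjects{"users", "A", "added"}},
		fakeEvent{TextSubjects{"users", "B", "added"}},
		fakeEvent{TextSubjects{"users", "A", "removed"}},
	)
	fmt.Println(m.count, err)
}
```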

type SequenceFilter

type SequenceFilter struct {
	From uint32
	To   uint32
}

type Subject

type Subject interface {
	// contains filtered or unexported methods
}

type TestEventstore

type TestEventstore interface {
	Eventstore
	Before(ctx context.Context, t testing.TB) error
	After(ctx context.Context, t testing.TB) error
}

type TextSubject

type TextSubject string

type TextSubjects

type TextSubjects []TextSubject

func (TextSubjects) Compare

func (ts TextSubjects) Compare(comp ...Subject) bool
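
The documentation does not describe how Compare treats SingleToken and MultiToken. Since the README refers to the subject-based messaging of NATS, the sketch below assumes NATS-like wildcard semantics: SingleToken stands for exactly one token and MultiToken for one or more trailing tokens. The function match is a hypothetical stand-in written for this example and is not the package's Compare; the marker method on Subject is likewise an assumption.

```go
package main

import "fmt"

// Subject mirrors the documented interface; isSubject is a hypothetical
// marker method because the real methods are unexported.
type Subject interface{ isSubject() }

type TextSubject string
type TextSubjects []TextSubject
type singleToken struct{}
type multiToken struct{}

func (TextSubject) isSubject() {}
func (singleToken) isSubject() {}
func (multiToken) isSubject()  {}

var (
	SingleToken = Subject(singleToken{})
	MultiToken  = Subject(multiToken{})
)

// match reports whether ts matches pattern under the assumed semantics:
// a TextSubject must be equal to the token at the same position,
// SingleToken accepts any one token, and MultiToken accepts one or more
// remaining tokens and ends the comparison.
func match(ts TextSubjects, pattern ...Subject) bool {
	for i, p := range pattern {
		switch p := p.(type) {
		case multiToken:
			return len(ts) > i
		case singleToken:
			if i >= len(ts) {
				return false
			}
		case TextSubject:
			if i >= len(ts) || ts[i] != p {
				return false
			}
		default:
			return false
		}
	}
	// Without a trailing MultiToken both sides must have the same length.
	return len(ts) == len(pattern)
}

func main() {
	action := TextSubjects{"users", "A", "added"}

	fmt.Println(match(action, TextSubject("users"), SingleToken, TextSubject("added")))
	fmt.Println(match(action, TextSubject("users"), MultiToken))
	fmt.Println(match(action, TextSubject("users")))
}
```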

func (TextSubjects) Join

func (ts TextSubjects) Join(sep string) string

Directories

Path Synopsis
x
