rita

v0.0.0-...-1c52b4b
Published: Jun 22, 2023 License: Apache-2.0 Imports: 11 Imported by: 3

README

Rita

Rita is a toolkit of event-centric and reactive abstractions built on top of NATS.

NOTE: This package is under heavy development, so breaking changes will likely be introduced. Feedback is welcome on API design and scope. Please open an issue if you have something to share!

Install

Requires Go 1.18+

go get github.com/bruth/rita

Usage

See the blog post that introduces the initial design of Rita.

Type Registry

Rita comes with an opt-in, but recommended, type registry that makes [de]serialization transparent and keeps usage consistent. Fundamentally, we are defining names and mapping them to concrete types.

Given a couple of types in a domain model:

type OrderPlaced struct {}
type OrderShipped struct {}

We can create a registry that associates names with these types. If the names are intended to be shared and portable across other languages, then some thought should be put into the naming structure. For example, the CloudEvents spec suggests using the reverse DNS name.

tr, err := rita.NewTypeRegistry(map[string]*rita.Type{
  "com.example.order-placed": {
    Init: func() any { return &OrderPlaced{} },
  },
  "com.example.order-shipped": {
    Init: func() any { return &OrderShipped{} },
  },
})

Each type currently supports an Init function to allocate a new value with any default fields set. The registry also accepts an optional rita.Codec() option for overriding the default codec for [de]serialization.

Once the set of types that will be worked with is registered, we can initialize a Rita instance by passing the NATS connection and the registry as an option.

r, err := rita.New(nc, rita.TypeRegistry(tr))

What is the behavior without a type registry? The expectation is that the type name is explicitly provided and the data is a []byte.
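
To make the name-to-type mapping concrete, here is a minimal standalone sketch of the idea. It is not Rita's implementation: the registry type, its Marshal and Unmarshal methods, the OrderID field, and the choice of JSON as the codec are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type OrderPlaced struct {
	OrderID string `json:"order_id"` // hypothetical field
}

type OrderShipped struct {
	OrderID string `json:"order_id"` // hypothetical field
}

// registry is a hypothetical stand-in for a type registry. It maps a
// portable name to a constructor and keeps the reverse mapping from the
// concrete Go type back to its name.
type registry struct {
	inits map[string]func() any
	names map[reflect.Type]string
}

func newRegistry(inits map[string]func() any) *registry {
	r := &registry{inits: inits, names: make(map[reflect.Type]string)}
	for name, newFn := range inits {
		r.names[reflect.TypeOf(newFn())] = name
	}
	return r
}

// Marshal looks up the registered name for v and encodes v as JSON.
func (r *registry) Marshal(v any) (string, []byte, error) {
	name, ok := r.names[reflect.TypeOf(v)]
	if !ok {
		return "", nil, fmt.Errorf("type %T not registered", v)
	}
	b, err := json.Marshal(v)
	return name, b, err
}

// Unmarshal allocates a value for the given name and decodes data into it.
func (r *registry) Unmarshal(name string, data []byte) (any, error) {
	newFn, ok := r.inits[name]
	if !ok {
		return nil, fmt.Errorf("name %q not registered", name)
	}
	v := newFn()
	if err := json.Unmarshal(data, v); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	r := newRegistry(map[string]func() any{
		"com.example.order-placed":  func() any { return &OrderPlaced{} },
		"com.example.order-shipped": func() any { return &OrderShipped{} },
	})

	// Encode a value; the name travels alongside the bytes.
	name, data, err := r.Marshal(&OrderPlaced{OrderID: "1"})
	if err != nil {
		panic(err)
	}

	// Decode using only the name and the bytes.
	v, err := r.Unmarshal(name, data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %T\n", name, v)
}
```

The caller never names the concrete type when decoding; the registered name alone decides which value is allocated.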

EventStore

Create an event store for orders. This will default to a subject of orders.>. Note that, other than a few defaults chosen to follow the semantics of an event store, the stream is just a vanilla JetStream stream and can be managed as such.

// Get a handle to the event store with a name.
es := r.EventStore("orders")

// Create the store by providing a stream config. By default, the bound
// subject will be "orders.>". This operation is idempotent, so it can be
// safely called during application startup.
err := es.Create(&nats.StreamConfig{
  Replicas: 3,
})

Append an event to the orders.1 subject on the stream. Setting ExpectSequence to zero asserts that no other events are associated with this subject yet.

seq1, err := es.Append(ctx, "orders.1", []*rita.Event{
    {Data: &OrderPlaced{}},
}, rita.ExpectSequence(0))

Append another event, using the previously returned sequence.

seq2, err := es.Append(ctx, "orders.1", []*rita.Event{
    {Data: &OrderShipped{}},
}, rita.ExpectSequence(seq1))

Load the events for the subject. This returns a slice of *Event values where the Data value for each event has been pre-allocated based on the Type using the registry.

events, lastSeq, err := es.Load(ctx, "orders.1")

The lastSeq value indicates the sequence of the last event appended for this subject. If a new event needs to be appended, this should be used with ExpectSequence.
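
The expected-sequence check is a form of optimistic concurrency control. The following standalone sketch illustrates the idea with an in-memory store; memStore, its Append signature, and errSequenceConflict are hypothetical stand-ins and do not reflect how Rita or JetStream implement the check.

```go
package main

import (
	"errors"
	"fmt"
)

var errSequenceConflict = errors.New("sequence conflict")

// memStore is a hypothetical in-memory stand-in for a stream. It tracks a
// stream-wide sequence and the last sequence written for each subject.
type memStore struct {
	seq  uint64
	last map[string]uint64
}

func newMemStore() *memStore {
	return &memStore{last: make(map[string]uint64)}
}

// Append writes n events to subject only if the subject's last sequence
// equals expect. It returns the sequence of the last event written.
func (s *memStore) Append(subject string, n int, expect uint64) (uint64, error) {
	if s.last[subject] != expect {
		return 0, errSequenceConflict
	}
	s.seq += uint64(n)
	s.last[subject] = s.seq
	return s.seq, nil
}

func main() {
	s := newMemStore()

	// First writer: the subject has no events yet, so it expects zero.
	seq1, _ := s.Append("orders.1", 1, 0)

	// A second writer still holding the stale value zero is rejected.
	_, err := s.Append("orders.1", 1, 0)
	fmt.Println(seq1, errors.Is(err, errSequenceConflict))

	// Retrying with the current sequence succeeds.
	seq2, err := s.Append("orders.1", 1, seq1)
	fmt.Println(seq2, err)
}
```

A writer that hits a conflict would typically reload the events, re-derive its state, and decide again before retrying.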

State

Although event sourcing involves modeling and persisting the state transitions as events, we still need to derive state in order to make decisions when commands are received.

Fundamentally, we have a model of state, and then we need to evolve the state given each event. We can model this as an interface in Go.

type Evolver interface {
  Evolve(event *Event) error
}

An implementation would then look like:

type Order struct {
  // fields..
}

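func (o *Order) Evolve(event *Event) error {
  // Switch on the event type or data (if using the type registry).
  switch event.Data.(type) {
  case *OrderPlaced:
    // Apply the placed event to the order.
  case *OrderShipped:
    // Apply the shipped event to the order.
  }
  return nil
}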
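
For a version that compiles and runs on its own, here is a small sketch. The local Event type is a trimmed-down stand-in for rita.Event, and the Status field and its values are assumptions made for illustration.

```go
package main

import "fmt"

type OrderPlaced struct{}
type OrderShipped struct{}

// Event is a trimmed-down local stand-in for rita.Event.
type Event struct {
	Data any
}

// Order is the state derived from events. Status is a hypothetical field.
type Order struct {
	Status string
}

// Evolve applies a single event to the order.
func (o *Order) Evolve(event *Event) error {
	switch event.Data.(type) {
	case *OrderPlaced:
		o.Status = "placed"
	case *OrderShipped:
		o.Status = "shipped"
	default:
		return fmt.Errorf("unexpected event data %T", event.Data)
	}
	return nil
}

func main() {
	events := []*Event{
		{Data: &OrderPlaced{}},
		{Data: &OrderShipped{}},
	}

	// Fold the events into the state, in order.
	var order Order
	for _, e := range events {
		if err := order.Evolve(e); err != nil {
			panic(err)
		}
	}
	fmt.Println(order.Status)
}
```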

Given this model, we can use the Evolve method on EventStore for convenience.

var order Order
lastSeq, err = es.Evolve(ctx, "orders.1", &order)

This also works for a cross-cutting state (all orders) using a subject wildcard.

var orderList OrderList
lastSeq, err = es.Evolve(ctx, "orders.*", &orderList)

Long-lived state

It may be desirable to keep some state in memory for a period of time and then only request new events that have occurred since the state was created previously.

This can be achieved by passing the same state value (already evolved) along with the rita.AfterSequence(lastSeq) option.

lastSeq, err = es.Evolve(ctx, "orders.1", &order, rita.AfterSequence(lastSeq))

This will only fetch the events after the last event that was received previously and evolve the state up to the latest known event.
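
The incremental behavior can be sketched without NATS by replaying an in-memory log. In this sketch, evolveAfter, the counter model, and the local Event type are hypothetical; only the Evolver interface mirrors the one defined in this package.

```go
package main

import "fmt"

// Event is a trimmed-down local stand-in for rita.Event.
type Event struct {
	Sequence uint64
	Data     any
}

type Evolver interface {
	Evolve(event *Event) error
}

// counter is a hypothetical model that counts the events applied to it.
type counter struct{ n int }

func (c *counter) Evolve(*Event) error {
	c.n++
	return nil
}

// evolveAfter applies every event whose sequence is greater than after.
// It returns the sequence of the last event applied, or after if no event
// was applied, including when an error stops the loop early.
func evolveAfter(log []*Event, model Evolver, after uint64) (uint64, error) {
	last := after
	for _, e := range log {
		if e.Sequence <= after {
			continue
		}
		if err := model.Evolve(e); err != nil {
			return last, err
		}
		last = e.Sequence
	}
	return last, nil
}

func main() {
	log := []*Event{{Sequence: 1}, {Sequence: 2}}

	// Initial evolution from the start of the log.
	var c counter
	lastSeq, _ := evolveAfter(log, &c, 0)

	// A new event arrives; only it is applied on the next call.
	log = append(log, &Event{Sequence: 3})
	lastSeq, _ = evolveAfter(log, &c, lastSeq)

	fmt.Println(c.n, lastSeq)
}
```

Because the state value is reused, each event is applied exactly once across calls as long as the returned sequence is passed back in.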

Planned Features

Although features are checked off, they are all in a pre-1.0 state and subject to change.

  • type registry
    • transparent mapping from string to type
    • support for labeling types, event, state, command, etc.
  • event store
    • layer on JetStream for event store
    • encoder to/decoder from nats message
    • simple api with event store semantics
    • each store maps to one stream
  • event-sourced state
    • model for event-sourced state representations
    • interface for user-implemented type
    • maps to a subject
    • snapshot or state up to some sequence for on-demand updates
  • command deciders
    • model for handling commands and emitting events
    • provide consistency boundary for state transitions
    • state is event-sourced, single subject or wildcard (with concurrency detection)
  • state store
    • leverage JetStream KV or Object Store for state persistence
  • timers and tickers
    • set a schedule or a time that will publish a message on a subject
    • use stream with max messages per subject + interest policy
    • each timer or ticker maps to a subject
    • use expected sequence on subject to prevent duplicate instance firing

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrSequenceConflict  = errors.New("rita: sequence conflict")
	ErrEventIDRequired   = errors.New("rita: event id required")
	ErrEventTypeRequired = errors.New("rita: event type required")
)

Functions

This section is empty.

Types

type AppendOption

type AppendOption interface {
	// contains filtered or unexported methods
}

AppendOption is an option for the event store Append operation.

func ExpectSequence

func ExpectSequence(seq uint64) AppendOption

ExpectSequence indicates that the last sequence of the subject should equal the value provided. If it does not, a conflict is indicated.

type Command

type Command struct {
	ID   string
	Time time.Time
	Type string
	Data any
}

type Decider

type Decider interface {
	Decide(command *Command) ([]*Event, error)
}
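
Decider has no accompanying description, so the following is only one plausible reading: a decider inspects its current state and a command, and returns the events that should be appended. In this sketch the command and event payload types, the Order fields, and the validation rules are all hypothetical, and Command and Event are trimmed-down local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for rita.Command and rita.Event.
type Command struct{ Data any }
type Event struct{ Data any }

// Hypothetical command and event payloads.
type PlaceOrder struct{}
type ShipOrder struct{}
type OrderPlaced struct{}
type OrderShipped struct{}

// Order is event-sourced state that also decides on commands.
type Order struct {
	placed  bool
	shipped bool
}

// Evolve applies an event to the state.
func (o *Order) Evolve(event *Event) error {
	switch event.Data.(type) {
	case *OrderPlaced:
		o.placed = true
	case *OrderShipped:
		o.shipped = true
	}
	return nil
}

// Decide validates a command against the current state and returns the
// events to append. It does not mutate the state itself.
func (o *Order) Decide(command *Command) ([]*Event, error) {
	switch command.Data.(type) {
	case *PlaceOrder:
		if o.placed {
			return nil, errors.New("order already placed")
		}
		return []*Event{{Data: &OrderPlaced{}}}, nil
	case *ShipOrder:
		if !o.placed {
			return nil, errors.New("order not placed")
		}
		if o.shipped {
			return nil, errors.New("order already shipped")
		}
		return []*Event{{Data: &OrderShipped{}}}, nil
	}
	return nil, fmt.Errorf("unknown command %T", command.Data)
}

func main() {
	var order Order

	// Decide, then evolve the state with the resulting events.
	events, err := order.Decide(&Command{Data: &PlaceOrder{}})
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		if err := order.Evolve(e); err != nil {
			panic(err)
		}
	}

	// Placing the same order again is rejected.
	_, err = order.Decide(&Command{Data: &PlaceOrder{}})
	fmt.Println(len(events), err)
}
```

Keeping Decide free of side effects means the returned events can be appended with an expected sequence, and the decision simply repeated against fresh state if a conflict occurs.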

type Event

type Event struct {
	// ID of the event. This will be used as the NATS msg ID
	// for de-duplication.
	ID string

	// Time is the time of when the event occurred which may be different
	// from the time the event is appended to the store. If no time is provided,
	// the current local time will be used.
	Time time.Time

	// Type is a unique name for the event itself. This can be omitted
	// if a type registry is being used, otherwise it must be set explicitly
	// to identify the encoded data.
	Type string

	// Data is the event data. This must be a byte slice (pre-encoded) or a value
	// of a type registered in the type registry.
	Data any

	// Meta is application-defined metadata about the event.
	Meta map[string]string

	// Subject is the subject the event is associated with. Read-only.
	Subject string

	// Sequence is the sequence where this event exists in the stream. Read-only.
	Sequence uint64
}

Event is a wrapper for application-defined events.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore provides event store semantics over a NATS stream.

func (*EventStore) Append

func (s *EventStore) Append(ctx context.Context, subject string, events []*Event, opts ...AppendOption) (uint64, error)

Append appends one or more events to the subject's event sequence. It returns the resulting sequence number of the last appended event.

func (*EventStore) Create

func (s *EventStore) Create(config *nats.StreamConfig) error

Create creates the event store given the configuration. The stream name is the name of the store and the subjects default to "{name}.>".

func (*EventStore) Delete

func (s *EventStore) Delete() error

Delete deletes the event store.

func (*EventStore) Evolve

func (s *EventStore) Evolve(ctx context.Context, subject string, model Evolver, opts ...LoadOption) (uint64, error)

Evolve loads events and evolves a model of state. The sequence of the last event that evolved the state is returned, including when an error occurs.

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, subject string, opts ...LoadOption) ([]*Event, uint64, error)

Load fetches all events for a specific subject. The primary use case is to use a concrete subject, e.g. "orders.1" corresponding to an aggregate/entity identifier. The second use case is to load events for a cross-cutting view which can use subject wildcards.

func (*EventStore) Update

func (s *EventStore) Update(config *nats.StreamConfig) error

Update updates the event store configuration.

type Evolver

type Evolver interface {
	Evolve(event *Event) error
}

type LoadOption

type LoadOption interface {
	// contains filtered or unexported methods
}

LoadOption is an option for the event store Load operation.

func AfterSequence

func AfterSequence(seq uint64) LoadOption

AfterSequence specifies the sequence after which events should be fetched, up to the end of the subject's event sequence. This is useful when partially applied state has been derived up to a specific sequence and only the latest events need to be fetched.

type Rita

type Rita struct {
	// contains filtered or unexported fields
}

func New

func New(nc *nats.Conn, opts ...RitaOption) (*Rita, error)

New initializes a new Rita instance with a NATS connection.

func (*Rita) EventStore

func (r *Rita) EventStore(name string) *EventStore

func (*Rita) UnpackEvent

func (r *Rita) UnpackEvent(msg *nats.Msg) (*Event, error)

UnpackEvent unpacks an Event from a NATS message.

type RitaOption

type RitaOption interface {
	// contains filtered or unexported methods
}

RitaOption models an option when creating a Rita instance.

func Clock

func Clock(clock clock.Clock) RitaOption

Clock sets a clock implementation. Default is clock.Time.

func ID

func ID(id id.ID) RitaOption

ID sets a unique ID generator implementation. Default is id.NUID.

func TypeRegistry

func TypeRegistry(types *types.Registry) RitaOption

TypeRegistry sets an explicit type registry.

Directories

Path Synopsis
internal
pb
