evt

package
v0.0.0-...-86a089b
Published: Dec 15, 2019 License: BSD-2-Clause Imports: 8 Imported by: 0

Documentation

Overview

Package evt defines an interface for plain event sourcing and some generic event processors.

Event sourcing as a concept can be interpreted in various ways. Daql mostly uses 'dumb' events that carry generic create, update or delete commands. It can also be used for more specific events, but those must resolve to a sequence of generic events. Events have one central and authoritative ledger that assigns a revision, and with it an order, to all events.

Each event has a topic, a key and a command string, and optionally an argument map. Usually the topic refers to the record model name and the key to its primary key as a string. The string key allows models with uuid, integer and other character typed keys to share a ledger.

Custom commands have more meaningful names, validation and implementations. They must resolve to one or more generic events to allow a simple and consistent interface for backends.

A ledger is a strictly ordered sequence of events and can be used to recreate a state at a revision. Users publish one or more events as a transaction. The events are resolved and validated, then assigned a revision and audit id, and finally written to the ledger. A revision is a timestamp with millisecond granularity. It is usually the arrival time of the event but cannot be before the latest applied revision in the persisted ledger. Every transaction generates an audit log entry that holds additional information about the user, the creation and arrival time and a map of extra information.

To back up and restore, both the audit log and the event log are required, as well as other data not covered by the event system.

Servers usually update the latest state of the ledger in the same transaction that applies the event to the ledger. This allows us to avoid event aggregates and materialized view consistency problems for most operations. We might at some point introduce stateless topics that have their only persistent representation in the ledger.

Satellites should be able to persist transactions that failed due to recoverable errors, such as a network outage, for later reconciliation, and may serve their clients the projected state of the ledger where appropriate.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectAll

func CollectAll(evs []*Event) map[Sig][]*Event
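CollectAll is not documented here. Judging from its signature alone, it plausibly groups events by their signature. The following is a sketch under that assumption, using simplified local stand-ins (`sig`, `event`, `collectAll`) rather than the package's types.

```go
package main

import "fmt"

// sig and event are simplified stand-ins for evt.Sig and evt.Event.
type sig struct{ Top, Key string }

type event struct {
	ID int64
	sig
}

// collectAll groups events by signature and keeps the input order within each
// group. This is an assumed behavior, inferred from the signature only.
func collectAll(evs []*event) map[sig][]*event {
	res := make(map[sig][]*event)
	for _, ev := range evs {
		res[ev.sig] = append(res[ev.sig], ev)
	}
	return res
}

func main() {
	evs := []*event{
		{1, sig{"prod", "1"}},
		{2, sig{"prod", "2"}},
		{3, sig{"prod", "1"}},
	}
	groups := collectAll(evs)
	fmt.Println(len(groups), len(groups[sig{"prod", "1"}]))
}
```

A struct of two strings is comparable in Go, which is why a type like Sig can be used directly as a map key.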

func NextRev

func NextRev(last, rev time.Time) time.Time

NextRev returns rev truncated to milliseconds or, if rev is not after last, the next possible revision one millisecond after last.

Types

type Action

type Action struct {
	Sig
	Cmd string    `json:"cmd"`
	Arg *lit.Dict `json:"arg,omitempty"`
}

Action is an unpublished event represented by a command string and argument map. It usually is a data operation on a record identified by a topic and primary key.

func Merge

func Merge(a, b Action) (_ Action, err error)

type Audit

type Audit struct {
	Rev time.Time `json:"rev"`
	Detail
}

Audit holds detailed information for a published revision.

type ByID

type ByID []*Event

func (ByID) Len

func (s ByID) Len() int

func (ByID) Less

func (s ByID) Less(i, j int) bool

func (ByID) Swap

func (s ByID) Swap(i, j int)

type ByRev

type ByRev []*Event

func (ByRev) Len

func (s ByRev) Len() int

func (ByRev) Less

func (s ByRev) Less(i, j int) bool

func (ByRev) Swap

func (s ByRev) Swap(i, j int)

type Detail

type Detail struct {
	Created time.Time `json:"created,omitempty"`
	Arrived time.Time `json:"arrived,omitempty"`
	Acct    [16]byte  `json:"acct,omitempty"`
	Extra   *lit.Dict `json:"extra,omitempty"`
}

Detail holds extra information for audits and transactions.

type Event

type Event struct {
	ID  int64     `json:"id"`
	Rev time.Time `json:"rev"`
	Action
}

Event is an action published to a ledger with revision and unique id.

func Collect

func Collect(evs []*Event, s Sig) (res []*Event)

type HistFunc

type HistFunc func(*hub.Msg, HistReq) (*Update, error)

func (HistFunc) Serve

func (f HistFunc) Serve(m *hub.Msg) interface{}

type HistReq

type HistReq struct {
	Sig
}

type HistRes

type HistRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Ledger

type Ledger interface {
	// Rev returns the latest event revision or the zero time.
	Rev() time.Time
	Project() *dom.Project
	// Events returns the ledger events filtered by the given expression and parameters.
	Events(whr exp.Dyn, param lit.Lit) ([]*Event, error)
}

Ledger abstracts over the event storage. It provides access to the latest revision and allows querying events. Ledger implementations are usually not thread-safe unless explicitly documented.

type MetaFunc

type MetaFunc func(*hub.Msg, MetaReq) (*Audit, error)

func (MetaFunc) Serve

func (f MetaFunc) Serve(m *hub.Msg) interface{}

type MetaReq

type MetaReq struct {
	Rev time.Time `json:"rev"`
}

type MetaRes

type MetaRes struct {
	Res *Audit `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type PubFunc

type PubFunc func(*hub.Msg, PubReq) (*Update, error)

func (PubFunc) Serve

func (f PubFunc) Serve(m *hub.Msg) interface{}

type PubReq

type PubReq struct {
	Trans
}

type PubRes

type PubRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Publisher

type Publisher interface {
	Ledger
	Publish(Trans) ([]*Event, error)
}

Publisher is a ledger that can publish transactions.

type Replicator

type Replicator interface {
	Ledger
	Replicate([]*Event) error
}

Replicator is a ledger that can replicate events.

type Sig

type Sig struct {
	Top string `json:"top"`
	Key string `json:"key"`
}

Sig is the event signature.

type SubFunc

type SubFunc func(*hub.Msg, SubReq) (*Update, error)

func (SubFunc) Serve

func (f SubFunc) Serve(m *hub.Msg) interface{}

type SubReq

type SubReq struct {
	List []Watch `json:"list"`
}

type SubRes

type SubRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Subscriber

type Subscriber struct {
	hub.Conn
	Rev   time.Time
	Watch map[string]*Watch
	Bufr  []*Event
}

func (*Subscriber) Accept

func (s *Subscriber) Accept(ev *Event) bool

func (*Subscriber) Update

func (s *Subscriber) Update(from hub.Conn, rev time.Time)

type Subscribers

type Subscribers struct {
	// contains filtered or unexported fields
}

func NewSubscribers

func NewSubscribers() *Subscribers

func (*Subscribers) Bcast

func (subs *Subscribers) Bcast(from hub.Conn, rev time.Time)

Bcast sends all buffered events up to revision rev out to subscribers.

func (*Subscribers) Btrig

func (subs *Subscribers) Btrig(from hub.Conn)

Btrig triggers a delayed, de-duplicated broadcast request with header _bcast.

func (*Subscribers) Show

func (subs *Subscribers) Show(c hub.Conn, evs []*Event) (sender *Subscriber)

func (*Subscribers) Stop

func (subs *Subscribers) Stop()

func (*Subscribers) Sub

func (subs *Subscribers) Sub(c hub.Conn, ws []Watch) *Subscriber

func (*Subscribers) Unsub

func (subs *Subscribers) Unsub(c hub.Conn, ws []Watch)

type Trans

type Trans struct {
	Base time.Time `json:"base"`
	Acts []Action  `json:"acts"`
	Detail
}

Trans is a request to publish a list of actions for a base revision.

type UnsFunc

type UnsFunc func(*hub.Msg, UnsReq) (bool, error)

func (UnsFunc) Serve

func (f UnsFunc) Serve(m *hub.Msg) interface{}

type UnsReq

type UnsReq struct {
	List []Watch `json:"list"`
}

type UnsRes

type UnsRes struct {
	Res bool   `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type Update

type Update struct {
	Rev time.Time `json:"rev"`
	Evs []*Event  `json:"evs"`
}

type Watch

type Watch struct {
	Top string    `json:"top"`
	Rev time.Time `json:"rev,omitempty"`
	IDs []string  `json:"ids,omitempty"`
}
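Watch is undocumented. Its fields suggest that a subscriber watches a topic, optionally narrowed to a list of keys. The sketch below shows one way such a filter could be evaluated. It assumes that an empty IDs list means the whole topic is watched, which this documentation does not confirm. The `watch` type and its `matches` method are local illustrations and not package API.

```go
package main

import "fmt"

// watch is a reduced stand-in for evt.Watch (the Rev field is omitted).
type watch struct {
	Top string
	IDs []string
}

// matches reports whether an event with the given topic and key falls under
// the watch. Assumption: no IDs means every key of the topic is watched.
func (w watch) matches(top, key string) bool {
	if w.Top != top {
		return false
	}
	if len(w.IDs) == 0 {
		return true
	}
	for _, id := range w.IDs {
		if id == key {
			return true
		}
	}
	return false
}

func main() {
	all := watch{Top: "prod"}
	some := watch{Top: "prod", IDs: []string{"1", "7"}}
	fmt.Println(all.matches("prod", "3"), some.matches("prod", "3"), some.matches("prod", "7"))
}
```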
