evt

package
v0.0.0-...-b226945 Latest
Warning

This package is not in the latest version of its module.
Published: Feb 10, 2024 | License: BSD-2-Clause | Imports: 15 | Imported by: 1

README

evt

Package evt provides servers and tools for event sourcing. Event sourcing in this context means a data model that is based on a sequence of events that can recreate the data model at any point in time.

An Event consists of a string topic, a key, a command, a revision time, and optionally an argument. Daql mostly uses dumb events, where the topic is a model name, the key is a primary id, and the command is one of the generic new, mod, or del commands. The package can also be used for more specific events; those, however, must resolve to a sequence of generic events to allow a clean interface for backends.

Ledger represents a sequence of events ordered by revision. Publisher is a ledger that publishes transactions and assigns new revisions to events. Replicator is a replicated ledger and the LocalPublisher is a Replicator that can publish some events locally.

The event and ledger revision is a timestamp with millisecond granularity. It is usually the arrival time of the event but must be greater than the last revision in the persisted ledger.

Every transaction generates an audit log entry that has extra information. Backup and restore require both audit and event logs, as well as other data not covered by the event sourcing.

Server provides hub services to subscribe and publish to a ledger. Servers usually use a ledger implementation that updates the latest model state to support queries without event aggregation for most operations. We might at some point introduce stateless topics that have their only persistent representation in the ledger.

Satellite connects to a server hub, replicates events, and manages local subscriptions. Satellites can publish authoritative events locally to support offline use to some extent.

Documentation

Overview

Package evt provides servers and tools for event sourcing.

Index

Constants

const (
	CmdNew = "new"
	CmdMod = "mod"
	CmdDel = "del"
)

Functions

func CollectAll

func CollectAll(evs []*Event) map[Sig][]*Event

func MergeDeltas

func MergeDeltas(a, b lit.Delta) error

func NextRev

func NextRev(last, rev time.Time) time.Time

NextRev returns rev truncated to milliseconds. If the truncated rev is not after last, it returns the next possible revision instead, which is one millisecond after last.

func RawSchema

func RawSchema() string

Types

type Action

type Action struct {
	Sig
	Cmd string    `json:"cmd"`
	Arg *lit.Dict `json:"arg,omitempty"`
}

Action is an unpublished event represented by a command string and argument map. It usually is a data operation on a record identified by a topic and primary key.

func Merge

func Merge(a, b Action) (_ Action, err error)

type Audit

type Audit struct {
	Rev     time.Time `json:"rev"`
	Created time.Time `json:"created,omitempty"`
	Arrived time.Time `json:"arrived,omitempty"`
	Usr     string    `json:"usr,omitempty"`
	Extra   *lit.Dict `json:"extra,omitempty"`
}

Audit holds detailed information for a published revision.

type ByID

type ByID []*Event

func (ByID) Len

func (s ByID) Len() int

func (ByID) Less

func (s ByID) Less(i, j int) bool

func (ByID) Swap

func (s ByID) Swap(i, j int)

type ByRev

type ByRev []*Event

func (ByRev) Len

func (s ByRev) Len() int

func (ByRev) Less

func (s ByRev) Less(i, j int) bool

func (ByRev) Swap

func (s ByRev) Swap(i, j int)

type Ctrl

type Ctrl struct {
	Ledger
	Input chan *hub.Msg
	Subs  *Subscribers
	log.Logger
	// contains filtered or unexported fields
}

Ctrl manages subscription updates common to both the event server and satellite.

func NewCtrl

func NewCtrl(l Ledger) *Ctrl

NewCtrl returns a new controller for ledger l.

func (*Ctrl) Bcast

func (ctr *Ctrl) Bcast(from hub.Conn, rev time.Time)

Bcast sends all buffered events up to revision rev out to subscribers.

func (*Ctrl) Btrig

func (ctr *Ctrl) Btrig()

Btrig throttles the broadcast trigger so that _bcast messages are sent at least 200ms apart.

func (*Ctrl) Handle

func (ctr *Ctrl) Handle(m *hub.Msg)

func (*Ctrl) Services

func (ctr *Ctrl) Services() hub.Services

func (*Ctrl) Stop

func (ctr *Ctrl) Stop()

type Event

type Event struct {
	ID  int64     `json:"id"`
	Rev time.Time `json:"rev"`
	Action
}

Event is an action published to a ledger with revision and unique id.

func Collect

func Collect(evs []*Event, s Sig) (res []*Event)

type Ledger

type Ledger interface {
	// Rev returns the latest event revision or the zero time.
	Rev() time.Time
	Project() *dom.Project
	// Events returns all events for the given topics since rev.
	// This method is primarily used by the event central to manage subscribed events.
	// The qry package can be used for more complex event queries.
	Events(ctx context.Context, rev time.Time, tops ...string) ([]*Event, error)
}

Ledger abstracts over the event storage. It provides access to the latest revision and allows querying events. Ledger implementations are usually not thread-safe unless explicitly documented.

type LocalPublisher

type LocalPublisher interface {
	Replicator
	LocalRev() time.Time
	PublishLocal(Trans) (time.Time, []*Event, error)
	Locals() []Trans
}

LocalPublisher is a replicator that can publish events locally.

type MemLedger

type MemLedger struct {
	Reg  lit.Regs
	Bend *qry.MemBackend
	// contains filtered or unexported fields
}

MemLedger implements an in-memory ledger.

func NewMemLedger

func NewMemLedger(reg *lit.Regs, b *qry.MemBackend) (*MemLedger, error)

NewMemLedger returns a new ledger for testing that is backed by the memory query backend b. This ledger provides no persistence and expends only minimal effort to roll back after a failed event publish. It should only be used for testing or if these constraints are well understood. The ledger assumes sole and full control over b, converts event data to an event slice, and swaps previous event values with proxies.

func (*MemLedger) Events

func (l *MemLedger) Events(rev time.Time, tops ...string) (res []*Event, _ error)

func (*MemLedger) Project

func (l *MemLedger) Project() *dom.Project

func (*MemLedger) Publish

func (l *MemLedger) Publish(t Trans) (time.Time, []*Event, error)

Publish publishes transaction t to the ledger. It attempts to roll back failed transactions. Failed reverts may panic. Use only for testing.

func (*MemLedger) Rev

func (l *MemLedger) Rev() time.Time

type ModelAuthority

type ModelAuthority = func(evs []Action) bool

type MonFunc

type MonFunc func(*hub.Msg, MonReq) (int64, error)

func (MonFunc) Serve

func (f MonFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type MonReq

type MonReq struct {
	Rev   time.Time `json:"rev"`
	Watch []Watch   `json:"watch"`
}

type MonRes

type MonRes struct {
	Res int64  `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type Monitor

type Monitor struct {
	Sub   *Subscriber
	ID    int64
	Watch []Watch
	Bufr  []Sig
}

type Note

type Note struct {
	Mon   int64   `json:"mon"`
	Watch []Watch `json:"watch"`
}

type PubFunc

type PubFunc func(*hub.Msg, PubReq) (*Update, error)

func (PubFunc) Serve

func (f PubFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type PubReq

type PubReq struct {
	Trans
}

type PubRes

type PubRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Publisher

type Publisher interface {
	Ledger
	Publish(Trans) (time.Time, []*Event, error)
}

Publisher is a ledger that can publish transactions.

type Replicator

type Replicator interface {
	Ledger
	Replicate(rev time.Time, evs []*Event) error
}

Replicator is a ledger that can replicate events.

type SatFunc

type SatFunc func(*hub.Msg, SatReq) (*Update, error)

func (SatFunc) Serve

func (f SatFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type SatReq

type SatReq struct {
	Rev   time.Time `json:"rev"`
	Trans []Trans   `json:"trans"`
	Tops  []string  `json:"tops"`
}

type SatRes

type SatRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Satellite

type Satellite struct {
	LocalPublisher
	*Ctrl
	// contains filtered or unexported fields
}

Satellite connects to a server hub, replicates events, and manages local subscriptions. Satellites can publish authoritative events locally to support offline use to some extent.

func New

func New(rep LocalPublisher, cli hub.Conn, auth ModelAuthority) *Satellite

func (*Satellite) CliRouter

func (sat *Satellite) CliRouter() hub.Router

func (*Satellite) Router

func (sat *Satellite) Router() hub.Router

func (*Satellite) Run

func (sat *Satellite) Run()

func (*Satellite) Services

func (sat *Satellite) Services() hub.Services

type Server

type Server struct {
	Publisher
	*Ctrl
}

Server provides hub services to subscribe and publish to a ledger.

func NewServer

func NewServer(pubr Publisher) *Server

func (*Server) Router

func (srv *Server) Router() hub.Router

func (*Server) Run

func (srv *Server) Run()

func (*Server) Services

func (srv *Server) Services() hub.Services

type Sig

type Sig struct {
	Top string `json:"top"`
	Key string `json:"key"`
}

Sig is the event signature.

type StatFunc

type StatFunc func(*hub.Msg) (Status, error)

func (StatFunc) Serve

func (f StatFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type StatRes

type StatRes struct {
	Res Status `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type Status

type Status struct {
	Rev time.Time `json:"rev"`
	Mig string    `json:"mig"`
	On  time.Time `json:"on,omitempty"`
	Off time.Time `json:"off,omitempty"`
}

Status holds the current ledger revision and migration information.

type SubFunc

type SubFunc func(*hub.Msg, SubReq) (*Update, error)

func (SubFunc) Serve

func (f SubFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type SubReq

type SubReq struct {
	Rev  time.Time `json:"rev"`
	Tops []string  `json:"tops"`
}

type SubRes

type SubRes struct {
	Res *Update `json:"res,omitempty"`
	Err string  `json:"err,omitempty"`
}

type Subscriber

type Subscriber struct {
	hub.Conn
	Rev  time.Time
	Subs []string
	Mons []*Monitor
	Bufr []*Event
	Note bool
	// contains filtered or unexported fields
}

func (*Subscriber) Update

func (s *Subscriber) Update(rev time.Time) *Update

type Subscribers

type Subscribers struct {
	// contains filtered or unexported fields
}

func NewSubscribers

func NewSubscribers() *Subscribers

func (*Subscribers) Bcast

func (subs *Subscribers) Bcast(from hub.Conn, rev time.Time)

Bcast sends all buffered events up to revision rev out to subscribers.

func (*Subscribers) BcastMsg

func (subs *Subscribers) BcastMsg(msg *hub.Msg)

func (*Subscribers) Get

func (subs *Subscribers) Get(id int64) *Subscriber

func (*Subscribers) Mon

func (subs *Subscribers) Mon(c hub.Conn, rev time.Time, ws []Watch) int64

func (*Subscribers) Show

func (subs *Subscribers) Show(from hub.Conn, evs []*Event) (sub *Subscriber, trig bool)

Show updates all matching subscribers with evs except for the sender itself and returns a sender subscription and an indicator whether a broadcast should be triggered. The returned sub is always usable to send a result update even if the sender was unknown.

func (*Subscribers) Sub

func (subs *Subscribers) Sub(c hub.Conn, rev time.Time, tops []string) (*Subscriber, []string)

func (*Subscribers) Unmon

func (subs *Subscribers) Unmon(c hub.Conn, mon int64) bool

func (*Subscribers) Unsub

func (subs *Subscribers) Unsub(c hub.Conn, tops []string) *Subscriber

type Trans

type Trans struct {
	ID   int64     `json:"id"`
	Base time.Time `json:"base"`
	Audit
	Acts []Action `json:"acts"`
}

Trans is a request to publish a list of actions for a base revision.

type UnmonFunc

type UnmonFunc func(*hub.Msg, UnmonReq) (bool, error)

func (UnmonFunc) Serve

func (f UnmonFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type UnmonReq

type UnmonReq struct {
	Mon int64 `json:"mon"`
}

type UnmonRes

type UnmonRes struct {
	Res bool   `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type UnsubFunc

type UnsubFunc func(*hub.Msg, UnsubReq) (bool, error)

func (UnsubFunc) Serve

func (f UnsubFunc) Serve(m *hub.Msg) (*hub.Msg, error)

type UnsubReq

type UnsubReq struct {
	Tops []string `json:"tops"`
}

type UnsubRes

type UnsubRes struct {
	Res bool   `json:"res,omitempty"`
	Err string `json:"err,omitempty"`
}

type Update

type Update struct {
	Rev  time.Time `json:"rev"`
	Evs  []*Event  `json:"evs,omitempty"`
	Note []Note    `json:"note,omitempty"`
}

Update holds a list of events and notes.

type Watch

type Watch struct {
	Top  string   `json:"top"`
	Keys []string `json:"keys"`
}

Watch is a topic name and a list of keys to monitor.
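One way to picture how a watch list relates to event signatures is a simple membership check. Sig and Watch below are local mirrors of the types on this page, and watched is a hypothetical helper. The package's own matching rules, in particular for an empty key list, are not documented here and may differ.

```go
package main

import "fmt"

// Sig and Watch are local mirrors of the evt types.
type Sig struct {
	Top string
	Key string
}

type Watch struct {
	Top  string
	Keys []string
}

// watched reports whether signature s is covered by any watch in ws.
// A watch covers s if the topic matches and the key is listed. Treating
// an empty key list as matching nothing is an assumption.
func watched(ws []Watch, s Sig) bool {
	for _, w := range ws {
		if w.Top != s.Top {
			continue
		}
		for _, k := range w.Keys {
			if k == s.Key {
				return true
			}
		}
	}
	return false
}

func main() {
	// Hypothetical watch on two records of one model.
	ws := []Watch{{Top: "prod.cat", Keys: []string{"1", "2"}}}
	fmt.Println(watched(ws, Sig{Top: "prod.cat", Key: "2"}))
	fmt.Println(watched(ws, Sig{Top: "prod.cat", Key: "3"}))
	fmt.Println(watched(ws, Sig{Top: "prod.prod", Key: "1"}))
}
```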
