event

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2024 License: MIT Imports: 12 Imported by: 1

Documentation

Index

Constants

View Source
const (
	VersionPartZeroStr = "00000000000000000000"
	VersionFracZeroStr = "000"
)
View Source
const (
	VersionSuffixEOF    = "e"
	VersionSuffixNotEOF = "~"
)
View Source
const GlobalRegistryName = ""
View Source
const (
	StreamIDPartsDelimiter = "#"
)
View Source
const (
	StreamerReplayQueryDefaultLimit uint = 500
)

Variables

View Source
var (
	ContextNamespaceKey = ContextKey("namespace")
	ContextUserKey      = ContextKey("user")
)
View Source
var (
	ErrMarshalEventFailed   = errors.New("marshal event(s) failed")
	ErrMarshalEmptyEvent    = errors.New("event to marshal is empty")
	ErrUnmarshalEventFailed = errors.New("unmarshal event(s) failed")
)
View Source
var (
	ErrAppendEventsConflict    = errors.New("append events conflict")
	ErrAppendEventsFailed      = errors.New("append events failure")
	ErrUnsupportedAppendOption = errors.New("unsupported append option")
	ErrLoadEventFailed         = errors.New("load events failure")
	ErrEventSizeLimitExceeded  = errors.New("event record size limit exceeded")
)
View Source
var (
	ErrInvalidStreamID = errors.New("invalid stream ID")
	ErrInvalidStream   = errors.New("invalid event stream")
	ErrCursorNotFound  = errors.New("cursor not found")
	ErrInvalidCursor   = errors.New("invalid cursor")
)
View Source
var (
	ErrVersionLimitExceeded = errors.New("version limit exceeded")
	ErrVersionMalformed     = errors.New("version malformed")
	ErrInvalidSequenceIncr  = errors.New("invalid sequence increment")
	ErrVersionEOFReached    = errors.New("version EOF reached")
)
View Source
var (
	VersionMax = Version{
				// contains filtered or unexported fields
	}

	VersionMin = Version{
				// contains filtered or unexported fields
	}

	VersionZero = Version{
				// contains filtered or unexported fields
	}
)
View Source
var (
	ErrNotFoundInRegistry = errors.New("event not found in registry")
)
View Source
var (
	ErrPublishEventFailed = errors.New("publish events failed")
)

Functions

func NormalizeTypeWithNamespace

func NormalizeTypeWithNamespace(namespace string, t reflect.Type) string

func TimeRange added in v0.0.2

func TimeRange(times []time.Time) (since, until time.Time)

func Transform

func Transform(ctx context.Context, envs []Envelope, fn func(ctx context.Context, evts ...any) ([]any, error)) error

Transform transforms the given slice of events by replacing the data (aka domain event) with the result of the fn function.

func TypeOf

func TypeOf(v any) (vType string)

TypeOf returns the type of a value or its pointer

func TypeOfWithContext

func TypeOfWithContext(ctx context.Context, v any) string

TypeOfWithContext uses TypeOfWithNamespace under the hood and looks for the namespace value from the context.

func TypeOfWithNamespace

func TypeOfWithNamespace(namespace string, v any) string

TypeOfWithNamespace returns the type of the value using the given namespace. By default the type format is {package name}.{value type name}. The return is changed to {namespace}.{value type name} if namespace is not empty

func ValidateEvent

func ValidateEvent(env Envelope, cur *Cursor, opts ...func(v *Validation)) (ignore bool, err error)

ValidateEvent validates the event according to its sequence in the stream. It returns an error, or an 'ignored' boolean flag if the event is out of validation boundaries

func WithAliases

func WithAliases(aliases ...string) func(registryEntryProps)

Types

type AppendConfig added in v0.0.2

type AppendConfig struct {
	AddToTx    func(ctx context.Context) (items []any)
	AddTracing func(ctx context.Context) (traceID string)
}

AppendConfig presents a generic definition of write operation options. The behavior might differ based on the event store implementation.

type ContextKey

type ContextKey string

func (ContextKey) String

func (c ContextKey) String() string

type Cursor

type Cursor struct {
	StreamID string
	Ver      Version
	At       time.Time
}

Cursor is used to validate stream sequence.

func NewCursor

func NewCursor(streamID string) *Cursor

NewCursor returns a cursor for the given stream.

type Envelope

type Envelope interface {
	ID() string
	Type() string
	Event() any
	At() time.Time
	StreamID() string
	Version() Version
	GlobalStreamID() string
	GlobalVersion() Version
	User() string
	Dests() []string
	TTL() time.Duration
}

Envelope wraps and adds meta-data to events, such as timestamp, stream ID, and version.

func Wrap

func Wrap(ctx context.Context, stmID StreamID, events []any, opts ...EnvelopeOption) []Envelope

Wrap wraps (with options) the given events. By default it creates a valid timestamp-based stream chunk. Note that it does not set the event version or global version.

type EnvelopeOption

type EnvelopeOption func(env RWEnvelope)

EnvelopeOption defines a functional option that allows to override some envelope properties.

func WithGlobalVersionIncr

func WithGlobalVersionIncr(startingVer Version, limit int, diff VersionSequenceDiff) EnvelopeOption

func WithNameSpace

func WithNameSpace(namespace string) EnvelopeOption

func WithVersionIncr

func WithVersionIncr(startingVer Version, limit int, diff VersionSequenceDiff) EnvelopeOption

type GlobalVersionSetter added in v0.0.2

type GlobalVersionSetter interface {
	Envelope
	SetGlobalVersion(v Version) Envelope
}

GlobalVersionSetter defines an envelope with a global version setter.

func MustGlobalVersionSetter added in v0.0.2

func MustGlobalVersionSetter(env Envelope) GlobalVersionSetter

MustGlobalVersionSetter asserts that the given envelope implements GlobalVersionSetter interface.

type ID

type ID interface {
	String() string
}

ID presents the interface responsible for generating UID. It serves as an abstraction to avoid strong coupling to a specific library/algorithm

func UID

func UID() ID

type NamespaceSetter added in v0.0.2

type NamespaceSetter interface {
	Envelope
	SetNamespace(namespace string) Envelope
}

NamespaceSetter defines an envelope with a namespace setter.

func MustNamespaceSetter added in v0.0.2

func MustNamespaceSetter(env Envelope) NamespaceSetter

MustNamespaceSetter asserts that the given envelope implements NamespaceSetter interface.

type Publishable

type Publishable interface{ EvDests() []string }

Publishable presents an event that must be forwarded (in a push fashion) to some system Processors (e.g. push-based Projectors). Note that not all events must be publishable, i.e. poll-based Projectors may query the durable event store and replay a chunk of events on a regular basis.

type Publisher

type Publisher interface {
	// Publish a chunk of events to their according destinations.
	Publish(ctx context.Context, events []Envelope) error
}

Publisher presents the service responsible for publishing events to the given destinations

type RWEnvelope

type RWEnvelope interface {
	Envelope
	SetAt(t time.Time) Envelope
	SetUser(userID string) Envelope
	SetVersion(v Version) Envelope
	SetDests(dests []string) Envelope
	SetTTL(ttl time.Duration) Envelope
}

RWEnvelope defines an envelope with some properties setter methods.

type Register

type Register interface {
	// Set registers the given event in the registry.
	// In case the event is a pointer, it registers its value instead.
	Set(event any, opts ...func(registryEntryProps)) Register

	// Get returns a pointer of a zero type's value.
	Get(name string) (ptr any, err error)

	// GetFromGlobal returns a pointer to equivalent event of the given one from the global registry.
	GetFromGlobal(evt any) (ptr any, err error)

	// All returns all registered events in the current namespace. It returns both event type and
	// default value.
	All() map[string]registryEntry

	// Clear all namespace registries. It's mainly used for internal tests
	Clear()
}

Register defines the registry service for domain events

func NewRegister

func NewRegister(namespace string) Register

NewRegister returns a Register instance for the given namespace.

func NewRegisterFrom

func NewRegisterFrom(ctx context.Context) Register

NewRegisterFrom returns a new instance of the register using the namespace found in the context. Otherwise, it returns an instance based on the global namespace.

type Serializer

type Serializer interface {

	// MarshalEvent returns a binary version of the event and its size according to the supported format.
	MarshalEvent(ctx context.Context, event Envelope) ([]byte, error)

	// MarshalEventBatch returns a binary version of the given chunk of events. It also returns a slice of events' size.
	// It fails if chunk is empty.
	MarshalEventBatch(ctx context.Context, events []Envelope) ([]byte, error)

	// UnmarshalEvent returns an event envelope based on the binary/raw given event.
	// The returned envelope might be nil in case the event type is not found in the registry.
	UnmarshalEvent(ctx context.Context, b []byte) (Envelope, error)

	// UnmarshalEventBatch returns a slice of envelopes based on the binary given chunk of events.
	// Similarly to UnmarshalEvent, events might be nil if event type is not found in the registry.
	UnmarshalEventBatch(ctx context.Context, b []byte) ([]Envelope, error)
}

Serializer provides a standard encoding/decoding interface for events

type Store

type Store interface {
	// Append saves a chunk (aka record) of events into a stream
	Append(ctx context.Context, id StreamID, events []Envelope, optFns ...func(*AppendConfig)) error
	// Load events from a stream based on the given timestamp range
	Load(ctx context.Context, id StreamID, trange ...time.Time) ([]Envelope, error)
}

Store defines the interface of the event logging store aka timestamp-based event store. Check sourcing package for a version-based Store interface definition.

type Stream

type Stream []Envelope

Stream presents a collection of consecutive events

func (Stream) Empty

func (stm Stream) Empty() bool

func (Stream) EventIDs

func (stm Stream) EventIDs() []string

func (Stream) Events

func (s Stream) Events() []any

Events unwraps the event envelopes and returns the domain events of the stream chunk.

func (Stream) Validate

func (stm Stream) Validate(opts ...func(v *Validation)) error

Validate validates a chunk of events. It defines the validation boundaries and cursor based on the current chunk; any validation boundaries set at the option level will be ignored.

type StreamData

type StreamData struct {
	Type  StreamDataType
	Value any
	Stm   string
}

StreamData mainly contains an event record. It might also contain some infra-related signals such as stream termination or lagging. The consumer must check StreamData type and behave accordingly. Note that StreamData was initially introduced to deal with an old S3-based event store implementation.

type StreamDataType

type StreamDataType string
const (
	StreamDataTypeRecord   StreamDataType = "record"
	StreamDataTypeEnd      StreamDataType = "end"
	StreamDataTypeContinue StreamDataType = "continue"
)

type StreamID

type StreamID struct {
	// contains filtered or unexported fields
}

StreamID presents a composed event stream ID. The global part identifies the global stream (e.g. TenantID), while the other parts can identify the service, bounded context, or the root entity, etc.

func NewStreamID

func NewStreamID(global string, parts ...string) StreamID

NewStreamID returns a composed event stream ID. It panics if the global stream ID or any of the parts contains characters other than alphanumerics, "-", "_", or "+".

func ParseStreamID

func ParseStreamID(streamID string) (StreamID, error)

func (StreamID) Global

func (si StreamID) Global() bool

Global returns true if the given stream is a global one. A global stream ID does not have parts.

func (StreamID) GlobalID

func (si StreamID) GlobalID() string

GlobalID returns the first part of the streamID which represents the global stream ID, e.g TenantID

func (StreamID) Parts

func (si StreamID) Parts() []string

Parts returns the event stream ID parts other than the global ID.

func (StreamID) String

func (si StreamID) String() string

String returns the ID of the stream

type Streamer

type Streamer interface {
	// Replay a stream based on the given query params.
	// Replay capabilities and behavior are implementation-specific
	Replay(ctx context.Context, streamID StreamID, q StreamerQuery, h StreamerHandler) error
}

Streamer is mainly used to query global streams for event replay and projections.

type StreamerHandler

type StreamerHandler func(ctx context.Context, data StreamData) error

StreamerHandler processes the given event in the replay stream process.

type StreamerQuery

type StreamerQuery struct {
	From, To    Version
	RecordLimit uint
	Order       StreamerReplayOrder
}

StreamerQuery allows filtering a stream based on a pre-defined range, limit, and order.

func (*StreamerQuery) Build

func (q *StreamerQuery) Build()

Build applies filter default values if they are missing. If "To" is defined, it has to be within the range defined by "From" + "Limit".

type StreamerReplayOrder

type StreamerReplayOrder string
const (
	StreamerReplayOrderASC  StreamerReplayOrder = "ASC"
	StreamerReplayOrderDESC StreamerReplayOrder = "DESC"
)

type Transformer

type Transformer interface {
	Transform(fn func(cur any) (new any))
}

type Validation

type Validation struct {
	GlobalStream  bool
	Boundaries    ValidationBoundaries
	SkipVersion   bool
	SkipTimeStamp bool
}

Validation presents the stream validation options

type ValidationBoundaries

type ValidationBoundaries struct {
	Since, Until time.Time
	From, To     Version
}

ValidationBoundaries filters extra events contained in the result's chunks.

Note that this case was relevant for an old S3-based implementation of the event store; It might be deprecated and removed later.

func (*ValidationBoundaries) Build

func (vb *ValidationBoundaries) Build()

Build applies ValidationBoundaries's default values if values are zero.

type Version

type Version struct {
	// contains filtered or unexported fields
}

func NewVersion

func NewVersion() Version

NewVersion returns new version with minimal value. It fails if version is malformed

func ParseVersion

func ParseVersion(str string) (Version, error)

ParseVersion parses the given version string.

func Ver

func Ver(str ...string) (Version, error)

Ver parses the given version (the first string argument), or returns a new one with minimal value.

func VersionRange added in v0.0.2

func VersionRange(vers []Version) (from, to Version)

func (Version) Add

func (v Version) Add(p uint64, d uint8) Version

Add increments the version using the given values for both integer and fractional parts

func (Version) After

func (v Version) After(ov Version) bool

func (Version) Before

func (v Version) Before(ov Version) bool

func (Version) Between

func (v Version) Between(v1, v2 Version) bool

func (Version) Compare

func (v Version) Compare(ov Version) int

func (Version) Decr

func (v Version) Decr() Version

Decr decrements the version integer part while removing the fractional part from the returned version

func (Version) Drop

func (v Version) Drop(p uint64, d uint8) Version

Drop decrements the version using the given values for both integer and fractional parts

func (Version) EOF

func (v Version) EOF() Version

EOF marks the current version as "End Of Fractional Part".

func (Version) Equal

func (v Version) Equal(ov Version) bool

func (Version) Incr

func (v Version) Incr() Version

Incr increments the version integer part while removing the fractional part from the returned version

func (Version) IsZero

func (v Version) IsZero() bool

func (Version) Next

func (v Version) Next(to Version) bool

func (Version) String

func (v Version) String() string

String serializes the version and returns a sorted string value

func (Version) Trunc

func (v Version) Trunc() Version

Trunc truncates the fractional part and returns a new version

type VersionSequenceDiff

type VersionSequenceDiff int

VersionSequenceDiff defines the difference between two consecutive versions. Two consecutive events within the same record have a fractional difference of .1. Two consecutive events belonging to separate records have a difference of 1.

const (
	VersionSeqDiffFracPart VersionSequenceDiff = iota
	VersionSeqDiffPart
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL