Documentation ¶
Index ¶
- Constants
- Variables
- func NormalizeTypeWithNamespace(namespace string, t reflect.Type) string
- func TimeRange(times []time.Time) (since, until time.Time)
- func Transform(ctx context.Context, envs []Envelope, ...) error
- func TypeOf(v any) (vType string)
- func TypeOfWithContext(ctx context.Context, v any) string
- func TypeOfWithNamespace(namespace string, v any) string
- func ValidateEvent(env Envelope, cur *Cursor, opts ...func(v *Validation)) (ignore bool, err error)
- func WithAliases(aliases ...string) func(registryEntryProps)
- type AppendConfig
- type ContextKey
- type Cursor
- type Envelope
- type EnvelopeOption
- type GlobalVersionSetter
- type ID
- type NamespaceSetter
- type Publishable
- type Publisher
- type RWEnvelope
- type Register
- type Serializer
- type Store
- type Stream
- type StreamData
- type StreamDataType
- type StreamID
- type Streamer
- type StreamerHandler
- type StreamerQuery
- type StreamerReplayOrder
- type Transformer
- type Validation
- type ValidationBoundaries
- type Version
- func (v Version) Add(p uint64, d uint8) Version
- func (v Version) After(ov Version) bool
- func (v Version) Before(ov Version) bool
- func (v Version) Between(v1, v2 Version) bool
- func (v Version) Compare(ov Version) int
- func (v Version) Decr() Version
- func (v Version) Drop(p uint64, d uint8) Version
- func (v Version) EOF() Version
- func (v Version) Equal(ov Version) bool
- func (v Version) Incr() Version
- func (v Version) IsZero() bool
- func (v Version) Next(to Version) bool
- func (v Version) String() string
- func (v Version) Trunc() Version
- type VersionSequenceDiff
Constants ¶
const ( VersionPartZeroStr = "00000000000000000000" VersionFracZeroStr = "000" )
const ( VersionSuffixEOF = "e" VersionSuffixNotEOF = "~" )
const GlobalRegistryName = ""
const (
StreamIDPartsDelimiter = "#"
)
const (
StreamerReplayQueryDefaultLimit uint = 500
)
Variables ¶
var ( ContextNamespaceKey = ContextKey("namespace") ContextUserKey = ContextKey("user") )
var ( ErrMarshalEventFailed = errors.New("marshal event(s) failed") ErrMarshalEmptyEvent = errors.New("event to marshal is empty") ErrUnmarshalEventFailed = errors.New("unmarshal event(s) failed") )
var ( ErrAppendEventsConflict = errors.New("append events conflict") ErrAppendEventsFailed = errors.New("append events failure") ErrUnsupportedAppendOption = errors.New("unsupported append option") ErrLoadEventFailed = errors.New("load events failure") ErrEventSizeLimitExceeded = errors.New("event record size limit exceeded") )
var ( ErrInvalidStreamID = errors.New("invalid stream ID") ErrInvalidStream = errors.New("invalid event stream") ErrCursorNotFound = errors.New("cursor not found") ErrInvalidCursor = errors.New("invalid cursor") )
var ( ErrVersionLimitExceeded = errors.New("version limit exceeded") ErrVersionMalformed = errors.New("version malformed") ErrInvalidSequenceIncr = errors.New("invalid sequence increment") ErrVersionEOFReached = errors.New("version EOF reached") )
var ( VersionMax = Version{ // contains filtered or unexported fields } VersionMin = Version{ // contains filtered or unexported fields } VersionZero = Version{ // contains filtered or unexported fields } )
var (
ErrNotFoundInRegistry = errors.New("event not found in registry")
)
var (
ErrPublishEventFailed = errors.New("publish events failed")
)
Functions ¶
func Transform ¶
func Transform(ctx context.Context, envs []Envelope, fn func(ctx context.Context, evts ...any) ([]any, error)) error
Transform the given slice of events by replacing the data (aka domain event) with the result of fn function.
func TypeOfWithContext ¶
TypeOfWithContext uses TypeOfWithNamespace under the hood and looks for the namespace value from the context.
func TypeOfWithNamespace ¶
TypeOfWithNamespace returns the type of the value using the given namespace. By default the type format is {package name}.{value type name}. The return is changed to {namespace}.{value type name} if namespace is not empty
func ValidateEvent ¶
func ValidateEvent(env Envelope, cur *Cursor, opts ...func(v *Validation)) (ignore bool, err error)
ValidateEvent validates the event according to its sequence in the stream. It returns an error, or an 'ignored' boolean flag if the event is out of validation boundaries
func WithAliases ¶
func WithAliases(aliases ...string) func(registryEntryProps)
Types ¶
type AppendConfig ¶ added in v0.0.2
type AppendConfig struct { AddToTx func(ctx context.Context) (items []any) AddTracing func(ctx context.Context) (traceID string) }
AppendConfig presents a generic definition of write operation options. the behavior might differs based on the event store implementation.
type ContextKey ¶
type ContextKey string
func (ContextKey) String ¶
func (c ContextKey) String() string
type Envelope ¶
type Envelope interface { ID() string Type() string Event() any At() time.Time StreamID() string Version() Version GlobalStreamID() string GlobalVersion() Version User() string Dests() []string TTL() time.Duration }
Envelope wraps and adds meta-data to events such us timestamp, stream ID, version
type EnvelopeOption ¶
type EnvelopeOption func(env RWEnvelope)
EnvelopeOption defines a functional option that allows to override some envelope properties.
func WithGlobalVersionIncr ¶
func WithGlobalVersionIncr(startingVer Version, limit int, diff VersionSequenceDiff) EnvelopeOption
func WithNameSpace ¶
func WithNameSpace(namespace string) EnvelopeOption
func WithVersionIncr ¶
func WithVersionIncr(startingVer Version, limit int, diff VersionSequenceDiff) EnvelopeOption
type GlobalVersionSetter ¶ added in v0.0.2
GlobalVersionSetter defines an envelope with a global version setter.
func MustGlobalVersionSetter ¶ added in v0.0.2
func MustGlobalVersionSetter(env Envelope) GlobalVersionSetter
MustGlobalVersionSetter asserts that the given envelope implements GlobalVersionSetter interface.
type ID ¶
type ID interface {
String() string
}
ID presents the interface responsible for generating UID. It serves as an abstraction to avoid strong coupling to a specific library/algorithm
type NamespaceSetter ¶ added in v0.0.2
NamespaceSetter defines an envelope with a namespace setter.
func MustNamespaceSetter ¶ added in v0.0.2
func MustNamespaceSetter(env Envelope) NamespaceSetter
MustNamespaceSetter asserts that the given envelope implements NamespaceSetter interface.
type Publishable ¶
type Publishable interface{ EvDests() []string }
Publishable presents an event that must be forwarded (in a push fashion) to some system Processors (ex: push-based Projectors) Note that not all events must be publishable, i.e poll-based Projectors may query the durable event store and replay a chunk of events in a regular basis
type Publisher ¶
type Publisher interface { // Publish a chunk of events to their according destinations. Publish(ctx context.Context, events []Envelope) error }
Publisher presents the service responsible for publishing events to the given destinations
type RWEnvelope ¶
type RWEnvelope interface { Envelope SetAt(t time.Time) Envelope SetUser(userID string) Envelope SetVersion(v Version) Envelope SetDests(dests []string) Envelope SetTTL(ttl time.Duration) Envelope }
RWEnvelope defines an envelope with some properties setter methods.
type Register ¶
type Register interface { // Set register the given event in the registry. // In case the event is pointer it register its value instead. Set(event any, opts ...func(registryEntryProps)) Register // Get returns a pointer of a zero type's value. Get(name string) (ptr any, err error) // GetFromGlobal returns a pointer to equivalent event of the given one from the global registry. GetFromGlobal(evt any) (ptr any, err error) // All returns all registered events in the current namespace. It returns both event type and // default value. All() map[string]registryEntry // Clear all namespace registries. It's mainly used for internal tests Clear() }
Register defines the registry service for domain events
func NewRegister ¶
NewRegister returns a Register instance for the given namespace.
func NewRegisterFrom ¶
NewRegisterFrom context returns a new instance of the register using the namespace found in the context. Otherwise, it returns an instance base on the global namespace
type Serializer ¶
type Serializer interface { // MarshalEvent returns a binary version of the event and its size according to the supported format. MarshalEvent(ctx context.Context, event Envelope) ([]byte, error) // MarshalEventBatch returns a binary version of the given chunk of events. It also returns a slice of events' size. // It fails if chunk is empty. MarshalEventBatch(ctx context.Context, events []Envelope) ([]byte, error) // UnmarshalEvent returns an event envelope based on the binary/raw given event. // The returned envelope might be nil in case the event type if not found in the registry. UnmarshalEvent(ctx context.Context, b []byte) (Envelope, error) // UnmarshalEvent returns a slice of envelopes based on the binary given chunk of events. // Similarly to UnmarshalEvent, events might be nil if event type is not found in the registry. UnmarshalEventBatch(ctx context.Context, b []byte) ([]Envelope, error) }
Serializer provides a standard encoding/decoding interface for events
type Store ¶
type Store interface { // Append save a chunk (aka record) of events into a stream Append(ctx context.Context, id StreamID, events []Envelope, optFns ...func(*AppendConfig)) error // Load events from a stream based on the given timestamp range Load(ctx context.Context, id StreamID, trange ...time.Time) ([]Envelope, error) }
Store defines the interface of the event logging store aka timestamp-based event store. Check sourcing package for a version-based Store interface definition.
type Stream ¶
type Stream []Envelope
Stream presents a collection of consecutive events
func (Stream) Events ¶
Events unwraps the event envelops and returns the domain events of the stream chunk
func (Stream) Validate ¶
func (stm Stream) Validate(opts ...func(v *Validation)) error
Validate a chunk of events. Validate will define the validation boundaries and cursor based on the current, any validation boundaries set at the option-level will ignored.
type StreamData ¶
type StreamData struct { Type StreamDataType Value any Stm string }
StreamData mainly contains an event record. It might also contain some infra-related signals such as stream termination or lagging. The consumer must check StreamData type and behave accordingly. Note that StreamData was initially introduced to deal with an old S3-based event store implementation.
type StreamDataType ¶
type StreamDataType string
const ( StreamDataTypeRecord StreamDataType = "record" StreamDataTypeEnd StreamDataType = "end" StreamDataTypeContinue StreamDataType = "continue" )
type StreamID ¶
type StreamID struct {
// contains filtered or unexported fields
}
StreamID presents a composed event stream ID, the global part identifies the global stream ex: TenantID. While the other parts can identify the service, bounded context, or the root entity, etc
func NewStreamID ¶
NewStreamID returns a composed event stream ID. It panics if the global stream ID or parts are not alphanumeric,"-","_","+"
func ParseStreamID ¶
func (StreamID) Global ¶
Global returns true if the given stream is a global one A global stream ID does not have parts
func (StreamID) GlobalID ¶
GlobalID returns the first part of the streamID which represents the global stream ID, e.g TenantID
type Streamer ¶
type Streamer interface { // Replay a stream based on the given query params. // Replay capabilities and behavior are implementation-specific Replay(ctx context.Context, streamID StreamID, q StreamerQuery, h StreamerHandler) error }
Streamer mainly used to query global streams for event replay and projections.
type StreamerHandler ¶
type StreamerHandler func(ctx context.Context, data StreamData) error
StreamerHandler process the given event in the replay stream process
type StreamerQuery ¶
type StreamerQuery struct {
From, To Version
RecordLimit uint
Order StreamerReplayOrder
}
StreamerQuery allows to filter stream based on a pre-defined range, limit, and order
func (*StreamerQuery) Build ¶
func (q *StreamerQuery) Build()
Build applies filter default values if they are missing. In case of "To" is defined, it has to be within the range defined by "From" + "Limit"
type StreamerReplayOrder ¶
type StreamerReplayOrder string
const ( StreamerReplayOrderASC StreamerReplayOrder = "ASC" StreamerReplayOrderDESC StreamerReplayOrder = "DESC" )
type Transformer ¶
type Validation ¶
type Validation struct { GlobalStream bool Boundaries ValidationBoundaries SkipVersion bool SkipTimeStamp bool }
Validation presents the stream validation options
type ValidationBoundaries ¶
ValidationBoundaries filters extra events contained in the result's chunks.
Note that this case was relevant for an old S3-based implementation of the event store; It might be deprecated and removed later.
func (*ValidationBoundaries) Build ¶
func (vb *ValidationBoundaries) Build()
Build applies ValidationBoundaries's default values if values are zero.
type Version ¶
type Version struct {
// contains filtered or unexported fields
}
func NewVersion ¶
func NewVersion() Version
NewVersion returns new version with minimal value. It fails if version is malformed
func ParseVersion ¶
ParseVersion the given string version
func Ver ¶
Ver does parse the given version (the first string arguments), or returns a new one with minimal value
func VersionRange ¶ added in v0.0.2
func (Version) Add ¶
Add increments the version using the given values for both integer and fractional parts
func (Version) Decr ¶
Decr decrements the version integer part while removing the fractional part from the returned version
func (Version) Drop ¶
Drop decrements the version using the given values for both integer and fractional parts
func (Version) Incr ¶
Incr increments the version integer part while removing the fractional part from the returned version
type VersionSequenceDiff ¶
type VersionSequenceDiff int
VersionSequenceDiff defines the difference between two consecutive versions. Two consecutive events within the same record have a fractional difference of .1. Two consecutive events belonging to separate records have a difference of 1.
const ( VersionSeqDiffFracPart VersionSequenceDiff = iota VersionSeqDiffPart )