Documentation ¶
Overview ¶
Easy-to-use consumer API built on top of the lower-level EventHorizon client.
Index ¶
- func ConfigFromENV() (string, error)
- func LogIgnoredUnrecognizedEventType(e ehevent.Event, logl *logex.Leveled)
- func NewDynamoDbSnapshotStore(opts ehdynamodb.DynamoDbOptions) (eh.SnapshotStore, error)
- func UnsupportedEventTypeErr(e ehevent.Event) error
- type Client
- type Config
- type ConfigStringGetter
- type DynamoSnapshotItem
- type EventProcessorHandler
- type EventsProcessor
- type LogDataDeserializerFn
- type LogDataKindDeserializer
- type NoSnapshots
- type Reader
- type SnapshotReference
- type SnapshotVersion
- type SystemClient
- func (e *SystemClient) Append(ctx context.Context, stream eh.StreamName, events ...ehevent.Event) error
- func (e *SystemClient) AppendAfter(ctx context.Context, after eh.Cursor, events ...ehevent.Event) error
- func (e *SystemClient) AppendStrings(ctx context.Context, stream eh.StreamName, eventsSerialized []string) error
- func (e *SystemClient) CreateStream(ctx context.Context, stream eh.StreamName, data *eh.LogData) (*eh.AppendResult, error)
- func (s *SystemClient) GetServerUrl() string
- func (e *SystemClient) LoadDEKv0(ctx context.Context, stream eh.StreamName) ([]byte, error)
- func (s *SystemClient) Logger(prefix string) *log.Logger
- type SystemConnector
- type Tenant
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConfigFromENV ¶
func ConfigFromENV() (string, error)
Reads the config string from an environment variable.
func LogIgnoredUnrecognizedEventType ¶
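func LogIgnoredUnrecognizedEventType(e ehevent.Event, logl *logex.Leveled)
Helper for your code to log that an event of an unrecognized type was ignored.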
func NewDynamoDbSnapshotStore ¶
func NewDynamoDbSnapshotStore(opts ehdynamodb.DynamoDbOptions) (eh.SnapshotStore, error)
func UnsupportedEventTypeErr ¶
func UnsupportedEventTypeErr(e ehevent.Event) error
Helper for your code to generate an error for an event type it does not support.
Types ¶
type Client ¶
type Client struct {
	Tenant
	*SystemClient
}
Combines together:
- event log
- snapshot store
- tenant
func ClientFrom ¶
func ClientFrom(getter ConfigStringGetter, logger *log.Logger, sysConn SystemConnector) (*Client, error)
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
func (*Config) ClientDynamoDbOptions ¶
func (c *Config) ClientDynamoDbOptions() ehdynamodb.DynamoDbOptions
func (*Config) SnapshotsDynamoDbOptions ¶
func (c *Config) SnapshotsDynamoDbOptions() ehdynamodb.DynamoDbOptions
type ConfigStringGetter ¶
type DynamoSnapshotItem ¶
type DynamoSnapshotItem struct {
	Stream      string `json:"s"` // stream + perspective form the composite key
	Perspective string `json:"c"` // different software can have different perspective for the same stream
	Version     int64  `json:"v"` // we conditionally put updates into DynamoDB as not to overwrite advanced state
	RawData     []byte `json:"d"` // actual snapshot data, probably encrypted
}
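The short JSON keys and the version guard described in the field comments can be illustrated with a small sketch. The struct is redeclared locally so the example compiles on its own. The `shouldOverwrite` helper, the sample values, and the use of `encoding/json` are assumptions made for illustration and may not match how the store actually serializes items or builds its DynamoDB condition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of DynamoSnapshotItem, redeclared so the sketch is self-contained.
type DynamoSnapshotItem struct {
	Stream      string `json:"s"`
	Perspective string `json:"c"`
	Version     int64  `json:"v"`
	RawData     []byte `json:"d"`
}

// shouldOverwrite is a hypothetical helper: it accepts the candidate only if
// it is strictly newer than what is stored, so advanced state is never replaced.
// (Strictly-greater is an assumption; the real condition is not documented here.)
func shouldOverwrite(stored, candidate DynamoSnapshotItem) bool {
	return candidate.Version > stored.Version
}

// encodeItem marshals the item using the short attribute names from the tags.
func encodeItem(item DynamoSnapshotItem) (string, error) {
	b, err := json.Marshal(item)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	stored := DynamoSnapshotItem{Stream: "/t-314/foobar", Perspective: "default", Version: 3, RawData: []byte("hi")}
	older := stored
	older.Version = 2

	encoded, err := encodeItem(stored)
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)
	fmt.Println(shouldOverwrite(stored, older)) // an older snapshot must not win
}
```

Note that `encoding/json` encodes the `[]byte` field as base64, so the raw snapshot data survives the round trip as text.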
type EventProcessorHandler ¶
type EventProcessorHandler func(
	cur eh.Cursor,
	handleEvent func(ehevent.Event) error,
	commit func(eh.Cursor) error,
) error
Encapsulates:
1) validate that current version is what we think it is
2) process events (via callback)
3) commit (via callback), while updating version
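The three steps can be sketched with simplified stand-in types. Everything below is hypothetical: `cursor` and `event` replace `eh.Cursor` and `ehevent.Event`, `counter` is a toy processor, and `feed` plays the role of the infrastructure code that the real package provides. It only shows how the callbacks fit together, not how the package implements them.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for eh.Cursor and ehevent.Event (hypothetical simplifications).
type cursor int
type event string

// Same shape as EventProcessorHandler, expressed with the stand-in types.
type handlerFn func(cur cursor, handleEvent func(event) error, commit func(cursor) error) error

// counter is a toy processor whose state is the number of events applied.
type counter struct {
	version cursor
	seen    int
}

// processEvents plays the role of ProcessEvents(): it resolves the current
// version and hands the handler the two callbacks.
func (c *counter) processEvents(handle handlerFn) error {
	pending := 0
	return handle(
		c.version,
		func(event) error { pending++; return nil }, // step 2: process one event
		func(next cursor) error { // step 3: commit and update the version
			if next < c.version {
				return errors.New("commit would move the version backwards")
			}
			c.seen += pending
			pending = 0
			c.version = next
			return nil
		},
	)
}

// feed plays the role of the infrastructure-side handler: it checks the
// cursor (step 1), feeds the events after it, then commits the new position.
func feed(log []event) handlerFn {
	return func(cur cursor, handleEvent func(event) error, commit func(cursor) error) error {
		if cur < 0 || int(cur) > len(log) {
			return fmt.Errorf("cursor %d does not match the log", cur)
		}
		for _, e := range log[cur:] {
			if err := handleEvent(e); err != nil {
				return err
			}
		}
		return commit(cursor(len(log)))
	}
}

func main() {
	c := &counter{}
	if err := c.processEvents(feed([]event{"a", "b", "c"})); err != nil {
		panic(err)
	}
	fmt.Println(c.seen, c.version)
}
```

Buffering the events until `commit` runs keeps the state and the version in step: either both advance or neither does.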
type EventsProcessor ¶
type EventsProcessor interface {
	/*	returns error if:
		- failed to start a tx (user code)
		- failed to resolve the version (user code)
		- handle (infra code) failed which itself contains:
			* error for handleEvent()
			* commit()
			* etc.
	*/
	ProcessEvents(ctx context.Context, handle EventProcessorHandler) error

	// 1st: types for app-specific events, 2nd: EventHorizon meta events (most times nil !)
	GetEventTypes() []LogDataKindDeserializer

	InstallSnapshot(*eh.Snapshot) error
	Snapshot() (*eh.Snapshot, error)

	// return "" if you don't implement snapshots
	Perspective() eh.SnapshotPerspective
}
type LogDataDeserializerFn ¶
type LogDataKindDeserializer ¶
type LogDataKindDeserializer struct {
	Kind         eh.LogDataKind
	Deserializer LogDataDeserializerFn

	// otherwise could be detected from Kind==LogDataKindEncryptedData but see "synthetic statistics"
	// (which lies about deserializing LogDataKindEncryptedData but does not actually deal with encryption)
	Encryption bool
}
func EncryptedDataDeserializer ¶
func EncryptedDataDeserializer(types ehevent.Types) []LogDataKindDeserializer
returns a slice for ergonomics
func MetaDeserializer ¶
func MetaDeserializer() []LogDataKindDeserializer
returns a slice for ergonomics
func MetaDeserializer2 ¶
func MetaDeserializer2(types ehevent.Types) []LogDataKindDeserializer
type NoSnapshots ¶
type NoSnapshots struct{}
Embed this struct in your implementation if you want to opt out of snapshotting.
func (*NoSnapshots) InstallSnapshot ¶
func (n *NoSnapshots) InstallSnapshot(_ *eh.Snapshot) error
func (*NoSnapshots) Perspective ¶
func (n *NoSnapshots) Perspective() eh.SnapshotPerspective
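Opting out by embedding can be sketched as follows. All types are local stand-ins so the example compiles on its own: `noSnapshots` imitates `NoSnapshots`, `snapshot` replaces `eh.Snapshot`, and a plain `string` replaces `eh.SnapshotPerspective`. Returning an error from `InstallSnapshot` is an assumption; the empty perspective follows the `EventsProcessor` comment about returning "" when snapshots are not implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// snapshot is a stand-in for eh.Snapshot.
type snapshot struct{ data []byte }

// snapshotter is the snapshot-related slice of an EventsProcessor-like interface.
type snapshotter interface {
	InstallSnapshot(*snapshot) error
	Perspective() string
}

// noSnapshots imitates NoSnapshots: embedding it supplies the methods below.
type noSnapshots struct{}

// InstallSnapshot rejects every snapshot (the error is an assumed behavior).
func (n *noSnapshots) InstallSnapshot(_ *snapshot) error {
	return errors.New("snapshots not supported")
}

// Perspective returns "" to signal that snapshots are not implemented.
func (n *noSnapshots) Perspective() string { return "" }

// myProjection is a hypothetical processor that opts out by embedding.
type myProjection struct {
	noSnapshots
	count int
}

// Compile-time check that the embedded methods satisfy the interface.
var _ snapshotter = &myProjection{}

func main() {
	p := &myProjection{}
	fmt.Printf("perspective=%q err=%v\n", p.Perspective(), p.InstallSnapshot(nil))
}
```

Because the stand-in methods have pointer receivers, only `*myProjection` satisfies the interface, which is why the check uses `&myProjection{}`.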
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Serves reads for one processor. Not safe for concurrent use.
func NewReader ¶
func NewReader(processor EventsProcessor, client *SystemClient) *Reader
"keep processor happy by feeding it from client"
func (*Reader) AddLogPrefix ¶
you should probably not use this
func (*Reader) LoadUntilRealtime ¶
Feeds events to the processor from the beginning of the stream's event log (or, as an optimization, from a snapshot if there is one) and reads until we have reached the realtime state (= no more newer events).
func (*Reader) LoadUntilRealtimeIfStale ¶
func (r *Reader) LoadUntilRealtimeIfStale(ctx context.Context, staleDuration time.Duration) error
Same as LoadUntilRealtime(), but only loads if that has not been done recently. Safe for concurrent access.
func (*Reader) TransactWrite ¶
Wraps your AppendAfter() result with state-refreshed retries for ErrOptimisticLockingFailed.
FIXME: currently this cannot be used along with Synchronizer(), because the Reader is not safe for concurrent use.
type SnapshotReference ¶
type SnapshotReference struct {
	Stream      eh.StreamName
	Perspective eh.SnapshotPerspective
}
type SnapshotVersion ¶
type SnapshotVersion struct {
	Snapshot SnapshotReference
	Version  int64
}
type SystemClient ¶
type SystemClient struct {
	EventLog      eh.ReaderWriter
	SnapshotStore eh.SnapshotStore
	// contains filtered or unexported fields
}
Same as Client, but *WITHOUT* tenant awareness. Usually this is not used directly, as most data access cases are expected to be tenant-aware.
func SystemClientFrom ¶
func SystemClientFrom(getter ConfigStringGetter, logger *log.Logger, sysConn SystemConnector) (*SystemClient, error)
func (*SystemClient) Append ¶
func (e *SystemClient) Append(ctx context.Context, stream eh.StreamName, events ...ehevent.Event) error
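func (*SystemClient) AppendAfter ¶
func (e *SystemClient) AppendAfter(ctx context.Context, after eh.Cursor, events ...ehevent.Event) error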
func (*SystemClient) AppendStrings ¶
func (e *SystemClient) AppendStrings(ctx context.Context, stream eh.StreamName, eventsSerialized []string) error
func (*SystemClient) CreateStream ¶
func (e *SystemClient) CreateStream(ctx context.Context, stream eh.StreamName, data *eh.LogData) (*eh.AppendResult, error)
TODO: maybe make *eh.LogData be returned from a cb, because for encrypted LogData it depends on the generated DEK
func (*SystemClient) GetServerUrl ¶
func (s *SystemClient) GetServerUrl() string
Returns an empty string if running on the server side.
func (*SystemClient) LoadDEKv0 ¶
func (e *SystemClient) LoadDEKv0(ctx context.Context, stream eh.StreamName) ([]byte, error)
Loads the DEK (Data Encryption Key) for a given stream (by loading the DEK envelope and decrypting it). The v0 in the name emphasizes that key rotation is not yet implemented.
type SystemConnector ¶
type SystemConnector interface {
	DEKv0EnvelopeForNewStream(context.Context, eh.StreamName) (*envelopeenc.EnvelopeBundle, error)
	ResolveDEK(context.Context, eh.StreamName) ([]byte, error)
}
Things we need from outside of the client package to make the system as a whole work. This is basically to circumvent circular dependencies (using the client requires system stores, which depend on the Reader interface). This is also the reason for the "ehclientfactory" package.
TODO: extract the interfaces the stores depend on, so we don't need this?
type Tenant ¶
type Tenant struct {
// contains filtered or unexported fields
}
func (Tenant) ChildStream ¶
func (t Tenant) ChildStream(name string) eh.StreamName
Returns a child stream of the tenant's stream (e.g. "/t-314"): ChildStream("foobar") => "/t-314/foobar"
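The naming rule can be reproduced with a small stand-in. The `tenant` type and its `stream` field are hypothetical (the real Tenant has unexported fields that are not shown here), and plain strings replace `eh.StreamName`.

```go
package main

import "fmt"

// tenant is a stand-in for Tenant; stream holds the tenant's own stream name.
type tenant struct{ stream string }

// childStream appends name as one more path component under the tenant's stream.
func (t tenant) childStream(name string) string {
	return t.stream + "/" + name
}

func main() {
	t := tenant{stream: "/t-314"}
	fmt.Println(t.childStream("foobar")) // prints "/t-314/foobar"
}
```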
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
 | Test helpers for testing EventHorizon consumers. |