ehclient

package
v0.2.1-0...-14f66c8
Warning

This package is not in the latest version of its module.

Published: Aug 24, 2021 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Easy-to-use consumer API on top of EventHorizon client (which is lower-level)

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigFromENV

func ConfigFromENV() (string, error)

reads the config string from an environment variable

func LogIgnoredUnrecognizedEventType

func LogIgnoredUnrecognizedEventType(e ehevent.Event, logl *logex.Leveled)

helper that logs that an event of an unrecognized type was ignored

func NewDynamoDbSnapshotStore

func NewDynamoDbSnapshotStore(opts ehdynamodb.DynamoDbOptions) (eh.SnapshotStore, error)

func UnsupportedEventTypeErr

func UnsupportedEventTypeErr(e ehevent.Event) error

helper for your code to generate an error for an event type it does not support
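A minimal sketch of where such a helper typically fits: the default branch of a type switch in an event handler. Everything here is a local stand-in so the example compiles on its own. The `Event` interface, its `TypeName()` method, the event types and `unsupportedEventTypeErr` are hypothetical and are not the real ehevent API.

```go
package main

import "fmt"

// Event is a local stand-in for ehevent.Event (assumption: the real interface differs).
type Event interface{ TypeName() string }

type userCreated struct{ Name string }

func (userCreated) TypeName() string { return "user.Created" }

type somethingElse struct{}

func (somethingElse) TypeName() string { return "something.Else" }

// unsupportedEventTypeErr plays the role of UnsupportedEventTypeErr in this sketch.
func unsupportedEventTypeErr(e Event) error {
	return fmt.Errorf("unsupported event type: %s", e.TypeName())
}

type userList struct{ names []string }

// handleEvent applies known events and rejects everything else.
func (u *userList) handleEvent(e Event) error {
	switch ev := e.(type) {
	case userCreated:
		u.names = append(u.names, ev.Name)
		return nil
	default:
		return unsupportedEventTypeErr(e)
	}
}

func main() {
	list := &userList{}
	fmt.Println(list.handleEvent(userCreated{Name: "alice"}))
	fmt.Println(list.handleEvent(somethingElse{}))
}
```

A handler that prefers to skip unknown events instead of failing could log in the default branch and return nil, which is presumably the use case for LogIgnoredUnrecognizedEventType.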

Types

type Client

type Client struct {
	Tenant
	*SystemClient
}

combines together:

- event log
- snapshot store
- tenant

func ClientFrom

func ClientFrom(
	getter ConfigStringGetter,
	logger *log.Logger,
	sysConn SystemConnector,
) (*Client, error)

type Config

type Config struct {
	// contains filtered or unexported fields
}

func (*Config) ClientDynamoDbOptions

func (c *Config) ClientDynamoDbOptions() ehdynamodb.DynamoDbOptions

func (*Config) SnapshotsDynamoDbOptions

func (c *Config) SnapshotsDynamoDbOptions() ehdynamodb.DynamoDbOptions

type ConfigStringGetter

type ConfigStringGetter func() (string, error)
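Any function with this signature can serve as a getter, and ConfigFromENV above already has the matching shape. The sketch below shows two hypothetical getters, one reading a caller-chosen environment variable and one returning a fixed string for tests. The helper names and the variable name `EXAMPLE_EH_CONFIG` are assumptions; the page does not say which variable ConfigFromENV reads.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// ConfigStringGetter mirrors the signature documented above.
type ConfigStringGetter func() (string, error)

// getterFromEnvVar (hypothetical) returns a getter that reads the named variable.
func getterFromEnvVar(name string) ConfigStringGetter {
	return func() (string, error) {
		val := os.Getenv(name)
		if val == "" {
			return "", errors.New("environment variable not set: " + name)
		}
		return val, nil
	}
}

// staticGetter (hypothetical) returns a fixed config string, handy in tests.
func staticGetter(conf string) ConfigStringGetter {
	return func() (string, error) { return conf, nil }
}

func main() {
	getter := getterFromEnvVar("EXAMPLE_EH_CONFIG") // made-up variable name
	if _, err := getter(); err != nil {
		fmt.Println("config unavailable:", err)
	}
}
```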

type DynamoSnapshotItem

type DynamoSnapshotItem struct {
	Stream      string `json:"s"` // stream + perspective form the composite key
	Perspective string `json:"c"` // different software can have different perspective for the same stream
	Version     int64  `json:"v"` // we conditionally put updates into DynamoDB as not to overwrite advanced state
	RawData     []byte `json:"d"` // actual snapshot data, probably encrypted
}
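To illustrate the short attribute names and the "conditional put" idea from the Version comment, here is a sketch using a local copy of the struct. `encoding/json` is used only to make the tags visible; the real store may marshal items differently. `shouldOverwrite` is a hypothetical helper expressing the condition in plain Go, whereas the real check presumably lives in a DynamoDB condition expression.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the struct shown above, so the sketch is self-contained.
type DynamoSnapshotItem struct {
	Stream      string `json:"s"`
	Perspective string `json:"c"`
	Version     int64  `json:"v"`
	RawData     []byte `json:"d"`
}

// shouldOverwrite (hypothetical) reports whether candidate is newer than what
// is stored, so that an older writer cannot overwrite more advanced state.
func shouldOverwrite(stored, candidate DynamoSnapshotItem) bool {
	return candidate.Version > stored.Version
}

// encode renders the item as JSON; []byte fields are base64-encoded.
func encode(item DynamoSnapshotItem) string {
	b, err := json.Marshal(item)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	stored := DynamoSnapshotItem{Stream: "/t-314/foobar", Perspective: "myapp", Version: 5, RawData: []byte{1, 2, 3}}
	older := stored
	older.Version = 4

	fmt.Println(encode(stored))
	fmt.Println("overwrite with older snapshot:", shouldOverwrite(stored, older))
}
```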

type EventProcessorHandler

type EventProcessorHandler func(
	cur eh.Cursor,
	handleEvent func(ehevent.Event) error,
	commit func(eh.Cursor) error,
) error

encapsulates:

1) validate that the current version is what we think it is
2) process events (via callback)
3) commit (via callback), while updating the version
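The three steps can be sketched with local stand-in types. This is an interpretation of the flow, not the package's implementation: `Cursor`, `Event`, `counter` and `feed` are hypothetical, and the real handler is constructed by infra code such as the Reader.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for eh.Cursor and ehevent.Event.
type Cursor struct {
	Stream  string
	Version int64
}
type Event string

type handlerFn func(cur Cursor, handleEvent func(Event) error, commit func(Cursor) error) error

// counter is user code: it counts events and remembers the version it has seen.
type counter struct {
	version Cursor
	count   int
}

// processEvents hands the current version and two callbacks to the handler.
// Changes are staged in pending and only become visible on commit.
func (c *counter) processEvents(handle handlerFn) error {
	pending := c.count
	return handle(
		c.version,
		func(Event) error { pending++; return nil },
		func(next Cursor) error { c.count, c.version = pending, next; return nil },
	)
}

// feed plays the infra role: it builds a handler that delivers the given events.
func feed(events []Event, expected Cursor) handlerFn {
	return func(cur Cursor, handleEvent func(Event) error, commit func(Cursor) error) error {
		if cur != expected { // 1) validate the version
			return errors.New("version mismatch")
		}
		for _, e := range events { // 2) process events via callback
			if err := handleEvent(e); err != nil {
				return err
			}
			cur.Version++
		}
		return commit(cur) // 3) commit via callback, updating the version
	}
}

func main() {
	c := &counter{version: Cursor{Stream: "/example"}}
	err := c.processEvents(feed([]Event{"a", "b"}, Cursor{Stream: "/example"}))
	fmt.Println(c.count, c.version.Version, err)
}
```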

type EventsProcessor

type EventsProcessor interface {
	/*	returns error if:

		- failed to start a tx (user code)
		- failed to resolve the version (user code)
		- handle (infra code) failed which itself contains:
		  * error for handleEvent()
		  * commit()
		  * etc.
	*/
	ProcessEvents(ctx context.Context, handle EventProcessorHandler) error
	// 1st: types for app-specific events, 2nd: EventHorizon meta events (most times nil !)
	GetEventTypes() []LogDataKindDeserializer

	InstallSnapshot(*eh.Snapshot) error
	Snapshot() (*eh.Snapshot, error)
	// return "" if you don't implement snapshots
	Perspective() eh.SnapshotPerspective
}

type LogDataDeserializerFn

type LogDataDeserializerFn func(ctx context.Context, entry *eh.LogEntry, client *SystemClient) ([]ehevent.Event, error)

type LogDataKindDeserializer

type LogDataKindDeserializer struct {
	Kind         eh.LogDataKind
	Deserializer LogDataDeserializerFn

	// otherwise could be detected from Kind==LogDataKindEncryptedData but see "synthetic statistics"
	// (which lies about deserializing LogDataKindEncryptedData but does not actually deal with encryption)
	Encryption bool
}

func EncryptedDataDeserializer

func EncryptedDataDeserializer(types ehevent.Types) []LogDataKindDeserializer

returns a slice for ergonomics

func MetaDeserializer

func MetaDeserializer() []LogDataKindDeserializer

returns a slice for ergonomics
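One plausible reading of "for ergonomics" is that slice-returning constructors combine with a single append, in the order the GetEventTypes comment describes (app-specific types first, meta events second). The sketch below uses hypothetical stand-ins; the kind values and function names are made up and do not reflect the real eh.LogDataKind constants.

```go
package main

import "fmt"

// Local stand-ins; the numeric kind values are invented for this sketch.
type LogDataKind int

const (
	kindMeta          LogDataKind = 1
	kindEncryptedData LogDataKind = 2
)

type kindDeserializer struct {
	Kind       LogDataKind
	Encryption bool
}

// Each constructor returns a one-element slice, like the functions documented above.
func encryptedDataDeserializer() []kindDeserializer {
	return []kindDeserializer{{Kind: kindEncryptedData, Encryption: true}}
}

func metaDeserializer() []kindDeserializer {
	return []kindDeserializer{{Kind: kindMeta}}
}

// getEventTypes shows the payoff: the results combine without wrapping
// single values into slices by hand.
func getEventTypes() []kindDeserializer {
	return append(encryptedDataDeserializer(), metaDeserializer()...)
}

func main() {
	for _, d := range getEventTypes() {
		fmt.Printf("kind=%d encryption=%v\n", d.Kind, d.Encryption)
	}
}
```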

func MetaDeserializer2

func MetaDeserializer2(types ehevent.Types) []LogDataKindDeserializer

type NoSnapshots

type NoSnapshots struct{}

embed this struct in your implementation if you want to opt-out of snapshotting

func (*NoSnapshots) InstallSnapshot

func (n *NoSnapshots) InstallSnapshot(_ *eh.Snapshot) error

func (*NoSnapshots) Perspective

func (n *NoSnapshots) Perspective() eh.SnapshotPerspective

func (*NoSnapshots) Snapshot

func (n *NoSnapshots) Snapshot() (*eh.Snapshot, error)
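The opt-out works through Go's method promotion: embedding the struct gives your type the three snapshot methods for free. The sketch below demonstrates this with local stand-ins. The method bodies of `noSnapshots` are assumptions (only the empty perspective is suggested by the EventsProcessor comment), and `userList` is a hypothetical processor.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for eh.Snapshot and eh.SnapshotPerspective.
type Snapshot struct{ Data []byte }
type SnapshotPerspective string

// noSnapshots stands in for NoSnapshots; what the real methods return is an assumption.
type noSnapshots struct{}

var errNoSnapshots = errors.New("snapshots not implemented")

func (n *noSnapshots) InstallSnapshot(_ *Snapshot) error { return errNoSnapshots }
func (n *noSnapshots) Snapshot() (*Snapshot, error)      { return nil, errNoSnapshots }
func (n *noSnapshots) Perspective() SnapshotPerspective  { return "" }

// snapshotter is the snapshot-related part of the EventsProcessor interface.
type snapshotter interface {
	InstallSnapshot(*Snapshot) error
	Snapshot() (*Snapshot, error)
	Perspective() SnapshotPerspective
}

// userList opts out of snapshotting simply by embedding noSnapshots.
type userList struct {
	noSnapshots
	names []string
}

// Compile-time check that the promoted methods satisfy the interface.
var _ snapshotter = (*userList)(nil)

func main() {
	u := &userList{}
	fmt.Printf("perspective=%q\n", u.Perspective())
}
```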

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Serves reads for one processor. Not safe for concurrent use.

func NewReader

func NewReader(processor EventsProcessor, client *SystemClient) *Reader

"keep processor happy by feeding it from client"

func (*Reader) AddLogPrefix

func (r *Reader) AddLogPrefix(prefix string)

you should probably not use this

func (*Reader) LoadUntilRealtime

func (r *Reader) LoadUntilRealtime(ctx context.Context) error

feeds events to the processor from the beginning of the stream's event log (or, as an optimization, from a snapshot if there is one) and reads until we have reached the realtime state (= no more newer events)

func (*Reader) LoadUntilRealtimeIfStale

func (r *Reader) LoadUntilRealtimeIfStale(
	ctx context.Context,
	staleDuration time.Duration,
) error

same as LoadUntilRealtime(), but only loads if not done so recently. safe for concurrent access.
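The "only loads if not done so recently" behaviour can be pictured as a timestamp check guarded by a mutex. This is a sketch of the general technique under that assumption, not the Reader's actual code; `staleLoader` and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// staleLoader (hypothetical) skips loading when the last successful load is recent.
type staleLoader struct {
	mu       sync.Mutex // serializes callers, making loadIfStale safe for concurrent use
	lastLoad time.Time  // zero value means "never loaded"
	load     func() error
}

func newStaleLoader(load func() error) *staleLoader {
	return &staleLoader{load: load}
}

// loadIfStale runs load unless a load succeeded less than staleDuration ago.
func (s *staleLoader) loadIfStale(staleDuration time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.lastLoad.IsZero() && time.Since(s.lastLoad) < staleDuration {
		return nil // fresh enough
	}
	if err := s.load(); err != nil {
		return err // lastLoad is untouched, so the next call retries
	}
	s.lastLoad = time.Now()
	return nil
}

func main() {
	loads := 0
	s := newStaleLoader(func() error { loads++; return nil })
	_ = s.loadIfStale(time.Minute)
	_ = s.loadIfStale(time.Minute) // skipped: the previous load is recent
	fmt.Println("loads:", loads)
}
```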

func (*Reader) TransactWrite

func (r *Reader) TransactWrite(ctx context.Context, fn func() error) error

wraps your AppendAfter() result with state-refreshed retries for ErrOptimisticLockingFailed.

FIXME: currently this cannot be used along with Synchronizer(), because the Reader is not safe for concurrent use
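A rough sketch of what "state-refreshed retries" can look like: refresh the local state, run the write, and repeat only when the write lost an optimistic-locking race. The names, the refresh-before-every-attempt order and the attempt limit are assumptions of this sketch; the page does not describe the real retry policy.

```go
package main

import (
	"errors"
	"fmt"
)

// errOptimisticLockingFailed is a local stand-in for ErrOptimisticLockingFailed.
var errOptimisticLockingFailed = errors.New("optimistic locking failed")

// transactWrite (hypothetical) retries fn after refreshing state whenever fn
// fails because someone else appended first.
func transactWrite(refresh func() error, fn func() error, maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Bring local state up to date so fn decides based on current data.
		if err := refresh(); err != nil {
			return err
		}
		err := fn()
		if !errors.Is(err, errOptimisticLockingFailed) {
			return err // success (nil) or an error that retrying will not fix
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, errOptimisticLockingFailed)
}

func main() {
	calls := 0
	err := transactWrite(
		func() error { return nil }, // stands in for loading until realtime
		func() error {
			calls++
			if calls < 3 {
				return errOptimisticLockingFailed // simulate losing the race twice
			}
			return nil
		},
		5,
	)
	fmt.Println(calls, err)
}
```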

type SnapshotReference

type SnapshotReference struct {
	Stream      eh.StreamName
	Perspective eh.SnapshotPerspective
}

type SnapshotVersion

type SnapshotVersion struct {
	Snapshot SnapshotReference
	Version  int64
}

type SystemClient

type SystemClient struct {
	EventLog      eh.ReaderWriter
	SnapshotStore eh.SnapshotStore
	// contains filtered or unexported fields
}

same as Client, but *WITHOUT* tenant awareness. Usually this is not used directly, as most data access cases are expected to be tenant-aware

func SystemClientFrom

func SystemClientFrom(
	getter ConfigStringGetter,
	logger *log.Logger,
	sysConn SystemConnector,
) (*SystemClient, error)

func (*SystemClient) Append

func (e *SystemClient) Append(ctx context.Context, stream eh.StreamName, events ...ehevent.Event) error

func (*SystemClient) AppendAfter

func (e *SystemClient) AppendAfter(ctx context.Context, after eh.Cursor, events ...ehevent.Event) error

func (*SystemClient) AppendStrings

func (e *SystemClient) AppendStrings(ctx context.Context, stream eh.StreamName, eventsSerialized []string) error

func (*SystemClient) CreateStream

func (e *SystemClient) CreateStream(
	ctx context.Context,
	stream eh.StreamName,
	data *eh.LogData,
) (*eh.AppendResult, error)

TODO: maybe make *eh.LogData be returned from a callback, because for encrypted LogData it depends on the generated DEK

func (*SystemClient) GetServerUrl

func (s *SystemClient) GetServerUrl() string

returns empty if running at server side

func (*SystemClient) LoadDEKv0

func (e *SystemClient) LoadDEKv0(ctx context.Context, stream eh.StreamName) ([]byte, error)

loads the DEK (Data Encryption Key) for a given stream by loading the DEK envelope and decrypting it. The "v0" in the name emphasizes that key rotation is not yet implemented.

func (*SystemClient) Logger

func (s *SystemClient) Logger(prefix string) *log.Logger

type SystemConnector

type SystemConnector interface {
	DEKv0EnvelopeForNewStream(context.Context, eh.StreamName) (*envelopeenc.EnvelopeBundle, error)
	ResolveDEK(context.Context, eh.StreamName) ([]byte, error)
}

things we need from outside of the client package to make the system as a whole work. This is basically to circumvent circular dependencies (using the client requires system stores, which depend on the Reader interface). This is also the reason for the "ehclientfactory" package.

TODO: extract the interfaces the stores depend on, so we don't need this?

type Tenant

type Tenant struct {
	// contains filtered or unexported fields
}

func TenantId

func TenantId(id string) Tenant

func (Tenant) ChildStream

func (t Tenant) ChildStream(name string) eh.StreamName

returns a child stream of the tenant's stream (e.g. "/t-314"): ChildStream("foobar") => "/t-314/foobar"
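The naming scheme from the example can be reproduced with plain string concatenation. This sketch assumes that a tenant with id "314" maps to the stream "/t-314"; the page shows the resulting stream name but not how TenantId derives it, so `tenant`, `tenantId` and `stream` are hypothetical.

```go
package main

import "fmt"

// tenant is a local stand-in for Tenant.
type tenant struct{ id string }

func tenantId(id string) tenant { return tenant{id: id} }

// stream returns the tenant's own stream (assumed format: "/t-<id>").
func (t tenant) stream() string { return "/t-" + t.id }

// childStream appends name as a path component under the tenant's stream.
func (t tenant) childStream(name string) string {
	return t.stream() + "/" + name
}

func main() {
	fmt.Println(tenantId("314").childStream("foobar"))
}
```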

Directories

Path Synopsis
Test helpers for testing EventHorizon consumers.