po

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2020 License: Apache-2.0 Imports: 22 Imported by: 3

README

PO

Library to create Event Sourced Applications using Message Streams as the persistence layer.

Goals

  • Make it easy to use Message Streams for Persistence
  • Be a lightweight toolkit that plays well with others

Non-Goals

  • Handle async communication between multiple services

Features

  1. Writing Messages to a stream
  2. Projecting Messages onto a Handler
  3. Subscribing to message streams
    1. by stream
    2. by group
  4. Grouping of message streams
  5. Message ordering
    1. by stream
    2. by group

Planned

  1. Snapshots
    1. Projections
    2. Subscriptions

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProtocolChannels

func NewProtocolChannels() *channels.Channels

func NewStoreInMemory

func NewStoreInMemory() *inmemory.InMemory

func NewStorePostgresDB

func NewStorePostgresDB(db *sql.DB) (*postgres.Storage, error)

func NewStorePostgresUrl

func NewStorePostgresUrl(connectionUrl string) (*postgres.Storage, error)

func RegisterMessages

func RegisterMessages(initializers ...registry.MessageUnmarshaller)

Types

type Broker

type Broker interface {
	Notify(ctx context.Context, records ...record.Record) error
	Register(ctx context.Context, subscriberId string, streamId streams.Id, subscriber streams.Handler) error
}

type CommandHandler added in v0.7.0

type CommandHandler interface {
	// Hydrates a stream
	Handle(ctx context.Context, msg streams.Message) error
	// Applies the command
	Execute(appender TransactionAppender) error
}

Implemented by commands. Contract is that the CommandHandler is hydrated with all messages on the stream is applied to. Thereafter the Execute method is called, allowing the CommandHandler to append messages within a transaction. Returning an error will cause a rollback. Otherwise all are discarded.

type Handler added in v0.7.0

type Handler interface {
	// Receives messages from a stream
	Handle(ctx context.Context, msg streams.Message) error
}

type HandlerFunc added in v0.7.0

type HandlerFunc func(ctx context.Context, msg streams.Message) error

Utility for functional style handlers

func (HandlerFunc) Handle added in v0.7.0

func (fn HandlerFunc) Handle(ctx context.Context, msg streams.Message) error

type Logger added in v0.5.0

type Logger interface {
	Debugf(template string, args ...interface{})
	Errorf(template string, args ...interface{})
	Infof(template string, args ...interface{})
	Errf(err error, template string, args ...interface{})
}

type Option

type Option func(opt *Options) error

func WithLogger added in v0.5.0

func WithLogger(logger Logger) Option

func WithPrometheus added in v0.6.0

func WithPrometheus(prom prometheus.Registerer) Option

func WithProtocol added in v0.5.0

func WithProtocol(protocol broker.Protocol) Option

func WithProtocolChannels

func WithProtocolChannels() Option

func WithRegistry added in v0.4.0

func WithRegistry(registry Registry) Option

func WithStore added in v0.5.0

func WithStore(store Store) Option

func WithStoreInMemory

func WithStoreInMemory() Option

func WithStorePostgresDB

func WithStorePostgresDB(db *sql.DB) Option

func WithStorePostgresUrl

func WithStorePostgresUrl(connectionUrl string) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Po

type Po struct {
	// contains filtered or unexported fields
}

func New

func New(store Store, protocol broker.Protocol) *Po

func NewFromOptions

func NewFromOptions(opts ...Option) (*Po, error)

func (*Po) Append added in v0.5.2

func (po *Po) Append(ctx context.Context, id streams.Id, messages ...interface{}) (int64, error)

func (*Po) Execute added in v0.5.1

func (po *Po) Execute(ctx context.Context, id streams.Id, exec CommandHandler) error

func (*Po) Project

func (po *Po) Project(ctx context.Context, id streams.Id, projection Handler) error

convenience method to load a stream and project it

func (*Po) Stream

func (po *Po) Stream(ctx context.Context, id streams.Id) *Stream

func (*Po) Subscribe

func (po *Po) Subscribe(ctx context.Context, subscriptionId string, id streams.Id, subscriber Handler) error

type Registry

type Registry interface {
	Unmarshal(typeName string, b []byte) (interface{}, error)
	Marshal(msg interface{}) ([]byte, string, error)
	ToMessage(r record.Record) (streams.Message, error)
}

type Store

type Store interface {
	WriteRecords(ctx context.Context, id streams.Id, data ...record.Data) ([]record.Record, error)
	WriteRecordsFrom(ctx context.Context, id streams.Id, position int64, data ...record.Data) ([]record.Record, error)
	ReadSnapshot(ctx context.Context, id streams.Id, snapshotId string) (record.Snapshot, error)
	UpdateSnapshot(ctx context.Context, id streams.Id, snapshotId string, snapshot record.Snapshot) error
	Begin(ctx context.Context) (store.Tx, error)
	SubscriptionPositionLock(tx store.Tx, id streams.Id, subscriptionIds ...string) ([]store.SubscriptionPosition, error)
	ReadRecords(ctx context.Context, id streams.Id, from, to, limit int64) ([]record.Record, error)
	SetSubscriptionPosition(tx store.Tx, id streams.Id, position store.SubscriptionPosition) error
}

type Stream

type Stream struct {
	Id streams.Id
	// contains filtered or unexported fields
}

Stream that uses Optimistic locking when appending to the message stream

func NewStream added in v0.8.0

func NewStream(ctx context.Context, streamId streams.Id, store Store, broker Broker, registry Registry) *Stream

func (*Stream) Append

func (stream *Stream) Append(messages ...interface{}) (int64, error)

func (*Stream) Execute added in v0.5.0

func (stream *Stream) Execute(exec CommandHandler) error

func (*Stream) Project

func (stream *Stream) Project(projection Handler) error

Projects all messages onto the given Handler. If the handler implements streams.NamedSnapshot, snapshotting will be performed.

The projection will also lock this Stream instance to the most recently read message number for the stream.

type TransactionAppender added in v0.5.0

type TransactionAppender interface {
	// appends tot he stream
	Append(messages ...interface{})
	// current size of the stream
	Size() int64
}

Append to a transaction. Messages will be written to the store on commit

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL