Documentation ¶
Index ¶
- func NewProtocolChannels() *channels.Channels
- func NewStoreInMemory() *inmemory.InMemory
- func NewStorePostgresDB(db *sql.DB) (*postgres.Storage, error)
- func NewStorePostgresUrl(connectionUrl string) (*postgres.Storage, error)
- func RegisterMessages(initializers ...registry.MessageUnmarshaller)
- type Broker
- type CommandHandler
- type Handler
- type HandlerFunc
- type Logger
- type Option
- func WithLogger(logger Logger) Option
- func WithPrometheus(prom prometheus.Registerer) Option
- func WithProtocol(protocol broker.Protocol) Option
- func WithProtocolChannels() Option
- func WithRegistry(registry Registry) Option
- func WithStore(store Store) Option
- func WithStoreInMemory() Option
- func WithStorePostgresDB(db *sql.DB) Option
- func WithStorePostgresUrl(connectionUrl string) Option
- type Options
- type Po
- func (po *Po) Append(ctx context.Context, id streams.Id, messages ...interface{}) (int64, error)
- func (po *Po) Execute(ctx context.Context, id streams.Id, exec CommandHandler) error
- func (po *Po) Project(ctx context.Context, id streams.Id, projection Handler) error
- func (po *Po) Stream(ctx context.Context, id streams.Id) *Stream
- func (po *Po) Subscribe(ctx context.Context, subscriptionId string, id streams.Id, subscriber Handler) error
- type Registry
- type Store
- type Stream
- type TransactionAppender
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewProtocolChannels ¶
func NewStoreInMemory ¶
func NewStorePostgresUrl ¶
func RegisterMessages ¶
func RegisterMessages(initializers ...registry.MessageUnmarshaller)
Types ¶
type CommandHandler ¶ added in v0.7.0
type CommandHandler interface {
	// Hydrates a stream
	Handle(ctx context.Context, msg streams.Message) error
	// Applies the command
	Execute(appender TransactionAppender) error
}
Implemented by commands. The contract is that the CommandHandler is first hydrated with all messages on the stream it is applied to. Thereafter the Execute method is called, allowing the CommandHandler to append messages within a transaction. Returning an error causes a rollback and discards all appended messages; otherwise they are written to the store on commit.
type HandlerFunc ¶ added in v0.7.0
Utility for functional-style handlers.
type Option ¶
func WithLogger ¶ added in v0.5.0
func WithPrometheus ¶ added in v0.6.0
func WithPrometheus(prom prometheus.Registerer) Option
func WithProtocol ¶ added in v0.5.0
func WithProtocolChannels ¶
func WithProtocolChannels() Option
func WithRegistry ¶ added in v0.4.0
func WithStoreInMemory ¶
func WithStoreInMemory() Option
func WithStorePostgresDB ¶
func WithStorePostgresUrl ¶
type Po ¶
type Po struct {
// contains filtered or unexported fields
}
func NewFromOptions ¶
type Store ¶
type Store interface {
	WriteRecords(ctx context.Context, id streams.Id, data ...record.Data) ([]record.Record, error)
	WriteRecordsFrom(ctx context.Context, id streams.Id, position int64, data ...record.Data) ([]record.Record, error)
	ReadSnapshot(ctx context.Context, id streams.Id, snapshotId string) (record.Snapshot, error)
	UpdateSnapshot(ctx context.Context, id streams.Id, snapshotId string, snapshot record.Snapshot) error
	Begin(ctx context.Context) (store.Tx, error)
	SubscriptionPositionLock(tx store.Tx, id streams.Id, subscriptionIds ...string) ([]store.SubscriptionPosition, error)
	ReadRecords(ctx context.Context, id streams.Id, from, to, limit int64) ([]record.Record, error)
	SetSubscriptionPosition(tx store.Tx, id streams.Id, position store.SubscriptionPosition) error
}
type Stream ¶
Stream uses optimistic locking when appending to the message stream.
func (*Stream) Execute ¶ added in v0.5.0
func (stream *Stream) Execute(exec CommandHandler) error
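Optimistic locking on an append usually means the writer states the stream position it last saw, and the write is rejected if the stream has grown since. The `position` argument of `Store.WriteRecordsFrom` suggests a similar check, though this page does not confirm how Stream uses it. The sketch below shows the general technique only; `memoryStream`, `appendFrom`, and `ErrConflict` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrConflict is a hypothetical error for a stale expected position.
var ErrConflict = errors.New("stream position conflict")

// memoryStream is a toy in-memory stream, not the package's store.
type memoryStream struct {
	mu      sync.Mutex
	records []string
}

// appendFrom writes data only if the stream still holds exactly `position`
// records. It returns the current stream size in both cases.
func (s *memoryStream) appendFrom(position int64, data ...string) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if int64(len(s.records)) != position {
		return int64(len(s.records)), ErrConflict
	}
	s.records = append(s.records, data...)
	return int64(len(s.records)), nil
}

func main() {
	s := &memoryStream{}

	// Two writers both observed the stream at position 0.
	pos, err := s.appendFrom(0, "first")
	fmt.Println(pos, err) // 1 <nil>

	// The second writer's view is stale, so the write is rejected.
	pos, err = s.appendFrom(0, "second")
	fmt.Println(pos, err) // 1 stream position conflict

	// After refreshing its position, the retry succeeds.
	pos, err = s.appendFrom(pos, "second")
	fmt.Println(pos, err) // 2 <nil>
}
```

No lock is held between reading and writing; the conflict is detected at write time, and the caller decides whether to re-read and retry.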
type TransactionAppender ¶ added in v0.5.0
type TransactionAppender interface {
	// appends to the stream
	Append(messages ...interface{})
	// current size of the stream
	Size() int64
}
Append to a transaction. Messages will be written to the store on commit.
Source Files ¶