Documentation ¶
Index ¶
- Constants
- Variables
- func NewSnapshotStore(client Client, options ...SnapshotStoreOption) es.AggregateRootStoreMiddleware
- func ReceiverSessionMiddleware(conn *pgxpool.Pool, logger edatlog.Logger) func(msg.MessageReceiver) msg.MessageReceiver
- func RpcSessionStreamInterceptor(_ *pgxpool.Pool, logger edatlog.Logger) grpc.StreamServerInterceptor
- func RpcSessionUnrayInterceptor(conn *pgxpool.Pool, logger edatlog.Logger) grpc.UnaryServerInterceptor
- func WebSessionMiddleware(conn *pgxpool.Pool, logger edatlog.Logger) func(http.Handler) http.Handler
- type Client
- type EventStore
- type EventStoreOption
- type MessageStore
- func (s *MessageStore) Close(ctx context.Context) error
- func (s *MessageStore) Fetch(ctx context.Context, limit int) ([]outbox.Message, error)
- func (s *MessageStore) MarkPublished(ctx context.Context, messageIDs []string) error
- func (s *MessageStore) PurgePublished(ctx context.Context, olderThan time.Duration) error
- func (s *MessageStore) Save(ctx context.Context, message outbox.Message) error
- func (s *MessageStore) Send(ctx context.Context, channel string, message msg.Message) error
- type MessageStoreOption
- type SagaInstanceStore
- type SagaInstanceStoreOption
- type SnapshotStore
- type SnapshotStoreOption
Constants ¶
View Source
const ( DefaultEventTableName = "events" DefaultMessageTableName = "messages" DefaultSagaInstanceTableName = "saga_instances" DefaultSnapshotTableName = "snapshots" CreateEventsTableSQL = `` /* 429-byte string literal not displayed */ CreateMessagesTableSQL = `` /* 386-byte string literal not displayed */ CreateMessagesUnpublishedIndexSQL = `CREATE INDEX unpublished_idx ON messages (created_at) WHERE not published` CreateMessagesPublishedIndexSQL = `CREATE INDEX published_idx ON messages (modified_at) WHERE published` CreateSagaInstancesTableSQL = `` /* 410-byte string literal not displayed */ CreateSnapshotsTableSQL = `` /* 344-byte string literal not displayed */ )
Variables ¶
View Source
var ErrInvalidTxValue = errors.New("tx value is not a pgx.Tx type")
View Source
var ErrTxNotInContext = errors.New("pgx.Tx is not set for session")
Functions ¶
func NewSnapshotStore ¶ added in v1.2.3
func NewSnapshotStore(client Client, options ...SnapshotStoreOption) es.AggregateRootStoreMiddleware
func ReceiverSessionMiddleware ¶
func ReceiverSessionMiddleware(conn *pgxpool.Pool, logger edatlog.Logger) func(msg.MessageReceiver) msg.MessageReceiver
func RpcSessionStreamInterceptor ¶
func RpcSessionStreamInterceptor(_ *pgxpool.Pool, logger edatlog.Logger) grpc.StreamServerInterceptor
func RpcSessionUnrayInterceptor ¶
func RpcSessionUnrayInterceptor(conn *pgxpool.Pool, logger edatlog.Logger) grpc.UnaryServerInterceptor
Types ¶
type Client ¶
type Client interface { Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults Begin(ctx context.Context) (pgx.Tx, error) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) }
Client covers a subset of what both pgx.Conn or pgxpool.Pool provide
func NewSessionClient ¶
func NewSessionClient() Client
NewSessionClient returns a pgx.Conn or pgxpool.Pool compatible client that uses an active transaction from context
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
func NewEventStore ¶
func NewEventStore(cliet Client, options ...EventStoreOption) *EventStore
func (*EventStore) Load ¶
func (s *EventStore) Load(ctx context.Context, root *es.AggregateRoot) error
func (*EventStore) Save ¶
func (s *EventStore) Save(ctx context.Context, root *es.AggregateRoot) (err error)
type EventStoreOption ¶
type EventStoreOption func(store *EventStore)
func WithEventStoreLogger ¶
func WithEventStoreLogger(logger edatlog.Logger) EventStoreOption
func WithEventStoreTableName ¶
func WithEventStoreTableName(tableName string) EventStoreOption
type MessageStore ¶
type MessageStore struct {
// contains filtered or unexported fields
}
func NewMessageStore ¶
func NewMessageStore(client Client, options ...MessageStoreOption) *MessageStore
func (*MessageStore) MarkPublished ¶
func (s *MessageStore) MarkPublished(ctx context.Context, messageIDs []string) error
func (*MessageStore) PurgePublished ¶
type MessageStoreOption ¶
type MessageStoreOption func(store *MessageStore)
func WithMessageStoreLogger ¶
func WithMessageStoreLogger(logger edatlog.Logger) MessageStoreOption
func WithMessageStoreTableName ¶
func WithMessageStoreTableName(tableName string) MessageStoreOption
type SagaInstanceStore ¶ added in v1.2.14
type SagaInstanceStore struct {
// contains filtered or unexported fields
}
func NewSagaInstanceStore ¶ added in v1.2.14
func NewSagaInstanceStore(client Client, options ...SagaInstanceStoreOption) *SagaInstanceStore
type SagaInstanceStoreOption ¶ added in v1.2.14
type SagaInstanceStoreOption func(store *SagaInstanceStore)
func WithSagaInstanceStoreLogger ¶ added in v1.2.14
func WithSagaInstanceStoreLogger(logger edatlog.Logger) SagaInstanceStoreOption
func WithSagaInstanceStoreTableName ¶ added in v1.2.14
func WithSagaInstanceStoreTableName(tableName string) SagaInstanceStoreOption
type SnapshotStore ¶ added in v1.2.3
type SnapshotStore struct {
// contains filtered or unexported fields
}
func (*SnapshotStore) Load ¶ added in v1.2.3
func (s *SnapshotStore) Load(ctx context.Context, root *es.AggregateRoot) error
func (*SnapshotStore) Save ¶ added in v1.2.3
func (s *SnapshotStore) Save(ctx context.Context, root *es.AggregateRoot) error
type SnapshotStoreOption ¶ added in v1.2.3
type SnapshotStoreOption func(store *SnapshotStore)
func WithSnapshotStoreLogger ¶ added in v1.2.3
func WithSnapshotStoreLogger(logger edatlog.Logger) SnapshotStoreOption
func WithSnapshotStoreStrategy ¶ added in v1.2.3
func WithSnapshotStoreStrategy(strategy es.SnapshotStrategy) SnapshotStoreOption
func WithSnapshotStoreTableName ¶ added in v1.2.3
func WithSnapshotStoreTableName(tableName string) SnapshotStoreOption
Source Files ¶
Click to show internal directories.
Click to hide internal directories.