Documentation ¶
Index ¶
- Variables
- type BatchSize
- type EventRepository
- type Message
- type PersistFunc
- type PgxRepository
- type Publisher
- type PublisherMock
- type Relay
- type RepositoryMock
- func (m *RepositoryMock) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)
- func (m *RepositoryMock) MarkConsumed(ctx context.Context, messages []*Message) error
- func (m *RepositoryMock) Persist(ctx context.Context, messages []*Message) error
- func (m *RepositoryMock) PersistInTx(ctx context.Context, f PersistFunc) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type EventRepository ¶
type Message ¶
type Message struct {
	ID         string
	EventType  string
	Payload    interface{}
	Exchange   string
	RoutingKey string
	Consumed   bool
	CreatedAt  time.Time
}
func GenerateMessages ¶
type PersistFunc ¶
type PgxRepository ¶
type PgxRepository struct {
// contains filtered or unexported fields
}
func NewPgxOutboxRepository ¶
func NewPgxOutboxRepository(db *pgxpool.Pool) *PgxRepository
func (*PgxRepository) MarkConsumed ¶
func (r *PgxRepository) MarkConsumed(ctx context.Context, messages []*Message) error
func (*PgxRepository) PersistInTx ¶
func (r *PgxRepository) PersistInTx(ctx context.Context, fn PersistFunc) error
type PublisherMock ¶
type PublisherMock struct {
	Published []*Message
	// contains filtered or unexported fields
}
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
func (*Relay) Run ¶
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c, err := pgxpool.Connect(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
if err != nil {
	log.Fatal(err)
}

r := NewPgxOutboxRepository(c)
if err = r.PersistInTx(ctx, func(tx pgx.Tx) ([]*Message, error) {
	return GenerateMessages(1000), nil
}); err != nil {
	log.Fatal(err)
}

relay := NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
if err = relay.Run(ctx, BatchSize(100)); err != nil {
	log.Fatal(err)
}
Output:
type RepositoryMock ¶
type RepositoryMock struct {
	Messages []*Message
	Consumed []*Message
	Cursor   int
	FetchErr bool
	// contains filtered or unexported fields
}
func (*RepositoryMock) MarkConsumed ¶
func (m *RepositoryMock) MarkConsumed(ctx context.Context, messages []*Message) error
func (*RepositoryMock) Persist ¶
func (m *RepositoryMock) Persist(ctx context.Context, messages []*Message) error
func (*RepositoryMock) PersistInTx ¶
func (m *RepositoryMock) PersistInTx(ctx context.Context, f PersistFunc) error
Click to show internal directories.
Click to hide internal directories.