Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type EventRepository ¶
type GormPersister ¶
type GormPersister struct {
// contains filtered or unexported fields
}
func NewGormPersister ¶
func NewGormPersister(db *gorm.DB) *GormPersister
func (*GormPersister) PersistInTx ¶
type GormRepository ¶
type GormRepository struct {
// contains filtered or unexported fields
}
func NewGormRepository ¶
func NewGormRepository(db *gorm.DB) *GormRepository
func (*GormRepository) MarkConsumed ¶
func (r *GormRepository) MarkConsumed(ctx context.Context, messages []*Message) error
type GormTestSuite ¶
type GormTestSuite struct { TestSuite // contains filtered or unexported fields }
GormTestSuite is the base test suite for GORM tests.
func (*GormTestSuite) SetupTest ¶
func (suite *GormTestSuite) SetupTest()
type Message ¶
type Message struct { ID string EventType string Payload interface{} Exchange string RoutingKey string Consumed bool CreatedAt time.Time }
func GenerateMessages ¶
func NewMessage ¶
func (*Message) BytePayload ¶ added in v2.2.0
type PgxPersister ¶
type PgxPersister struct {
// contains filtered or unexported fields
}
func NewPgxPersister ¶
func NewPgxPersister(db *pgxpool.Pool) *PgxPersister
func (*PgxPersister) PersistInTx ¶
type PgxRepository ¶
type PgxRepository struct {
// contains filtered or unexported fields
}
func NewPgxOutboxRepository ¶
func NewPgxOutboxRepository(db *pgxpool.Pool) *PgxRepository
func (*PgxRepository) MarkConsumed ¶
func (r *PgxRepository) MarkConsumed(ctx context.Context, messages []*Message) error
type PgxTestSuite ¶
type PgxTestSuite struct { TestSuite // contains filtered or unexported fields }
PgxTestSuite is the base test suite for pgx tests.
func (*PgxTestSuite) SetupTest ¶
func (suite *PgxTestSuite) SetupTest()
func (*PgxTestSuite) TearDownSuite ¶
func (suite *PgxTestSuite) TearDownSuite()
type PublisherMock ¶
type PublisherMock struct { Published []*Message // contains filtered or unexported fields }
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
func (*Relay) Run ¶
Example ¶
ctx, cancel := context.WithCancel(context.Background()) defer cancel() c, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name") if err != nil { log.Fatal(err) } p := NewPgxPersister(c) r := NewPgxOutboxRepository(c) if err = p.PersistInTx(ctx, func(tx pgx.Tx) ([]*Message, error) { return GenerateMessages(1000), nil }); err != nil { log.Fatal(err) } relay := NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second) if err = relay.Run(ctx, BatchSize(100)); err != nil { log.Fatal(err) }
Output:
type RepositoryMock ¶
type RepositoryMock struct { Messages []*Message Consumed []*Message Cursor int FetchErr bool // contains filtered or unexported fields }
func (*RepositoryMock) MarkConsumed ¶
func (m *RepositoryMock) MarkConsumed(ctx context.Context, messages []*Message) error
Click to show internal directories.
Click to hide internal directories.