outbox

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Golang simple transactional outbox

Transactional outbox based on polling publisher for PostgreSQL.

Features:

  • Persist messages
  • Publish message batch
  • Publish message in worker pool
  • Manage delay between batch publishing
  • Create custom publisher
  • Create custom repository
  • Use custom outbox table

Basic initialization with pgx Repository

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/vsvp21/outbox"
)

type publisherMock struct{}
func (p publisherMock) Publish(exchange, topic string, message *outbox.Message) error {
	payload, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	fmt.Printf("published message to topic: %s, payload: %s", topic, string(payload))

	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c, err := pgxpool.Connect(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}

	r := outbox.NewPgxOutboxRepository(c)

	relay := outbox.NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
	if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
		log.Fatal(err)
	}
}

Custom outbox table:

package main

import "github.com/vsvp21/outbox"

func main() {
	// Your code ...
	outbox.TableName = "custom"
	// Your code ...
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrBatchSizeOutOfRange = errors.New("invalid batch size")

	TableName                 = "outbox_messages"
	PublishRetryDelay         = time.Second
	PublishRetryAttempts uint = 3
)

Functions

This section is empty.

Types

type BatchSize

type BatchSize uint

func (BatchSize) Valid

func (b BatchSize) Valid() error

type EventRepository

type EventRepository interface {
	PersistInTx(ctx context.Context, f PersistFunc) error
	Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)
	MarkConsumed(ctx context.Context, messages []*Message) error
}

type Message

type Message struct {
	ID         string
	EventType  string
	Payload    interface{}
	Exchange   string
	RoutingKey string
	Consumed   bool
	CreatedAt  time.Time
}

func GenerateMessages

func GenerateMessages(n int) []*Message

func NewMessage

func NewMessage(id string, eventType string, payload interface{}, exchange, routingKey string) *Message

type PersistFunc

type PersistFunc func(tx pgx.Tx) ([]*Message, error)

type PgxRepository

type PgxRepository struct {
	// contains filtered or unexported fields
}

func NewPgxOutboxRepository

func NewPgxOutboxRepository(db *pgxpool.Pool) *PgxRepository

func (*PgxRepository) Fetch

func (r *PgxRepository) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)

func (*PgxRepository) MarkConsumed

func (r *PgxRepository) MarkConsumed(ctx context.Context, messages []*Message) error

func (*PgxRepository) PersistInTx

func (r *PgxRepository) PersistInTx(ctx context.Context, fn PersistFunc) error

type Publisher

type Publisher interface {
	Publish(exchange, topic string, message *Message) error
}

type PublisherMock

type PublisherMock struct {
	Published []*Message
	// contains filtered or unexported fields
}

func (*PublisherMock) Publish

func (p *PublisherMock) Publish(exchange, topic string, message *Message) error

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

func NewRelay

func NewRelay(repo EventRepository, publisher Publisher, publishWorkerPoolSize int, publishDelay time.Duration) *Relay

func (*Relay) Run

func (r *Relay) Run(ctx context.Context, batchSize BatchSize) error
Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c, err := pgxpool.Connect(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
if err != nil {
	log.Fatal(err)
}

r := NewPgxOutboxRepository(c)
if err = r.PersistInTx(ctx, func(tx pgx.Tx) ([]*Message, error) {
	return GenerateMessages(1000), nil
}); err != nil {
	log.Fatal(err)
}

relay := NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
if err = relay.Run(ctx, BatchSize(100)); err != nil {
	log.Fatal(err)
}
Output:

type RepositoryMock

type RepositoryMock struct {
	Messages []*Message
	Consumed []*Message
	Cursor   int
	FetchErr bool
	// contains filtered or unexported fields
}

func (*RepositoryMock) Fetch

func (m *RepositoryMock) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)

func (*RepositoryMock) MarkConsumed

func (m *RepositoryMock) MarkConsumed(ctx context.Context, messages []*Message) error

func (*RepositoryMock) Persist

func (m *RepositoryMock) Persist(ctx context.Context, messages []*Message) error

func (*RepositoryMock) PersistInTx

func (m *RepositoryMock) PersistInTx(ctx context.Context, f PersistFunc) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL