outbox

package module
v2.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

Golang simple transactional outbox

Transactional outbox based on polling publisher for PostgreSQL.

Features:

  • Persist messages
  • Publish message batch
  • Publish message in worker pool
  • Manage delay between batch publishing
  • Create custom publisher
  • Create custom repository
  • Use custom outbox table

Drivers:

  • pgx
  • gorm

Basic initialization with pgx Repository

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/vsvp21/outbox"
)

type publisherMock struct{}
func (p publisherMock) Publish(exchange, topic string, message *outbox.Message) error {
	payload, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	fmt.Printf("published message to topic: %s, payload: %s", topic, string(payload))

	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}

	r := outbox.NewPgxOutboxRepository(c)

	relay := outbox.NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
	if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
		log.Fatal(err)
	}
}

Basic initialization with gorm Repository

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"time"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"

	"github.com/vsvp21/outbox"
)

type publisherMock struct{}
func (p publisherMock) Publish(exchange, topic string, message *outbox.Message) error {
	payload, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	fmt.Printf("published message to topic: %s, payload: %s", topic, string(payload))

	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := gorm.Open(postgres.New(postgres.Config{
		DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
	}), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	r := outbox.NewGormRepository(c)

	relay := outbox.NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
	if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
		log.Fatal(err)
	}
}

Custom outbox table:

package main

import "github.com/vsvp21/outbox"

func main() {
	// Your code ...
	outbox.TableName = "custom"
	// Your code ...
}

Pgx Persister

package main

import (
	"context"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/vsvp21/outbox"
	"log"
)

func main() {
	db, err := pgxpool.New(context.TODO(), "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}

	p := outbox.NewPgxPersister(db)
	p.PersistInTx(context.TODO(), func(tx pgx.Tx) ([]*outbox.Message, error) {
		// SQL Queries
		return []*outbox.Message{}, nil
	})
}

Gorm Persister

package main

import (
	"github.com/vsvp21/outbox"
	"gorm.io/gorm"
	"log"
)

func main() {
	db, err := gorm.Open(postgres.New(postgres.Config{
		DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
	}), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	p := outbox.NewGormPersister(db)
	p.PersistInTx(func(tx *gorm.DB) ([]*outbox.Message, error) {
		// SQL Queries
		return []*outbox.Message{}, nil
	})
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrBatchSizeOutOfRange = errors.New("invalid batch size")

	TableName                 = "outbox_messages"
	PublishRetryDelay         = time.Second
	PublishRetryAttempts uint = 3
)

Functions

This section is empty.

Types

type BatchSize

type BatchSize uint

func (BatchSize) Valid

func (b BatchSize) Valid() error

type EventRepository

type EventRepository interface {
	Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)
	MarkConsumed(ctx context.Context, messages []*Message) error
}

type GormPersister

type GormPersister struct {
	// contains filtered or unexported fields
}

func NewGormPersister

func NewGormPersister(db *gorm.DB) *GormPersister

func (*GormPersister) PersistInTx

func (r *GormPersister) PersistInTx(fn func(tx *gorm.DB) ([]*Message, error)) error

type GormRepository

type GormRepository struct {
	// contains filtered or unexported fields
}

func NewGormRepository

func NewGormRepository(db *gorm.DB) *GormRepository

func (*GormRepository) Fetch

func (r *GormRepository) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)

func (*GormRepository) MarkConsumed

func (r *GormRepository) MarkConsumed(ctx context.Context, messages []*Message) error

type GormTestSuite

type GormTestSuite struct {
	TestSuite
	// contains filtered or unexported fields
}

GormTestSuite base test suite for gorm tests

func (*GormTestSuite) SetupTest

func (suite *GormTestSuite) SetupTest()

type Message

type Message struct {
	ID         string
	EventType  string
	Payload    interface{}
	Exchange   string
	RoutingKey string
	Consumed   bool
	CreatedAt  time.Time
}

func GenerateMessages

func GenerateMessages(n int) []*Message

func NewMessage

func NewMessage(id string, eventType string, payload interface{}, exchange, routingKey string) *Message

func (*Message) BytePayload added in v2.2.0

func (m *Message) BytePayload() ([]byte, error)

type PgxPersister

type PgxPersister struct {
	// contains filtered or unexported fields
}

func NewPgxPersister

func NewPgxPersister(db *pgxpool.Pool) *PgxPersister

func (*PgxPersister) PersistInTx

func (r *PgxPersister) PersistInTx(ctx context.Context, fn func(tx pgx.Tx) ([]*Message, error)) error

type PgxRepository

type PgxRepository struct {
	// contains filtered or unexported fields
}

func NewPgxOutboxRepository

func NewPgxOutboxRepository(db *pgxpool.Pool) *PgxRepository

func (*PgxRepository) Fetch

func (r *PgxRepository) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)

func (*PgxRepository) MarkConsumed

func (r *PgxRepository) MarkConsumed(ctx context.Context, messages []*Message) error

type PgxTestSuite

type PgxTestSuite struct {
	TestSuite
	// contains filtered or unexported fields
}

PgxTestSuite base test suite for pgx tests

func (*PgxTestSuite) SetupTest

func (suite *PgxTestSuite) SetupTest()

func (*PgxTestSuite) TearDownSuite

func (suite *PgxTestSuite) TearDownSuite()

type Publisher

type Publisher interface {
	Publish(exchange, topic string, message *Message) error
}

type PublisherMock

type PublisherMock struct {
	Published []*Message
	// contains filtered or unexported fields
}

func (*PublisherMock) Publish

func (p *PublisherMock) Publish(exchange, topic string, message *Message) error

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

func NewRelay

func NewRelay(repo EventRepository, publisher Publisher, publishWorkerPoolSize int, publishDelay time.Duration) *Relay

func (*Relay) Run

func (r *Relay) Run(ctx context.Context, batchSize BatchSize) error
Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
if err != nil {
	log.Fatal(err)
}

p := NewPgxPersister(c)
r := NewPgxOutboxRepository(c)
if err = p.PersistInTx(ctx, func(tx pgx.Tx) ([]*Message, error) {
	return GenerateMessages(1000), nil
}); err != nil {
	log.Fatal(err)
}

relay := NewRelay(r, publisherMock{}, runtime.NumCPU(), time.Second)
if err = relay.Run(ctx, BatchSize(100)); err != nil {
	log.Fatal(err)
}
Output:

type RepositoryMock

type RepositoryMock struct {
	Messages []*Message
	Consumed []*Message
	Cursor   int
	FetchErr bool
	// contains filtered or unexported fields
}

func (*RepositoryMock) Fetch

func (m *RepositoryMock) Fetch(ctx context.Context, batchSize BatchSize) ([]*Message, error)

func (*RepositoryMock) MarkConsumed

func (m *RepositoryMock) MarkConsumed(ctx context.Context, messages []*Message) error

type TestSuite

type TestSuite struct {
	suite.Suite
	// contains filtered or unexported fields
}

TestSuite base test suite

func (*TestSuite) SetupTest

func (suite *TestSuite) SetupTest()

func (*TestSuite) TearDownSuite

func (suite *TestSuite) TearDownSuite()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL