outbox

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: MIT Imports: 6 Imported by: 0

README

Github Go Reference Go Report Card

Outbox

An implementation of Transactional outbox pattern for reliable publishing the messages.

Infrastructure support

Currently, outbox worker can fetch outbox rows from below mentioned DB.

  • Postgres
  • MySQL

and can publish messages to below mentioned PubSub systems

  • Amazon SQS
  • RabbitMQ

Installation

go get github.com/kamal-github/outbox

Usage

Please have a look at the well commented examples

Contribution

To run tests, run blow command, it will fetch all the prerequisites and run the tests.

make test

Pull requests are welcome. Please fork it and send a pull request against main branch. Make sure to add tests ;)

License

This project is licensed under the MIT license.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Worker

type Worker struct {
	MineSweeper  datastore.MineSweeper
	Dispatcher   pubsub.Dispatcher
	MineInterval time.Duration

	Logger *zap.Logger
}

Worker is the outbox worker which runs repeatedly until asked to stop.

Example (Rabbit_with_pg)
package main

import (
	"context"
	"database/sql"
	"os"
	"os/signal"
	"time"

	"github.com/kamal-github/outbox"
	"github.com/kamal-github/outbox/datastore"
	"github.com/kamal-github/outbox/pubsub"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	// Setup log
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}

	// Connect to Postgres
	dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
	dbConn, err := connectToSQLDB("postgres", dsName)
	if err != nil {
		panic(err)
	}

	// Setup Postgres as Minesweeper
	mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
	if err != nil {
		panic(err)
	}
	defer mineSweeper.Close()

	// Setup RabbitMQ as PubSub
	dispatcher, err := pubsub.NewRabbitMQ("", mineSweeper, logger)
	if err != nil {
		panic(err)
	}
	defer dispatcher.Close()

	// Graceful shutdown
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	workerDone := make(chan struct{})

	// Run worker in a separate go routine.
	go outbox.Worker{
		MineSweeper:  mineSweeper,
		Dispatcher:   dispatcher,
		Logger:       logger,
		MineInterval: 2 * time.Second,
	}.Start(ctx, workerDone)

	<-sig
	cancel()

	<-workerDone
}

func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
	db, err := sql.Open(driver, dsName)
	if err != nil {
		return db, err
	}

	return db, nil
}
Output:

Example (Sqs_with_pg)
package main

import (
	"context"
	"database/sql"
	"os"
	"os/signal"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"

	"github.com/kamal-github/outbox"
	"github.com/kamal-github/outbox/datastore"
	"github.com/kamal-github/outbox/pubsub"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	// Setup log
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}

	// Connect to Postgres
	dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
	dbConn, err := connectToSQLDB("postgres", dsName)
	if err != nil {
		panic(err)
	}

	// Setup Postgres as Minesweeper
	mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
	if err != nil {
		panic(err)
	}
	defer mineSweeper.Close()

	// Setup AWS session and SQS connection
	awsSession := session.Must(session.NewSession())
	sqsConn := sqs.New(awsSession)

	// Setup SQS as PubSub
	dispatcher, err := pubsub.NewSimpleQueueService(sqsConn, mineSweeper, logger)
	if err != nil {
		panic(err)
	}
	defer dispatcher.Close()

	// Graceful shutdown
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	workerDone := make(chan struct{})

	// Run worker in a separate go routine.
	go outbox.Worker{
		MineSweeper:  mineSweeper,
		Dispatcher:   dispatcher,
		Logger:       logger,
		MineInterval: 2 * time.Second,
	}.Start(ctx, workerDone)

	<-sig
	cancel()

	<-workerDone
}

func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
	db, err := sql.Open(driver, dsName)
	if err != nil {
		return db, err
	}

	return db, nil
}
Output:

func (Worker) Start

func (w Worker) Start(ctx context.Context, done chan<- struct{})

Start starts the outbox worker and iterative looks for new outbox rows (ready to process) after each given MineInterval and publishes to one of the configured Messaging system.

When no ready to process message are found, it keep looking for new ones.

Exit as soon as ctx is cancelled.

Directories

Path Synopsis
Package datastore maintains all the datastorage implementation.
Package datastore maintains all the datastorage implementation.
Package event contains details related to event defined as Outbox Row in Datastore.
Package event contains details related to event defined as Outbox Row in Datastore.
internal
Package pubsub contains various implementation for event dispatcher.
Package pubsub contains various implementation for event dispatcher.
mocks
Package sweepermock is a generated GoMock package.
Package sweepermock is a generated GoMock package.
Package test contains integration tests and related configuration.
Package test contains integration tests and related configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL