outbox

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

README

Outbox Implementation in Go

golangci-lint codecov

This project provides a sample implementation of the Transactional Outbox Pattern in Go.

Features

  • Send messages within a sql.Tx transaction through the Outbox Pattern
  • Optional limit on the maximum number of send attempts for a message
  • Outbox row locking so that concurrent outbox workers don't process the same records
    • Includes a background worker that cleans record locks after a specified time
  • Message Retention. A configurable cleanup worker removes old records after a configurable duration has passed.
  • Extensible message broker interface
  • Extensible data store interface for sql databases

Currently supported providers

Message Brokers
  • Kafka
Database Providers
  • MySQL

Usage

For a full example of a MySQL outbox using a Kafka broker, check the example here.

Create the outbox table

The following script creates the outbox table in MySQL:

-- Outbox table: one row per message handed to the outbox.
-- NOTE(review): the column meanings below are inferred from the Record type
-- documented for this package; confirm against the MySQL store implementation.
CREATE TABLE outbox (
        -- presumably Record.ID (a UUID) stored as text
        id varchar(100) NOT NULL,
        -- presumably the serialized Record.Message
        data BLOB NOT NULL,
        -- RecordState as an integer: 0 = PendingDelivery, 1 = Delivered, 2 = MaxAttemptsReached
        state INT NOT NULL,
        -- presumably Record.CreatedOn
        created_on DATETIME NOT NULL,
        -- presumably Record.LockID; NULL while the row is not locked
        locked_by varchar(100) NULL,
        -- presumably Record.LockedOn; NULL while the row is not locked
        locked_on DATETIME NULL,
        -- presumably Record.ProcessedOn; NULL until the row has been processed
        processed_on DATETIME NULL,
        -- presumably Record.NumberOfAttempts
        number_of_attempts INT NOT NULL,
        -- presumably Record.LastAttemptOn; NULL before the first attempt
        last_attempted_on DATETIME NULL,
        -- presumably Record.Error; NULL when no error has been recorded
        error varchar(1000) NULL
)

Send a message via the outbox service


// SampleMessage is the payload published through the outbox in this example.
// The field is exported and tagged because encoding/json ignores unexported
// fields; with an unexported field the encoded body would be "{}".
type SampleMessage struct {
	Message string `json:"message"`
}

// main stores a sample message in the outbox and exits with status 1 on failure.
func main() {
	if err := sendSampleMessage(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// sendSampleMessage writes one SampleMessage to the outbox inside a database
// transaction. It returns the first error encountered; the transaction is
// rolled back on every failure path.
func sendSampleMessage() error {
	// Setup the mysql store
	sqlSettings := mysql.Settings{
		MySQLUsername: "root",
		MySQLPass:     "a123456",
		MySQLHost:     "localhost",
		MySQLDB:       "outbox",
		MySQLPort:     "3306",
	}
	store, err := mysql.NewStore(sqlSettings)
	if err != nil {
		return fmt.Errorf("could not initialize the store: %w", err)
	}

	// Initialize the outbox publisher
	publisher := outbox.NewPublisher(store)

	// Encode the message before opening the transaction so that an encoding
	// failure never leaves a transaction open.
	encodedData, err := json.Marshal(SampleMessage{Message: "ok"})
	if err != nil {
		return fmt.Errorf("encoding the message: %w", err)
	}

	db, err := sql.Open("mysql",
		fmt.Sprintf("%v:%v@tcp(%v:%v)/%v?parseTime=True",
			sqlSettings.MySQLUsername, sqlSettings.MySQLPass, sqlSettings.MySQLHost, sqlSettings.MySQLPort, sqlSettings.MySQLDB))
	if err != nil {
		return fmt.Errorf("opening the database: %w", err)
	}
	defer db.Close()

	// Open the transaction
	tx, err := db.BeginTx(context.Background(), nil)
	if err != nil {
		return fmt.Errorf("beginning the transaction: %w", err)
	}
	// Rollback after a successful Commit only returns sql.ErrTxDone, so
	// deferring it releases the transaction on every error path.
	defer func() { _ = tx.Rollback() }()

	// Send the message
	err = publisher.Send(outbox.Message{
		Key:   "sampleKey",
		Body:  encodedData,
		Topic: "sampleTopic",
	}, tx)
	if err != nil {
		return fmt.Errorf("storing the message in the outbox: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing the transaction: %w", err)
	}
	return nil
}

Start the outbox dispatcher

The dispatcher can run on the same instance as the application that uses the outbox, or on a different one. Once the dispatcher starts, it periodically checks for new outbox messages and pushes them to the Kafka broker.

// main wires a MySQL store and a Kafka broker into an outbox dispatcher,
// starts it, and blocks until the dispatcher reports an error.
func main() {
	// Setup the mysql store
	sqlSettings := mysql.Settings{
		MySQLUsername: "root",
		MySQLPass:     "a123456",
		MySQLHost:     "localhost",
		MySQLDB:       "outbox",
		MySQLPort:     "3306",
	}
	store, err := mysql.NewStore(sqlSettings)
	if err != nil {
		// Print the error (Sprintf would only build the string and drop it).
		fmt.Fprintf(os.Stderr, "Could not initialize the store: %v\n", err)
		os.Exit(1)
	}

	// Setup the kafka message broker
	c := sarama.NewConfig()
	c.Producer.Return.Successes = true
	broker, err := kafka.NewBroker([]string{"localhost:29092"}, c)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Could not initialize the message broker: %v\n", err)
		os.Exit(1)
	}

	// Initialize the dispatcher
	settings := outbox.DispatcherSettings{
		ProcessInterval:           20 * time.Second,
		LockCheckerInterval:       600 * time.Minute,
		CleanupWorkerInterval:     60 * time.Second,
		MaxLockTimeDuration:       5 * time.Minute,
		MessagesRetentionDuration: 1 * time.Minute,
	}

	// The last argument is the machineID of this dispatcher instance.
	d := outbox.NewDispatcher(store, broker, settings, "1")

	// Run the dispatcher
	errChan := make(chan error)
	doneChan := make(chan struct{})
	// NOTE(review): this example relies on Run returning after starting its
	// workers; otherwise errChan below would never be read. Confirm.
	d.Run(errChan, doneChan)
	// Presumably signals the dispatcher to stop once main returns.
	defer func() { doneChan <- struct{}{} }()
	err = <-errChan
	// Print the error as a value, never as a format string.
	fmt.Fprintln(os.Stderr, err)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher initializes and runs the outbox dispatcher

func NewDispatcher

func NewDispatcher(store Store, broker MessageBroker, settings DispatcherSettings, machineID string) *Dispatcher

NewDispatcher constructor

func (Dispatcher) Run

func (d Dispatcher) Run(errChan chan<- error, doneChan <-chan struct{})

Run periodically checks for new outbox messages from the Store, sends the messages through the MessageBroker and updates the message status accordingly

type DispatcherSettings

// DispatcherSettings defines the set of configurations for the dispatcher.
// NOTE(review): the field descriptions are inferred from the field names and
// the README feature list; confirm against the dispatcher implementation.
type DispatcherSettings struct {
	// ProcessInterval is presumably how often the dispatcher checks for new outbox messages.
	ProcessInterval           time.Duration
	// LockCheckerInterval is presumably how often the background worker that clears record locks runs.
	LockCheckerInterval       time.Duration
	// MaxLockTimeDuration is presumably how long a record may stay locked before its lock is cleared.
	MaxLockTimeDuration       time.Duration
	// CleanupWorkerInterval is presumably how often the cleanup worker that removes old records runs.
	CleanupWorkerInterval     time.Duration
	// RetrialPolicy holds the retrial settings (see RetrialPolicy).
	RetrialPolicy             RetrialPolicy
	// MessagesRetentionDuration is presumably how long records are kept before the cleanup worker removes them.
	MessagesRetentionDuration time.Duration
}

DispatcherSettings defines the set of configurations for the dispatcher

type Message

// Message encapsulates the contents of the message to be sent.
type Message struct {
	// Key is the message key (the README example uses "sampleKey").
	Key     string
	// Headers are the message headers; see MessageHeader.
	Headers []MessageHeader
	// Body is the encoded message payload (the README example passes JSON bytes).
	Body    []byte
	// Topic is the topic the message is addressed to (the README example uses "sampleTopic").
	Topic   string
}

Message encapsulates the contents of the message to be sent

type MessageBroker

// MessageBroker provides an interface for message brokers to send Message objects.
type MessageBroker interface {
	// Send sends the provided message and returns an error on failure.
	Send(message Message) error
}

MessageBroker provides an interface for message brokers to send Message objects

type MessageHeader

// MessageHeader is a single key/value header of a Message. It is used by brokers.
type MessageHeader struct {
	// Key is the header name.
	Key   string
	// Value is the header value.
	Value string
}

MessageHeader is a header of the Message to be sent. It is used by brokers.

type MockBroker

type MockBroker struct {
	mock.Mock
}

MockBroker mocks the MessageBroker interface

func (*MockBroker) Send

func (m *MockBroker) Send(message Message) error

Send method mock

type MockStore

type MockStore struct {
	mock.Mock
}

MockStore mocks the Store

func (*MockStore) AddRecordTx

func (m *MockStore) AddRecordTx(record Record, tx *sql.Tx) error

AddRecordTx method mock

func (*MockStore) ClearLocksByLockID

func (m *MockStore) ClearLocksByLockID(lockID string) error

ClearLocksByLockID method mock

func (*MockStore) ClearLocksWithDurationBeforeDate

func (m *MockStore) ClearLocksWithDurationBeforeDate(time time.Time) error

ClearLocksWithDurationBeforeDate method mock

func (*MockStore) GetRecordsByLockID

func (m *MockStore) GetRecordsByLockID(lockID string) ([]Record, error)

GetRecordsByLockID method mock

func (*MockStore) RemoveRecordsBeforeDatetime

func (m *MockStore) RemoveRecordsBeforeDatetime(expiryTime time.Time) error

RemoveRecordsBeforeDatetime method mock

func (*MockStore) UpdateRecordByID

func (m *MockStore) UpdateRecordByID(message Record) error

UpdateRecordByID method mock

func (*MockStore) UpdateRecordLockByState

func (m *MockStore) UpdateRecordLockByState(lockID string, lockedOn time.Time, state RecordState) error

UpdateRecordLockByState method mock

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher encapsulates the save functionality of the outbox pattern

func NewPublisher

func NewPublisher(store Store) Publisher

NewPublisher is the Publisher constructor

func (Publisher) Send

func (o Publisher) Send(msg Message, tx *sql.Tx) error

Send stores the provided Message within the provided sql.Tx

type Record

// Record represents the record that is stored and retrieved from the database.
// NOTE(review): the field descriptions are inferred from the field names and
// the README's outbox table definition; confirm against the store implementation.
type Record struct {
	// ID is the record identifier.
	ID               uuid.UUID
	// Message is the message carried by this record.
	Message          Message
	// State is the delivery state of the record (see RecordState).
	State            RecordState
	// CreatedOn is presumably the time the record was created.
	CreatedOn        time.Time
	// LockID presumably identifies the holder of the record lock; nil when unlocked.
	LockID           *string
	// LockedOn is presumably the time the lock was taken; nil when unlocked.
	LockedOn         *time.Time
	// ProcessedOn is presumably the time the record was processed; nil until then.
	ProcessedOn      *time.Time
	// NumberOfAttempts is presumably the number of send attempts made so far.
	NumberOfAttempts int
	// LastAttemptOn is presumably the time of the most recent send attempt; nil before the first one.
	LastAttemptOn    *time.Time
	// Error is presumably the error of the last failed attempt; nil when there is none.
	Error            *string
}

Record represents the record that is stored and retrieved from the database

type RecordState

type RecordState int

RecordState is the State of the Record

// Possible RecordState values. They are assigned with iota, so their integer
// values are 0, 1 and 2 in declaration order.
const (
	// PendingDelivery is the initial state of all records.
	PendingDelivery RecordState = iota
	// Delivered indicates that the record has already been delivered.
	Delivered
	// MaxAttemptsReached indicates that the message was not delivered and the
	// maximum number of attempts has been reached, so it should not be delivered.
	MaxAttemptsReached
)

type RetrialPolicy

// RetrialPolicy contains the retrial settings.
// NOTE(review): the field descriptions are inferred from the field names and
// the README's "optional maximum attempts limit" feature; confirm against the dispatcher.
type RetrialPolicy struct {
	// MaxSendAttemptsEnabled presumably turns the maximum-attempts limit on or off.
	MaxSendAttemptsEnabled bool
	// MaxSendAttempts is presumably the attempt limit applied when MaxSendAttemptsEnabled is true.
	MaxSendAttempts        int
}

RetrialPolicy contains the retrial settings

type Store

// Store is the interface that should be implemented by SQL-like database
// drivers to support the outbox functionality.
type Store interface {
	// AddRecordTx stores the record within the provided database transaction.
	AddRecordTx(record Record, tx *sql.Tx) error
	// GetRecordsByLockID returns the records locked with the provided lockID.
	GetRecordsByLockID(lockID string) ([]Record, error)
	// UpdateRecordLockByState updates the lock of all records with the provided state.
	UpdateRecordLockByState(lockID string, lockedOn time.Time, state RecordState) error
	// UpdateRecordByID updates the provided record.
	UpdateRecordByID(message Record) error
	// ClearLocksWithDurationBeforeDate clears the locks of records with a lock time before the provided time.
	ClearLocksWithDurationBeforeDate(time time.Time) error
	// ClearLocksByLockID clears the locks of all records locked by the provided lockID.
	ClearLocksByLockID(lockID string) error
	// RemoveRecordsBeforeDatetime removes all records before the provided time.
	RemoveRecordsBeforeDatetime(expiryTime time.Time) error
}

Store is the interface that should be implemented by SQL-like database drivers to support the outbox functionality

Directories

Path Synopsis
broker
examples
internal
store

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL