Documentation ¶
Overview ¶
Package outboxer is an implementation of the outbox pattern. The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint. The durable local storage may be implemented in the Message Channel directly, especially when combined with Idempotent Messages.
Index ¶
- Variables
- type DataStore
- type DynamicValues
- type EventStream
- type ExecerContext
- type Option
- func WithCheckInterval(t time.Duration) Option
- func WithCleanUpBatchSize(s int32) Option
- func WithCleanUpOlderThan(t time.Duration) Option
- func WithCleanupInterval(t time.Duration) Option
- func WithDataStore(ds DataStore) Option
- func WithEventStream(es EventStream) Option
- func WithMessageBatchSize(s int32) Option
- type OutboxMessage
- type Outboxer
- func (o *Outboxer) ErrChan() <-chan error
- func (o *Outboxer) OkChan() <-chan struct{}
- func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error
- func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error
- func (o *Outboxer) Start(ctx context.Context)
- func (o *Outboxer) StartCleanup(ctx context.Context)
- func (o *Outboxer) StartDispatcher(ctx context.Context)
- func (o *Outboxer) Stop()
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrMissingEventStream is used when no event stream is provided.
	ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")
	// ErrMissingDataStore is used when no data store is provided.
	ErrMissingDataStore = errors.New("a data store is required for the outboxer to work")
)
var ErrFailedToDecodeType = errors.New("could not decode type")
ErrFailedToDecodeType is returned when the type of the value is not supported.
Functions ¶
This section is empty.
Types ¶
type DataStore ¶
type DataStore interface {
	// Tries to find the given message in the outbox.
	GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
	Add(ctx context.Context, m *OutboxMessage) error
	AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
	SetAsDispatched(ctx context.Context, id int64) error
	Remove(ctx context.Context, since time.Time, batchSize int32) error
}
DataStore defines the data store methods.
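To make the contract concrete, here is a minimal in-memory sketch with the same five method signatures. It is an illustration only: `memoryStore`, the trimmed `OutboxMessage` mirror, and the `Remove` semantics (drop dispatched messages sent before `since`) are assumptions, not the package's own implementation.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"
)

// Trimmed local mirrors of the documented types, so the sketch compiles alone.
type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
}

type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// memoryStore is a hypothetical in-memory store, not part of the package.
// It is not safe for concurrent use; a real one would need a mutex.
type memoryStore struct {
	nextID int64
	msgs   []*OutboxMessage
}

// GetEvents returns up to batchSize messages that have not been dispatched.
func (s *memoryStore) GetEvents(_ context.Context, batchSize int32) ([]*OutboxMessage, error) {
	var out []*OutboxMessage
	for _, m := range s.msgs {
		if len(out) >= int(batchSize) {
			break
		}
		if !m.Dispatched {
			out = append(out, m)
		}
	}
	return out, nil
}

// Add assigns the next ID and appends the message.
func (s *memoryStore) Add(_ context.Context, m *OutboxMessage) error {
	s.nextID++
	m.ID = s.nextID
	s.msgs = append(s.msgs, m)
	return nil
}

// AddWithinTx is unsupported: there is no database transaction to share.
func (s *memoryStore) AddWithinTx(context.Context, *OutboxMessage, func(ExecerContext) error) error {
	return errors.New("memoryStore: transactions are not supported")
}

// SetAsDispatched marks the message with the given ID as sent.
func (s *memoryStore) SetAsDispatched(_ context.Context, id int64) error {
	for _, m := range s.msgs {
		if m.ID == id {
			m.Dispatched = true
			m.DispatchedAt = sql.NullTime{Time: time.Now(), Valid: true}
			return nil
		}
	}
	return fmt.Errorf("memoryStore: message %d not found", id)
}

// Remove drops up to batchSize dispatched messages sent before since.
func (s *memoryStore) Remove(_ context.Context, since time.Time, batchSize int32) error {
	kept := s.msgs[:0]
	removed := int32(0)
	for _, m := range s.msgs {
		if removed < batchSize && m.Dispatched && m.DispatchedAt.Time.Before(since) {
			removed++
			continue
		}
		kept = append(kept, m)
	}
	s.msgs = kept
	return nil
}

// pendingAfterDispatch adds two messages, dispatches the first, and
// returns how many are still pending.
func pendingAfterDispatch() int {
	ctx := context.Background()
	s := &memoryStore{}
	_ = s.Add(ctx, &OutboxMessage{Payload: []byte("a")})
	_ = s.Add(ctx, &OutboxMessage{Payload: []byte("b")})
	_ = s.SetAsDispatched(ctx, 1)
	pending, _ := s.GetEvents(ctx, 10)
	return len(pending)
}

func main() {
	fmt.Println("pending:", pendingAfterDispatch())
}
```

A store like this may be handy in unit tests, where a real database would only add setup cost.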
type DynamicValues ¶
type DynamicValues map[string]interface{}
DynamicValues is a map that can be serialized.
func (*DynamicValues) Scan ¶
func (p *DynamicValues) Scan(src interface{}) error
Scan scans a database JSON representation into a DynamicValues map.
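A rough sketch of how such a `Scan` method can work, assuming the column holds JSON delivered as `[]byte` or `string`. The names `dynamicValues` and `errDecode` are local stand-ins, and the package's actual handling of each source type may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errDecode stands in for the package's ErrFailedToDecodeType.
var errDecode = errors.New("could not decode type")

// dynamicValues is a local stand-in for the documented DynamicValues type.
type dynamicValues map[string]interface{}

// Scan implements database/sql.Scanner by decoding a JSON column value.
func (p *dynamicValues) Scan(src interface{}) error {
	switch v := src.(type) {
	case nil:
		*p = nil // SQL NULL becomes a nil map
		return nil
	case []byte:
		return json.Unmarshal(v, p)
	case string:
		return json.Unmarshal([]byte(v), p)
	default:
		return errDecode // unsupported driver value type
	}
}

// scanRoutingKey scans a JSON object and reads one string key back.
func scanRoutingKey() (string, error) {
	var dv dynamicValues
	if err := dv.Scan([]byte(`{"routing_key":"test.send"}`)); err != nil {
		return "", err
	}
	key, _ := dv["routing_key"].(string)
	return key, nil
}

func main() {
	fmt.Println(scanRoutingKey())
}
```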
type EventStream ¶
type EventStream interface {
Send(context.Context, *OutboxMessage) error
}
EventStream defines the event stream methods.
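Because the interface has a single method, a custom stream is short to write. The sketch below assumes a hypothetical `recordingStream` that keeps payloads in memory, and it mirrors `OutboxMessage` with only the `Payload` field so that it compiles on its own.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local mirrors of the documented types, so the sketch compiles alone.
type OutboxMessage struct {
	Payload []byte
}

type EventStream interface {
	Send(context.Context, *OutboxMessage) error
}

// recordingStream is a hypothetical EventStream that keeps payloads in memory.
type recordingStream struct {
	sent [][]byte
}

// Send copies the payload so later changes by the caller do not affect it.
func (r *recordingStream) Send(ctx context.Context, m *OutboxMessage) error {
	if err := ctx.Err(); err != nil {
		return err // honour cancellation before doing any work
	}
	r.sent = append(r.sent, append([]byte(nil), m.Payload...))
	return nil
}

// Compile-time check that recordingStream satisfies EventStream.
var _ EventStream = (*recordingStream)(nil)

// sendTwo pushes two payloads through the stream and returns what it recorded.
func sendTwo() []string {
	ctx := context.Background()
	rs := &recordingStream{}
	var es EventStream = rs
	for _, p := range []string{"first", "second"} {
		if err := es.Send(ctx, &OutboxMessage{Payload: []byte(p)}); err != nil {
			return nil
		}
	}
	out := make([]string, 0, len(rs.sent))
	for _, b := range rs.sent {
		out = append(out, string(b))
	}
	return out
}

func main() {
	fmt.Println(sendTwo())
}
```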
type ExecerContext ¶
type ExecerContext interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
ExecerContext defines the exec context method that is used within a transaction.
type Option ¶
type Option func(*Outboxer)
Option represents the outboxer options.
func WithCheckInterval ¶
WithCheckInterval sets how often the outboxer checks for new events.
func WithCleanUpBatchSize ¶
WithCleanUpBatchSize sets the clean up process batch size.
func WithCleanUpOlderThan ¶
WithCleanUpOlderThan sets the minimum age of the messages that the cleanup process removes.
func WithCleanupInterval ¶
WithCleanupInterval sets how often the outboxer cleans old events from the data store.
func WithDataStore ¶
WithDataStore sets the data store where events will be stored before sending.
func WithEventStream ¶
func WithEventStream(es EventStream) Option
WithEventStream sets the event stream to which events will be sent.
func WithMessageBatchSize ¶
WithMessageBatchSize sets how many messages will be sent at a time.
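The `With...` functions follow the functional options pattern: each returns a function that mutates the instance being built. The sketch below shows the mechanism with a hypothetical `outboxer` struct and lower-case option names. The field names and default values are assumptions, since the real `Outboxer` fields are unexported.

```go
package main

import (
	"fmt"
	"time"
)

// outboxer is a hypothetical stand-in; the real Outboxer fields are unexported.
type outboxer struct {
	checkInterval    time.Duration
	messageBatchSize int32
}

// option mirrors the documented shape: a function that mutates the instance.
type option func(*outboxer)

func withCheckInterval(t time.Duration) option {
	return func(o *outboxer) { o.checkInterval = t }
}

func withMessageBatchSize(s int32) option {
	return func(o *outboxer) { o.messageBatchSize = s }
}

// newOutboxer applies options over defaults. The default values here are
// assumptions made for the sketch, not the package's documented defaults.
func newOutboxer(opts ...option) *outboxer {
	o := &outboxer{checkInterval: time.Second, messageBatchSize: 100}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOutboxer(withMessageBatchSize(10))
	fmt.Println(o.checkInterval, o.messageBatchSize)
}
```

Options that are not passed leave the defaults untouched, so callers only spell out what they want to change.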
type OutboxMessage ¶
type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
	Options      DynamicValues
	Headers      DynamicValues
}
OutboxMessage represents a message that will be sent.
type Outboxer ¶
type Outboxer struct {
// contains filtered or unexported fields
}
Outboxer implements the outbox pattern.
func New ¶
New creates a new instance of Outboxer.
Example ¶
package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"time"

	"github.com/artsv79/outboxer"
	amqpOut "github.com/artsv79/outboxer/es/amqp"
	"github.com/artsv79/outboxer/storage/postgres"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
	if err != nil {
		fmt.Printf("could not connect to the database: %s", err)
		return
	}

	conn, err := amqp.Dial(os.Getenv("ES_DSN"))
	if err != nil {
		fmt.Printf("could not connect to amqp: %s", err)
		return
	}

	// we need to create a data store instance first
	ds, err := postgres.WithInstance(ctx, db)
	if err != nil {
		fmt.Printf("could not setup the data store: %s", err)
		return
	}
	defer ds.Close()

	// we create an event stream passing the amqp connection
	es := amqpOut.NewAMQP(conn)

	// now we create an outboxer instance passing the data store and event stream
	o, err := outboxer.New(
		outboxer.WithDataStore(ds),
		outboxer.WithEventStream(es),
		outboxer.WithCheckInterval(1*time.Second),
		outboxer.WithCleanupInterval(5*time.Second),
		outboxer.WithCleanUpOlderThan(5*24*time.Hour),
		outboxer.WithCleanUpBatchSize(10),
		outboxer.WithMessageBatchSize(10),
	)
	if err != nil {
		fmt.Printf("could not create an outboxer instance: %s", err)
		return
	}

	// here we start the outboxer dispatcher and cleanup goroutines
	o.Start(ctx)
	defer o.Stop()

	// finally we are ready to send messages
	if err = o.Send(ctx, &outboxer.OutboxMessage{
		Payload: []byte("test payload"),
		Options: map[string]interface{}{
			amqpOut.ExchangeNameOption: "test",
			amqpOut.ExchangeTypeOption: "topic",
			amqpOut.RoutingKeyOption:   "test.send",
		},
	}); err != nil {
		fmt.Printf("could not send message: %s", err)
		return
	}

	// we can also listen for errors and ok signals for messages that were sent
	for {
		select {
		case err := <-o.ErrChan():
			fmt.Printf("could not send message: %s", err)
		case <-o.OkChan():
			fmt.Printf("message received")
			return
		}
	}
}
func (*Outboxer) OkChan ¶
func (o *Outboxer) OkChan() <-chan struct{}
OkChan returns the channel that is signalled each time a message is successfully delivered.
func (*Outboxer) Send ¶
func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error
Send sends a message.
func (*Outboxer) SendWithinTx ¶
func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error
SendWithinTx encapsulates any database call within a transaction.
func (*Outboxer) Start ¶
Start launches two goroutines: the dispatcher, which gets messages from the data store and sends them to the event stream, and the cleanup process, which makes sure old messages are removed from the data store.
func (*Outboxer) StartCleanup ¶
StartCleanup starts the cleanup process, which makes sure old messages are removed from the data store.
func (*Outboxer) StartDispatcher ¶
StartDispatcher starts the dispatcher, which gets messages from the data store and sends them to the event stream.
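A dispatcher of this kind is typically a ticker loop that fetches a batch, sends each message, and marks it as dispatched only on success. The sketch below illustrates that idea with hypothetical names (`message`, `dispatchOnce`, `runEvery`). Whether the package retries failed sends in exactly this way is an assumption, not something these docs confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// message and the helpers below are hypothetical stand-ins for the sketch.
type message struct {
	id         int64
	payload    string
	dispatched bool
}

// dispatchOnce sends up to batch pending messages and marks each one as
// dispatched only after send succeeds, so failures are retried next tick.
func dispatchOnce(msgs []*message, batch int, send func(*message) error) (sent int, errs []error) {
	picked := 0
	for _, m := range msgs {
		if picked == batch {
			break
		}
		if m.dispatched {
			continue
		}
		picked++
		if err := send(m); err != nil {
			errs = append(errs, err)
			continue
		}
		m.dispatched = true
		sent++
	}
	return sent, errs
}

// runEvery calls tick on each interval until ctx is cancelled.
func runEvery(ctx context.Context, interval time.Duration, tick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tick()
		}
	}
}

// demo runs two dispatch passes where message 2 fails on its first attempt.
func demo() (firstPass, secondPass int) {
	msgs := []*message{{id: 1, payload: "a"}, {id: 2, payload: "b"}, {id: 3, payload: "c"}}
	failed := false
	send := func(m *message) error {
		if m.id == 2 && !failed {
			failed = true
			return errors.New("temporary failure")
		}
		return nil
	}
	firstPass, _ = dispatchOnce(msgs, 2, send)
	secondPass, _ = dispatchOnce(msgs, 2, send)
	return firstPass, secondPass
}

func main() {
	fmt.Println(demo())

	// Drive the loop for a short while to show the ticker wiring.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ticks := 0
	runEvery(ctx, 10*time.Millisecond, func() { ticks++ })
	fmt.Println("ticks:", ticks)
}
```

Marking a message only after a successful send gives at-least-once delivery: a crash between sending and marking can produce a duplicate, which is why the overview pairs the pattern with idempotent messages.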
Directories ¶
Path | Synopsis |
---|---|
es | |
es/amqp | Package amqp is the AMQP implementation of an event stream. |
es/kinesis | Package kinesis is the AWS Kinesis implementation of an event stream. |
es/pubsub | Package pubsub is the GCP PubSub implementation of an event stream. |
es/sqs | Package SQS is the AWS SQS implementation of an event stream. |
storage | |
storage/mysql | Package mysql is the implementation of the mysql data store. |
storage/postgres | Package postgres is the implementation of the postgres data store. |