Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Worker ¶
type Worker struct {
	MineSweeper  datastore.MineSweeper
	Dispatcher   pubsub.Dispatcher
	MineInterval time.Duration
	Logger       *zap.Logger
}
Worker is the outbox worker which runs repeatedly until asked to stop.
Example (Rabbit_with_pg) ¶
package main

import (
	"context"
	"database/sql"
	"os"
	"os/signal"
	"time"

	"github.com/kamal-github/outbox"
	"github.com/kamal-github/outbox/datastore"
	"github.com/kamal-github/outbox/pubsub"
	"go.uber.org/zap"
	// A Postgres driver must also be registered for sql.Open("postgres", ...),
	// for example with a blank import of github.com/lib/pq.
)

func main() {
	ctx := context.Background()

	// Set up the logger.
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}

	// Connect to Postgres.
	dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
	dbConn, err := connectToSQLDB("postgres", dsName)
	if err != nil {
		panic(err)
	}

	// Set up Postgres as the MineSweeper.
	mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
	if err != nil {
		panic(err)
	}
	defer mineSweeper.Close()

	// Set up RabbitMQ as the PubSub dispatcher.
	dispatcher, err := pubsub.NewRabbitMQ("", mineSweeper, logger)
	if err != nil {
		panic(err)
	}
	defer dispatcher.Close()

	// Graceful shutdown: cancel the context on interrupt.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	workerDone := make(chan struct{})

	// Run the worker in a separate goroutine.
	go outbox.Worker{
		MineSweeper:  mineSweeper,
		Dispatcher:   dispatcher,
		Logger:       logger,
		MineInterval: 2 * time.Second,
	}.Start(ctx, workerDone)

	// Wait for the interrupt, stop the worker, and wait for it to finish.
	<-sig
	cancel()
	<-workerDone
}

func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
	db, err := sql.Open(driver, dsName)
	if err != nil {
		return db, err
	}
	return db, nil
}
Example (Sqs_with_pg) ¶
package main

import (
	"context"
	"database/sql"
	"os"
	"os/signal"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/kamal-github/outbox"
	"github.com/kamal-github/outbox/datastore"
	"github.com/kamal-github/outbox/pubsub"
	"go.uber.org/zap"
	// A Postgres driver must also be registered for sql.Open("postgres", ...),
	// for example with a blank import of github.com/lib/pq.
)

func main() {
	ctx := context.Background()

	// Set up the logger.
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}

	// Connect to Postgres.
	dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
	dbConn, err := connectToSQLDB("postgres", dsName)
	if err != nil {
		panic(err)
	}

	// Set up Postgres as the MineSweeper.
	mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
	if err != nil {
		panic(err)
	}
	defer mineSweeper.Close()

	// Set up the AWS session and SQS connection.
	awsSession := session.Must(session.NewSession())
	sqsConn := sqs.New(awsSession)

	// Set up SQS as the PubSub dispatcher.
	dispatcher, err := pubsub.NewSimpleQueueService(sqsConn, mineSweeper, logger)
	if err != nil {
		panic(err)
	}
	defer dispatcher.Close()

	// Graceful shutdown: cancel the context on interrupt.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	workerDone := make(chan struct{})

	// Run the worker in a separate goroutine.
	go outbox.Worker{
		MineSweeper:  mineSweeper,
		Dispatcher:   dispatcher,
		Logger:       logger,
		MineInterval: 2 * time.Second,
	}.Start(ctx, workerDone)

	// Wait for the interrupt, stop the worker, and wait for it to finish.
	<-sig
	cancel()
	<-workerDone
}

func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
	db, err := sql.Open(driver, dsName)
	if err != nil {
		return db, err
	}
	return db, nil
}
func (Worker) Start ¶
Start starts the outbox worker. After each MineInterval it looks for new outbox rows that are ready to process and publishes them to the configured messaging system.
When no ready-to-process rows are found, it keeps looking for new ones.
It exits as soon as ctx is cancelled.
Directories ¶
Path | Synopsis |
---|---|
datastore | Package datastore maintains all the data storage implementations. |
 | Package event contains details related to an event, defined as an Outbox Row in the Datastore. |
internal | |
pubsub | Package pubsub contains various implementations of the event dispatcher. |
mocks | Package sweepermock is a generated GoMock package. |
 | Package test contains integration tests and related configuration. |