messenger

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2023 License: MIT Imports: 6 Imported by: 0

README

⚠ WIP ⚠


Messenger

Outbox pattern implementation in Go.

Abstracts from handling the retrieve of the messages from many sources and publishing to many queues. Just store the messages you want to publish.

go reference  go cover  go report

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyMessagePayload = errors.New("empty message payload")

ErrEmptyMessagePayload is the error returned when the message payload is empty.

Functions

This section is empty.

Types

type ErrorLogger added in v0.4.0

type ErrorLogger interface {
	Error(err error)
}

ErrorLogger is the interface that wraps the basic message publishing.

type GenericMessage

type GenericMessage struct {
	// contains filtered or unexported fields
}

GenericMessage represents a message to be sent to message message queue. It implements the Message interface.

func NewMessage

func NewMessage(payload []byte) (*GenericMessage, error)

NewMessage returns a new Message given a payload.

func (*GenericMessage) AddMetadata

func (m *GenericMessage) AddMetadata(key, value string)

AddMetadata adds the given key-value pair to the message metadata.

func (*GenericMessage) Metadata

func (m *GenericMessage) Metadata() map[string]string

Metadata returns the message metadata.

func (*GenericMessage) Payload

func (m *GenericMessage) Payload() any

Payload returns the message payload.

type Message

type Message interface {
	Metadata() map[string]string
	Payload() any
}

A Message represents a message to be sent to message message queue.

type Option added in v0.4.0

type Option func(*Worker)

Option defines the optional parameters for worker.

func WithCleanUp added in v0.4.0

func WithCleanUp(period time.Duration, expiration time.Duration) Option

func WithErrorLogger added in v0.4.0

func WithErrorLogger(l ErrorLogger) Option

func WithPublishBatchSize added in v0.4.0

func WithPublishBatchSize(bs int) Option

func WithPublishPeriod added in v0.4.0

func WithPublishPeriod(p time.Duration) Option

type Queue added in v0.4.0

type Queue interface {
	// Sends the message to queue
	Publish(ctx context.Context, msg *store.Message) error
}

Queue is the interface that wraps the basic message publishing.

type Store added in v0.4.0

type Store interface {
	// List unpublished messages with a batch size
	Messages(ctx context.Context, batch int) ([]*store.Message, error)
	// Mark as published the given messages, if one of the messages fails will not update any of the messages
	Published(ctx context.Context, msg ...*store.Message) error
	// Deletes messages marked as published and older than expiration period from datastore.
	DeletePublishedByExpiration(ctx context.Context, exp time.Duration) error
}

Source is the interface that wraps the message retrieval and update methods.

type Worker added in v0.4.0

type Worker struct {
	// contains filtered or unexported fields
}

Worker is responsible of publishing messages from datastore to queue, and cleaning already published messages.

func NewWorker added in v0.4.0

func NewWorker(store Store, queue Queue, opts ...Option) *Worker

NewWorker returns a `Worker` instance with defaults.

  • Publish batch size: 100
  • Publish period: 1s
  • Golang standard error logger.

func (*Worker) Clean added in v0.4.0

func (w *Worker) Clean(ctx context.Context) error

Clean runs once the message cleaning process given a message expiration time.

func (*Worker) Publish added in v0.4.0

func (w *Worker) Publish(ctx context.Context) error

Publish runs once publishing process.

func (*Worker) Start added in v0.4.0

func (w *Worker) Start(ctx context.Context) error

Start runs publish and clean processes.

func (*Worker) StartClean added in v0.4.0

func (w *Worker) StartClean(ctx context.Context) error

StartClean runs the cleaning process preriodically defined by the clean period. If there is an error it stops and returns the error.

func (*Worker) StartPublisher added in v0.4.0

func (w *Worker) StartPublisher(ctx context.Context) error

StartPublisher runs the publishing process. In case there is a publish error, it will call to error handler without stopping the process. If a fatal error happens, ex, cant connect to datastore it will stop the process.

Directories

Path Synopsis
Package publish manages message publishing from a source to a queue.
Package publish manages message publishing from a source to a queue.
sns
sqs
Package store the message for internal use that should not be exported
Package store the message for internal use that should not be exported
postgres/pgx
Package pgx implements storage functionality for saving and retrieving messages
Package pgx implements storage functionality for saving and retrieving messages

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL