messenger

Version: v0.7.1
Published: Dec 17, 2023 | License: MIT | Imports: 5 | Imported by: 0

README


Messenger

Outbox pattern implementation in Go.

Abstracts away retrieving messages from multiple sources and publishing them to multiple queues. You only store the messages you want to publish.

Documentation

Index

Constants

This section is empty.

Variables

var ErrEmptyMessagePayload = errors.New("empty message payload")

ErrEmptyMessagePayload is the error returned when the message payload is empty.

Functions

This section is empty.

Types

type ErrorLogger added in v0.4.0

type ErrorLogger interface {
	Error(err error)
}

ErrorLogger is the interface that wraps the basic error logging method.
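As a minimal sketch of satisfying this interface, the adapter below forwards errors to a standard library *log.Logger. The ErrorLogger interface is redeclared locally only so the snippet compiles on its own; prefixLogger and demoLog are hypothetical names and are not part of this package.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"log"
)

// ErrorLogger mirrors the interface documented above, redeclared here
// only so the sketch is self-contained.
type ErrorLogger interface {
	Error(err error)
}

// prefixLogger is a hypothetical adapter that writes errors to a *log.Logger.
type prefixLogger struct {
	l *log.Logger
}

// Error implements ErrorLogger. Nil errors are ignored.
func (p prefixLogger) Error(err error) {
	if err == nil {
		return
	}
	p.l.Println(err)
}

// demoLog logs one error into a buffer and returns what was written.
func demoLog() string {
	var buf bytes.Buffer
	var el ErrorLogger = prefixLogger{l: log.New(&buf, "messenger: ", 0)}
	el.Error(errors.New("publish failed"))
	return buf.String()
}

func main() {
	fmt.Print(demoLog())
}
```

With the real package, a value like this would presumably be handed to WithErrorLogger when constructing the Messenger.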

type GenericMessage

type GenericMessage struct {
	// Unique identifier for the message.
	MsgID string
	// Contains the message header to be sent by the messenger to the message queue.
	MsgMetadata map[string]string
	// Payload is the message payload. Must not be empty.
	MsgPayload []byte
	// Whether the message has been published to the broker.
	MsgPublished bool
	// At represents the moment the message was created.
	MsgAt time.Time
}

GenericMessage represents a message to be sent to a message queue. It implements the Message interface.

func NewMessage

func NewMessage(payload []byte) (*GenericMessage, error)

NewMessage returns a new Message given a payload.

func (*GenericMessage) At added in v0.5.0

func (m *GenericMessage) At() time.Time

At returns the moment the message was created.

func (*GenericMessage) ID added in v0.5.0

func (m *GenericMessage) ID() string

ID returns the unique identifier of the message.

func (*GenericMessage) Metadata

func (m *GenericMessage) Metadata() map[string]string

Metadata returns the message metadata.

func (*GenericMessage) Payload

func (m *GenericMessage) Payload() []byte

Payload returns the message payload.

func (*GenericMessage) Published added in v0.7.0

func (m *GenericMessage) Published() bool

Published reports whether the message has been published to the broker.

func (*GenericMessage) SetMetadata added in v0.5.0

func (m *GenericMessage) SetMetadata(key, value string) *GenericMessage

SetMetadata sets the given key-value pair to the message metadata. If the key already exists, it replaces the value.

type Message

type Message interface {
	ID() string
	Metadata() map[string]string
	Payload() []byte
	Published() bool
	At() time.Time
}

A Message represents a message to be sent to a message queue.
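Because Message is an interface, an application type can presumably serve as an outbox message without going through GenericMessage. The sketch below shows one way that might look. The Message interface is redeclared locally so the snippet stands alone; orderCreated and describe are hypothetical names invented for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Message mirrors the interface documented above, redeclared here
// only so the sketch is self-contained.
type Message interface {
	ID() string
	Metadata() map[string]string
	Payload() []byte
	Published() bool
	At() time.Time
}

// orderCreated is a hypothetical domain event kept in an outbox table.
type orderCreated struct {
	id        string
	orderID   string
	published bool
	createdAt time.Time
}

func (o orderCreated) ID() string { return o.id }

func (o orderCreated) Metadata() map[string]string {
	return map[string]string{"type": "order.created"}
}

func (o orderCreated) Payload() []byte {
	return []byte(`{"order_id":"` + o.orderID + `"}`)
}

func (o orderCreated) Published() bool { return o.published }

func (o orderCreated) At() time.Time { return o.createdAt }

// Compile-time check that orderCreated satisfies Message.
var _ Message = orderCreated{}

// describe formats the parts of a message a publisher would typically read.
func describe(m Message) string {
	return fmt.Sprintf("%s %s published=%t", m.ID(), m.Payload(), m.Published())
}

func main() {
	m := orderCreated{id: "msg-1", orderID: "42", createdAt: time.Now()}
	fmt.Println(describe(m))
}
```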

type Messenger added in v0.5.0

type Messenger struct {
	// contains filtered or unexported fields
}

Messenger is responsible for publishing messages from the datastore to the publisher and for cleaning up messages that have already been published.

func NewMessenger added in v0.5.0

func NewMessenger(store Store, publisher Publisher, opts ...Option) *Messenger

NewMessenger returns a Messenger instance with the following defaults:

  • Publish batch size: 100
  • Publish period: 1s
  • Error logger: Go standard library logger

func (*Messenger) Clean added in v0.5.0

func (w *Messenger) Clean(ctx context.Context) error

Clean runs the message cleaning process once, using the message expiration time.

func (*Messenger) Publish added in v0.5.0

func (w *Messenger) Publish(ctx context.Context) error

Publish runs the publishing process once.

func (*Messenger) Start added in v0.5.0

func (w *Messenger) Start(ctx context.Context) error

Start runs the publishing and cleaning process once every period. If a publish error occurs, it calls the error handler without stopping the process. If a fatal error occurs, for example when it cannot connect to the datastore, it stops the process.
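The behaviour described for Start can be pictured as a ticker loop. The sketch below is an illustration under stated assumptions, not the package's implementation: runEvery, errFatal, and demoRun are hypothetical names, and using a sentinel error to mark fatal failures is a choice made for this example only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errFatal is a hypothetical sentinel marking errors that must stop the loop.
var errFatal = errors.New("fatal")

// runEvery calls step once per interval until ctx is cancelled.
// Non-fatal errors are passed to onErr and the loop continues; an error
// wrapping errFatal ends the loop and is returned to the caller.
func runEvery(ctx context.Context, interval time.Duration, step func(context.Context) error, onErr func(error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := step(ctx); err != nil {
				if errors.Is(err, errFatal) {
					return err
				}
				onErr(err)
			}
		}
	}
}

// demoRun fails softly on the first two ticks and fatally on the third.
// It returns how many errors were reported and the error that ended the loop.
func demoRun() (int, error) {
	calls, reported := 0, 0
	step := func(context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("publish error")
		}
		return fmt.Errorf("datastore unreachable: %w", errFatal)
	}
	err := runEvery(context.Background(), time.Millisecond, step, func(error) { reported++ })
	return reported, err
}

func main() {
	reported, err := demoRun()
	fmt.Println(reported, err)
}
```

Cancelling the context is the way to stop such a loop from the outside; in this sketch that returns ctx.Err().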

type Option added in v0.4.0

type Option func(*Messenger)

Option defines the optional parameters for messenger.

func WithCleanUp added in v0.4.0

func WithCleanUp(expiration time.Duration) Option

WithCleanUp enables the cleanup process and sets the expiration time for messages.

func WithErrorLogger added in v0.4.0

func WithErrorLogger(l ErrorLogger) Option

WithErrorLogger replaces the default error logger.

func WithInterval added in v0.5.0

func WithInterval(p time.Duration) Option

WithInterval replaces the default interval duration.

func WithPublishBatchSize added in v0.4.0

func WithPublishBatchSize(bs int) Option

WithPublishBatchSize replaces the default publish batch size.
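The With* functions above follow Go's functional options pattern. The sketch below illustrates the pattern with a stand-in type. The names relay, option, newRelay, and the lowercase with* helpers are hypothetical, and the struct fields are assumptions, since the real Messenger fields are unexported. Only the default values (batch size 100, period 1s) are taken from the NewMessenger documentation.

```go
package main

import (
	"fmt"
	"time"
)

// relay is a hypothetical stand-in for Messenger; its field names are assumptions.
type relay struct {
	batchSize  int
	interval   time.Duration
	expiration time.Duration // zero means cleanup is disabled in this sketch
}

// option has the same shape as the documented Option type.
type option func(*relay)

func withBatchSize(bs int) option {
	return func(r *relay) { r.batchSize = bs }
}

func withInterval(p time.Duration) option {
	return func(r *relay) { r.interval = p }
}

func withCleanUp(exp time.Duration) option {
	return func(r *relay) { r.expiration = exp }
}

// newRelay starts from the documented defaults and then applies
// each option in the order given.
func newRelay(opts ...option) *relay {
	r := &relay{batchSize: 100, interval: time.Second}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newRelay(withBatchSize(25), withInterval(5*time.Second), withCleanUp(24*time.Hour))
	fmt.Println(r.batchSize, r.interval, r.expiration)
}
```

Options left out keep their defaults, which is why callers only name the settings they want to change.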

type Publisher added in v0.5.0

type Publisher interface {
	// Sends the message to the broker.
	Publish(ctx context.Context, msg Message) error
}

Publisher is the interface that wraps the basic Publish method.
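A Publisher only needs a single method, so small adapters are easy to write. As a sketch, the example below hands each message to a Go channel, which can be convenient in tests. Message and Publisher are redeclared locally so the snippet compiles alone; chanPublisher, note, and demoPublish are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message and Publisher mirror the documented interfaces, redeclared here
// only so the sketch is self-contained.
type Message interface {
	ID() string
	Metadata() map[string]string
	Payload() []byte
	Published() bool
	At() time.Time
}

type Publisher interface {
	Publish(ctx context.Context, msg Message) error
}

// note is a hypothetical minimal Message used only by this demo.
type note struct{ id string }

func (n note) ID() string                  { return n.id }
func (n note) Metadata() map[string]string { return nil }
func (n note) Payload() []byte             { return []byte("hello") }
func (n note) Published() bool             { return false }
func (n note) At() time.Time               { return time.Time{} }

// chanPublisher is a hypothetical Publisher that hands each message to a channel.
type chanPublisher struct{ out chan<- Message }

// Publish blocks until the message is accepted or ctx is done.
func (p chanPublisher) Publish(ctx context.Context, msg Message) error {
	select {
	case p.out <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoPublish sends one message successfully, then shows that a cancelled
// context aborts a send to a full channel.
func demoPublish() (string, error) {
	ch := make(chan Message, 1)
	var pub Publisher = chanPublisher{out: ch}
	if err := pub.Publish(context.Background(), note{id: "msg-1"}); err != nil {
		return "", err
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := pub.Publish(ctx, note{id: "msg-2"}) // channel is full and ctx is cancelled
	return (<-ch).ID(), err
}

func main() {
	id, err := demoPublish()
	fmt.Println(id, err)
}
```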

type Store added in v0.4.0

type Store interface {
	// Lists unpublished messages, limited to the given batch size.
	Messages(ctx context.Context, batch int) ([]Message, error)
	// Marks the given messages as published. If any message fails, none of them is updated.
	Published(ctx context.Context, msg ...Message) error
	// Deletes messages marked as published and older than the expiration period from the datastore.
	DeletePublishedByExpiration(ctx context.Context, exp time.Duration) error
}

Store is the interface that wraps the message retrieval and update methods.
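To make the contract of the three methods concrete, here is a minimal in-memory sketch whose method set matches the Store interface above. It is an illustration for tests, not one of the stores shipped with the module. The names memStore, record, and demoStore are hypothetical, the payload is a placeholder, and the type is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message mirrors the interface documented earlier, redeclared here
// only so the sketch is self-contained.
type Message interface {
	ID() string
	Metadata() map[string]string
	Payload() []byte
	Published() bool
	At() time.Time
}

// record is a hypothetical stored row that implements Message.
type record struct {
	id        string
	published bool
	at        time.Time
}

func (r record) ID() string                  { return r.id }
func (r record) Metadata() map[string]string { return nil }
func (r record) Payload() []byte             { return []byte(r.id) } // placeholder payload
func (r record) Published() bool             { return r.published }
func (r record) At() time.Time               { return r.at }

// memStore is a hypothetical in-memory store. It is not safe for concurrent use.
type memStore struct{ rows []*record }

// Messages returns at most batch unpublished rows, oldest insertion first.
func (s *memStore) Messages(_ context.Context, batch int) ([]Message, error) {
	var out []Message
	for _, r := range s.rows {
		if !r.published && len(out) < batch {
			out = append(out, *r) // hand out a copy of the row
		}
	}
	return out, nil
}

// Published marks every given message as published, or none of them.
func (s *memStore) Published(_ context.Context, msgs ...Message) error {
	byID := make(map[string]*record, len(s.rows))
	for _, r := range s.rows {
		byID[r.id] = r
	}
	// Validate first so that one unknown ID leaves every row untouched.
	for _, m := range msgs {
		if byID[m.ID()] == nil {
			return fmt.Errorf("unknown message %q", m.ID())
		}
	}
	for _, m := range msgs {
		byID[m.ID()].published = true
	}
	return nil
}

// DeletePublishedByExpiration drops published rows older than exp.
func (s *memStore) DeletePublishedByExpiration(_ context.Context, exp time.Duration) error {
	cutoff := time.Now().Add(-exp)
	kept := s.rows[:0]
	for _, r := range s.rows {
		if !r.published || r.at.After(cutoff) {
			kept = append(kept, r)
		}
	}
	s.rows = kept
	return nil
}

// demoStore lists, marks, and cleans two rows, one of them two hours old.
// It returns the number of listed messages and the rows left after cleanup.
func demoStore() (int, int, error) {
	ctx := context.Background()
	s := &memStore{rows: []*record{
		{id: "a", at: time.Now().Add(-2 * time.Hour)},
		{id: "b", at: time.Now()},
	}}
	msgs, err := s.Messages(ctx, 10)
	if err == nil {
		err = s.Published(ctx, msgs...)
	}
	if err == nil {
		err = s.DeletePublishedByExpiration(ctx, time.Hour)
	}
	return len(msgs), len(s.rows), err
}

func main() {
	fmt.Println(demoStore())
}
```

A database-backed store would typically get the all-or-nothing behaviour of Published from a transaction rather than from a validation pass.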

Directories

Path Synopsis
sns
sqs
example
internal
store
