messenger

Version: v0.11.0
Note: this package is not in the latest version of its module.
Published: Sep 18, 2024 License: MIT Imports: 8 Imported by: 0

README


Messenger

Outbox pattern implementation in Go.

Abstracts away retrieving messages from many sources and publishing them to many queues. Just store the messages you want to publish.

Documentation

Constants

This section is empty.

Variables

var ErrEmptyMessagePayload = errors.New("empty message payload")

ErrEmptyMessagePayload is the error returned when the message payload is empty.

Functions

This section is empty.

Types

type ErrorHandler added in v0.8.0

type ErrorHandler interface {
	Error(ctx context.Context, err error)
}

ErrorHandler is the interface that wraps the Error method, which receives errors raised while publishing messages.

type GenericMessage

type GenericMessage struct {
	// Unique identifier for the message.
	MsgID string
	// Contains the message header to be sent by the messenger to the message queue.
	MsgMetadata Metadata
	// Payload is the message payload. Must not be empty
	MsgPayload []byte
	// Whether the message has been published to the broker.
	MsgPublished bool
	// Moment the message was created.
	MsgAt time.Time
}

GenericMessage represents a message to be sent to a message queue. It implements the Message interface.

func NewMessage

func NewMessage(payload []byte) (*GenericMessage, error)

NewMessage returns a new Message given a payload.

func (*GenericMessage) At added in v0.5.0

func (m *GenericMessage) At() time.Time

At returns message creation moment.

func (*GenericMessage) ID added in v0.5.0

func (m *GenericMessage) ID() string

ID returns the unique identifier of the message.

func (*GenericMessage) Metadata

func (m *GenericMessage) Metadata() Metadata

Metadata returns the message metadata.

func (*GenericMessage) Payload

func (m *GenericMessage) Payload() []byte

Payload returns the message payload.

func (*GenericMessage) Published added in v0.7.0

func (m *GenericMessage) Published() bool

Published reports whether the message has been published to the broker.

func (*GenericMessage) SetMetadata added in v0.5.0

func (m *GenericMessage) SetMetadata(key, value string) *GenericMessage

SetMetadata sets the given key-value pair to the message metadata. If the key already exists, it replaces the value.

type Message

type Message interface {
	ID() string
	Metadata() Metadata
	Payload() []byte
	Published() bool
	At() time.Time
}

A Message represents a message to be sent to a message queue.

type Messenger added in v0.5.0

type Messenger struct {
	// contains filtered or unexported fields
}

Messenger is responsible for publishing messages from the datastore to the publisher, and for cleaning up already published messages.

func NewMessenger added in v0.5.0

func NewMessenger(store Store, publisher Publisher, opts ...Option) *Messenger

NewMessenger returns a `Messenger` instance with defaults.

  • Publish batch size: 100
  • Publish period: 1s
  • Error handler: Go standard error logger.

func (*Messenger) Clean added in v0.5.0

func (w *Messenger) Clean(ctx context.Context) error

Clean runs the message cleaning process once, given a message expiration time.

func (*Messenger) Publish added in v0.5.0

func (w *Messenger) Publish(ctx context.Context) error

Publish runs the publishing process once.

func (*Messenger) Start added in v0.5.0

func (w *Messenger) Start(ctx context.Context) error

Start runs the process of publishing and cleaning messages every period. If a publish error occurs, it calls the error handler without stopping the process. If a fatal error occurs (for example, it cannot connect to the datastore), it stops the process.

type Metadata added in v0.8.0

type Metadata map[string]string

Metadata defines key-value fields sent with every message to add context without the need to decode the payload.

func (Metadata) Get added in v0.8.0

func (m Metadata) Get(key string) string

Get returns the metadata value for the given key. If the key is not found, an empty string is returned.

func (*Metadata) Scan added in v0.8.0

func (m *Metadata) Scan(value any) error

Scan implements the sql.Scanner interface. This method simply decodes a JSON-encoded value into the map.

func (Metadata) Set added in v0.8.0

func (m Metadata) Set(key, value string)

Set sets the metadata key to value.

func (Metadata) Value added in v0.8.0

func (m Metadata) Value() (driver.Value, error)

Value implements sql.Valuer. It encodes Metadata as JSON.
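
To illustrate how a map type can satisfy both sql.Scanner and driver.Valuer through JSON, here is a minimal sketch. It redeclares Metadata locally with the documented method set; the bodies are assumptions, and the package's real implementation may accept different input types or handle NULL differently.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"fmt"
)

// Metadata is a local copy of the documented type; the method bodies
// below are an assumed implementation.
type Metadata map[string]string

// Get returns the value for key, or "" when the key is missing.
func (m Metadata) Get(key string) string { return m[key] }

// Set stores value under key, replacing any previous value.
func (m Metadata) Set(key, value string) { m[key] = value }

// Value encodes the map as JSON so a database driver can store it.
func (m Metadata) Value() (driver.Value, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return nil, err
	}
	return b, nil
}

// Scan decodes a JSON column value into the map. Accepting []byte,
// string, and nil is an assumption of this sketch.
func (m *Metadata) Scan(value any) error {
	var raw []byte
	switch v := value.(type) {
	case nil:
		*m = Metadata{}
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("unsupported metadata type %T", value)
	}
	return json.Unmarshal(raw, m)
}

func main() {
	in := Metadata{}
	in.Set("trace-id", "abc123")

	stored, err := in.Value()
	if err != nil {
		panic(err)
	}

	var out Metadata
	if err := out.Scan(stored); err != nil {
		panic(err)
	}
	fmt.Println(out.Get("trace-id"))
}
```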

type Option added in v0.4.0

type Option func(*Messenger)

Option defines the optional parameters for messenger.

func WithCleanUp added in v0.4.0

func WithCleanUp(expiration time.Duration) Option

WithCleanUp enables the cleanup process and sets an expiration time for messages.

func WithErrorHandler added in v0.8.0

func WithErrorHandler(l ErrorHandler) Option

WithErrorHandler replaces the default error logger.

func WithInterval added in v0.5.0

func WithInterval(p time.Duration) Option

WithInterval replaces the default interval duration.

func WithPublishBatchSize added in v0.4.0

func WithPublishBatchSize(bs int) Option

WithPublishBatchSize replaces the default publish batch size.

type Publisher added in v0.5.0

type Publisher interface {
	// Sends the message to broker.
	Publish(ctx context.Context, msg Message) error
}

Publisher is the interface that wraps the basic message publishing.

type Store added in v0.4.0

type Store interface {
	// Lists unpublished messages, up to the given batch size.
	Messages(ctx context.Context, batch int) ([]Message, error)
	// Marks the given message as published.
	Published(ctx context.Context, msg Message) error
	// Deletes messages marked as published and older than expiration period from datastore.
	DeletePublishedByExpiration(ctx context.Context, exp time.Duration) error
}

Store is the interface that wraps the message retrieval and update methods.

type Subscription added in v0.8.0

type Subscription interface {
	Name() string
	Handle(ctx context.Context, msg Message) error
}

Subscription defines the basic methods for a subscription broker.

func NewSubscription added in v0.8.0

func NewSubscription(name string, h SubscriptionHandler) Subscription

NewSubscription returns a Subscription handler.

type SubscriptionHandler added in v0.8.0

type SubscriptionHandler func(ctx context.Context, msg Message) error

SubscriptionHandler defines a function that processes a message and returns an error if processing fails.

Directories

Path Synopsis
sns
sqs
example
internal
