Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyMessagePayload = errors.New("empty message payload")
ErrEmptyMessagePayload is the error returned when the message payload is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorLogger ¶ added in v0.4.0
type ErrorLogger interface {
Error(err error)
}
ErrorLogger is the interface that wraps the basic message publishing.
type GenericMessage ¶
type GenericMessage struct { // Unique identifier for the message. MsgID string // Contains the message header to be sent by the messenger to the message queue. MsgMetadata map[string]string // Payload is the message payload. Must not be empty MsgPayload []byte // Message is published to broker or not. MsgPublished bool // At represent the moment of the message creation. MsgAt time.Time }
GenericMessage represents a message to be sent to message message queue. It implements the Message interface.
func NewMessage ¶
func NewMessage(payload []byte) (*GenericMessage, error)
NewMessage returns a new Message given a payload.
func (*GenericMessage) At ¶ added in v0.5.0
func (m *GenericMessage) At() time.Time
At returns message creation moment.
func (*GenericMessage) ID ¶ added in v0.5.0
func (m *GenericMessage) ID() string
ID returns the unique identifier of the message.
func (*GenericMessage) Metadata ¶
func (m *GenericMessage) Metadata() map[string]string
Metadata returns the message metadata.
func (*GenericMessage) Payload ¶
func (m *GenericMessage) Payload() []byte
Payload returns the message payload.
func (*GenericMessage) Published ¶ added in v0.7.0
func (m *GenericMessage) Published() bool
Published returns if message is published to broker.
func (*GenericMessage) SetMetadata ¶ added in v0.5.0
func (m *GenericMessage) SetMetadata(key, value string) *GenericMessage
SetMetadata sets the given key-value pair to the message metadata. If the key already exists, it replaces the value.
type Message ¶
type Message interface { ID() string Metadata() map[string]string Payload() []byte Published() bool At() time.Time }
A Message represents a message to be sent to message message queue.
type Messenger ¶ added in v0.5.0
type Messenger struct {
// contains filtered or unexported fields
}
Messenger is responsible of publishing messages from datastore to publisher, and cleaning already published messages.
func NewMessenger ¶ added in v0.5.0
NewMessenger returns a `Messenger` instance with defaults.
- Publish batch size: 100
- Publish period: 1s
- Golang standard error logger.
func (*Messenger) Clean ¶ added in v0.5.0
Clean runs once the message cleaning process given a message expiration time.
type Option ¶ added in v0.4.0
type Option func(*Messenger)
Option defines the optional parameters for messenger.
func WithCleanUp ¶ added in v0.4.0
WithCleanUp enables cleanup process setting an expiration time for messages.
func WithErrorLogger ¶ added in v0.4.0
func WithErrorLogger(l ErrorLogger) Option
WithErrorLogger replaces the default error logger.
func WithInterval ¶ added in v0.5.0
WithInterval replaces the default interval duration.
func WithPublishBatchSize ¶ added in v0.4.0
WithPublishBatchSize replaces the default published batch size.
type Publisher ¶ added in v0.5.0
type Publisher interface { // Sends the message to broker. Publish(ctx context.Context, msg Message) error }
Publisher is the interface that wraps the basic message publishing.
type Store ¶ added in v0.4.0
type Store interface { // List unpublished messages with a batch size Messages(ctx context.Context, batch int) ([]Message, error) // Mark as published the given messages, if one of the messages fails will not update any of the messages Published(ctx context.Context, msg ...Message) error // Deletes messages marked as published and older than expiration period from datastore. DeletePublishedByExpiration(ctx context.Context, exp time.Duration) error }
Store is the interface that wraps the message retrieval and update methods.