Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyMessagePayload = errors.New("empty message payload")
ErrEmptyMessagePayload is the error returned when the message payload is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorHandler ¶ added in v0.8.0
ErrorHandler is the interface that wraps the basic error handling.
type GenericMessage ¶
type GenericMessage struct {
	// MsgID is the unique identifier for the message.
	MsgID string
	// MsgMetadata contains the message header to be sent by the messenger to the message queue.
	MsgMetadata Metadata
	// MsgPayload is the message payload. It must not be empty.
	MsgPayload []byte
	// MsgPublished reports whether the message has been published to the broker.
	MsgPublished bool
	// MsgAt represents the moment of the message creation.
	MsgAt time.Time
}
GenericMessage represents a message to be sent to a message queue. It implements the Message interface.
func NewMessage ¶
func NewMessage(payload []byte) (*GenericMessage, error)
NewMessage returns a new Message given a payload.
func (*GenericMessage) At ¶ added in v0.5.0
func (m *GenericMessage) At() time.Time
At returns the moment the message was created.
func (*GenericMessage) ID ¶ added in v0.5.0
func (m *GenericMessage) ID() string
ID returns the unique identifier of the message.
func (*GenericMessage) Metadata ¶
func (m *GenericMessage) Metadata() Metadata
Metadata returns the message metadata.
func (*GenericMessage) Payload ¶
func (m *GenericMessage) Payload() []byte
Payload returns the message payload.
func (*GenericMessage) Published ¶ added in v0.7.0
func (m *GenericMessage) Published() bool
Published reports whether the message has been published to the broker.
func (*GenericMessage) SetMetadata ¶ added in v0.5.0
func (m *GenericMessage) SetMetadata(key, value string) *GenericMessage
SetMetadata sets the given key-value pair to the message metadata. If the key already exists, it replaces the value.
type Message ¶
type Message interface {
	ID() string
	Metadata() Metadata
	Payload() []byte
	Published() bool
	At() time.Time
}
A Message represents a message to be sent to a message queue.
type Messenger ¶ added in v0.5.0
type Messenger struct {
// contains filtered or unexported fields
}
Messenger is responsible for publishing messages from the datastore to the publisher, and for cleaning up already published messages.
func NewMessenger ¶ added in v0.5.0
NewMessenger returns a `Messenger` instance with defaults.
- Publish batch size: 100
- Publish period: 1s
- Golang standard error logger.
func (*Messenger) Clean ¶ added in v0.5.0
Clean runs the message cleaning process once, given a message expiration time.
type Metadata ¶ added in v0.8.0
Metadata defines key-value fields sent with every message to add more context without the need to decode the payload.
func (Metadata) Get ¶ added in v0.8.0
Get returns the metadata value for the given key. If the key is not found, an empty string is returned.
func (*Metadata) Scan ¶ added in v0.8.0
Scan implements the sql.Scanner interface. This method simply decodes a JSON-encoded value into the struct fields.
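The Get and Scan behavior described above can be sketched as follows. This is an illustration under assumptions, not the package's implementation: the underlying type of Metadata is not shown here, so the hypothetical `metadata` type below assumes a `map[string]string`, and the set of accepted source types is also an assumption.

```go
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
)

// metadata is a hypothetical stand-in for Metadata, assumed to be a string map.
type metadata map[string]string

// Compile-time check that *metadata satisfies sql.Scanner.
var _ sql.Scanner = (*metadata)(nil)

// get returns the value for key, or "" when the key is absent.
func (m metadata) get(key string) string { return m[key] }

// Scan decodes a JSON-encoded database value into the map.
func (m *metadata) Scan(src any) error {
	var raw []byte
	switch v := src.(type) {
	case nil:
		*m = metadata{}
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("unsupported metadata source type %T", src)
	}
	return json.Unmarshal(raw, m)
}

func main() {
	var md metadata
	if err := md.Scan([]byte(`{"event":"created"}`)); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(md.get("event"))       // prints "created"
	fmt.Printf("%q\n", md.get("nope")) // prints ""
}
```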
type Option ¶ added in v0.4.0
type Option func(*Messenger)
Option defines the optional parameters for messenger.
func WithCleanUp ¶ added in v0.4.0
WithCleanUp enables the cleanup process and sets an expiration time for messages.
func WithErrorHandler ¶ added in v0.8.0
func WithErrorHandler(l ErrorHandler) Option
WithErrorHandler replaces the default error logger.
func WithInterval ¶ added in v0.5.0
WithInterval replaces the default interval duration.
func WithPublishBatchSize ¶ added in v0.4.0
WithPublishBatchSize replaces the default publish batch size.
type Publisher ¶ added in v0.5.0
type Publisher interface {
	// Publish sends the message to the broker.
	Publish(ctx context.Context, msg Message) error
}
Publisher is the interface that wraps the basic message publishing.
type Store ¶ added in v0.4.0
type Store interface {
	// Messages lists unpublished messages, up to the given batch size.
	Messages(ctx context.Context, batch int) ([]Message, error)
	// Published marks the given message as published.
	Published(ctx context.Context, msg Message) error
	// DeletePublishedByExpiration deletes from the datastore the messages that are marked as published and older than the expiration period.
	DeletePublishedByExpiration(ctx context.Context, exp time.Duration) error
}
Store is the interface that wraps the message retrieval and update methods.
type Subscription ¶ added in v0.8.0
Subscription defines the basic methods for a subscription broker.
func NewSubscription ¶ added in v0.8.0
func NewSubscription(name string, h SubscriptionHandler) Subscription
NewSubscription returns a Subscription handler.