Documentation ¶
Overview ¶
Package messaging provides utilities for working with messaging systems: sending and receiving messages and managing message queues. It supports various messaging protocols, including AMQP and MQTT, behind a simple and consistent API. The `Producer` interface is used for sending messages, while the `Receiver` interface is used for receiving them. `Producer` declares `Send` and `SendBatch`; `Receiver` declares `Receive`, `ReceiveBatch`, and `AddListener`. The `Provider` interface combines both.
Note: This package requires a messaging server to be running in order to send/receive messages. Please refer to the documentation of the specific messaging protocol for more information on how to set up a server.
Index ¶
- Constants
- type BaseMessage
- func (bm *BaseMessage) GetBoolHeader(key string) (value bool, exists bool)
- func (bm *BaseMessage) GetFloat64Header(key string) (value float64, exists bool)
- func (bm *BaseMessage) GetFloatHeader(key string) (value float32, exists bool)
- func (bm *BaseMessage) GetHeader(key string) (value []byte, exists bool)
- func (bm *BaseMessage) GetInt16Header(key string) (value int16, exists bool)
- func (bm *BaseMessage) GetInt32Header(key string) (value int32, exists bool)
- func (bm *BaseMessage) GetInt64Header(key string) (value int64, exists bool)
- func (bm *BaseMessage) GetInt8Header(key string) (value int8, exists bool)
- func (bm *BaseMessage) GetIntHeader(key string) (value int, exists bool)
- func (bm *BaseMessage) GetStrHeader(key string) (value string, exists bool)
- func (bm *BaseMessage) ReadAsStr() string
- func (bm *BaseMessage) ReadBody() io.Reader
- func (bm *BaseMessage) ReadBytes() []byte
- func (bm *BaseMessage) ReadContent(out interface{}, contentType string) (err error)
- func (bm *BaseMessage) ReadJSON(out interface{}) (err error)
- func (bm *BaseMessage) ReadXML(out interface{}) (err error)
- func (bm *BaseMessage) SetBodyBytes(input []byte) (n int, err error)
- func (bm *BaseMessage) SetBodyStr(input string) (n int, err error)
- func (bm *BaseMessage) SetBoolHeader(key string, value bool)
- func (bm *BaseMessage) SetFloat64Header(key string, value float64)
- func (bm *BaseMessage) SetFloatHeader(key string, value float32)
- func (bm *BaseMessage) SetFrom(content io.Reader) (n int64, err error)
- func (bm *BaseMessage) SetHeader(key string, value []byte)
- func (bm *BaseMessage) SetInt16Header(key string, value int16)
- func (bm *BaseMessage) SetInt32Header(key string, value int32)
- func (bm *BaseMessage) SetInt64Header(key string, value int64)
- func (bm *BaseMessage) SetInt8Header(key string, value int8)
- func (bm *BaseMessage) SetIntHeader(key string, value int)
- func (bm *BaseMessage) SetStrHeader(key string, value string)
- func (bm *BaseMessage) WriteContent(input interface{}, contentType string) (err error)
- func (bm *BaseMessage) WriteJSON(input interface{}) (err error)
- func (bm *BaseMessage) WriteXML(input interface{}) (err error)
- type Body
- type Header
- type LocalMessage
- type LocalProvider
- func (lp *LocalProvider) AddListener(url *url.URL, listener func(msg Message), options ...Option) (err error)
- func (lp *LocalProvider) NewMessage(scheme string, options ...Option) (msg Message, err error)
- func (lp *LocalProvider) Receive(url *url.URL, options ...Option) (msg Message, err error)
- func (lp *LocalProvider) ReceiveBatch(url *url.URL, options ...Option) (msgs []Message, err error)
- func (lp *LocalProvider) Schemes() (schemes []string)
- func (lp *LocalProvider) Send(url *url.URL, msg Message, options ...Option) (err error)
- func (lp *LocalProvider) SendBatch(url *url.URL, msgs []Message, options ...Option) (err error)
- func (lp *LocalProvider) Setup()
- type Manager
- func (m *Manager) AddListener(u *url.URL, listener func(msg Message), options ...Option) (err error)
- func (m *Manager) NewMessage(scheme string, options ...Option) (msg Message, err error)
- func (m *Manager) Receive(u *url.URL, options ...Option) (msg Message, err error)
- func (m *Manager) ReceiveBatch(u *url.URL, options ...Option) (msgs []Message, err error)
- func (m *Manager) Register(provider Provider)
- func (m *Manager) Schemes() (schemes []string)
- func (m *Manager) Send(u *url.URL, msg Message, options ...Option) (err error)
- func (m *Manager) SendBatch(u *url.URL, msgs []Message, options ...Option) (err error)
- func (m *Manager) Setup()
- type Message
- type Messaging
- type Option
- type OptionsBuilder
- func (ob *OptionsBuilder) Add(key string, value interface{}) *OptionsBuilder
- func (ob *OptionsBuilder) AddCircuitBreaker(failureThreshold, successThreshold uint64, maxHalfOpen, timeout uint32) *OptionsBuilder
- func (ob *OptionsBuilder) AddRetryHandler(maxRetries, wait int) *OptionsBuilder
- func (ob *OptionsBuilder) Build() []Option
- type OptionsResolver
- type Producer
- type Provider
- type Receiver
Constants ¶
const (
	CircuitBreakerOpts = "CircuitBreakerOption"
	RetryOpts          = "CircuitBreakerOption"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseMessage ¶
type BaseMessage struct {
// contains filtered or unexported fields
}
func (*BaseMessage) GetBoolHeader ¶
func (bm *BaseMessage) GetBoolHeader(key string) (value bool, exists bool)
func (*BaseMessage) GetFloat64Header ¶
func (bm *BaseMessage) GetFloat64Header(key string) (value float64, exists bool)
func (*BaseMessage) GetFloatHeader ¶
func (bm *BaseMessage) GetFloatHeader(key string) (value float32, exists bool)
func (*BaseMessage) GetHeader ¶
func (bm *BaseMessage) GetHeader(key string) (value []byte, exists bool)
func (*BaseMessage) GetInt16Header ¶
func (bm *BaseMessage) GetInt16Header(key string) (value int16, exists bool)
func (*BaseMessage) GetInt32Header ¶
func (bm *BaseMessage) GetInt32Header(key string) (value int32, exists bool)
func (*BaseMessage) GetInt64Header ¶
func (bm *BaseMessage) GetInt64Header(key string) (value int64, exists bool)
func (*BaseMessage) GetInt8Header ¶
func (bm *BaseMessage) GetInt8Header(key string) (value int8, exists bool)
func (*BaseMessage) GetIntHeader ¶
func (bm *BaseMessage) GetIntHeader(key string) (value int, exists bool)
func (*BaseMessage) GetStrHeader ¶
func (bm *BaseMessage) GetStrHeader(key string) (value string, exists bool)
func (*BaseMessage) ReadAsStr ¶
func (bm *BaseMessage) ReadAsStr() string
func (*BaseMessage) ReadBody ¶
func (bm *BaseMessage) ReadBody() io.Reader
func (*BaseMessage) ReadBytes ¶
func (bm *BaseMessage) ReadBytes() []byte
func (*BaseMessage) ReadContent ¶
func (bm *BaseMessage) ReadContent(out interface{}, contentType string) (err error)
func (*BaseMessage) ReadJSON ¶
func (bm *BaseMessage) ReadJSON(out interface{}) (err error)
func (*BaseMessage) ReadXML ¶
func (bm *BaseMessage) ReadXML(out interface{}) (err error)
func (*BaseMessage) SetBodyBytes ¶
func (bm *BaseMessage) SetBodyBytes(input []byte) (n int, err error)
func (*BaseMessage) SetBodyStr ¶
func (bm *BaseMessage) SetBodyStr(input string) (n int, err error)
func (*BaseMessage) SetBoolHeader ¶
func (bm *BaseMessage) SetBoolHeader(key string, value bool)
func (*BaseMessage) SetFloat64Header ¶
func (bm *BaseMessage) SetFloat64Header(key string, value float64)
func (*BaseMessage) SetFloatHeader ¶
func (bm *BaseMessage) SetFloatHeader(key string, value float32)
func (*BaseMessage) SetFrom ¶
func (bm *BaseMessage) SetFrom(content io.Reader) (n int64, err error)
func (*BaseMessage) SetHeader ¶
func (bm *BaseMessage) SetHeader(key string, value []byte)
func (*BaseMessage) SetInt16Header ¶
func (bm *BaseMessage) SetInt16Header(key string, value int16)
func (*BaseMessage) SetInt32Header ¶
func (bm *BaseMessage) SetInt32Header(key string, value int32)
func (*BaseMessage) SetInt64Header ¶
func (bm *BaseMessage) SetInt64Header(key string, value int64)
func (*BaseMessage) SetInt8Header ¶
func (bm *BaseMessage) SetInt8Header(key string, value int8)
func (*BaseMessage) SetIntHeader ¶
func (bm *BaseMessage) SetIntHeader(key string, value int)
func (*BaseMessage) SetStrHeader ¶
func (bm *BaseMessage) SetStrHeader(key string, value string)
func (*BaseMessage) WriteContent ¶
func (bm *BaseMessage) WriteContent(input interface{}, contentType string) (err error)
func (*BaseMessage) WriteJSON ¶
func (bm *BaseMessage) WriteJSON(input interface{}) (err error)
func (*BaseMessage) WriteXML ¶
func (bm *BaseMessage) WriteXML(input interface{}) (err error)
type Body ¶
type Body interface {
	// SetBodyStr sets the string body to the Message structure
	SetBodyStr(in string) (int, error)
	// SetBodyBytes sets the byte[] body to the Message structure
	SetBodyBytes(int []byte) (int, error)
	// SetFrom sets the Reader body to the Message structure
	SetFrom(content io.Reader) (int64, error)
	// WriteJSON sets the JSON body to the Message structure
	WriteJSON(int interface{}) error
	// WriteXML sets the XML body to the Message structure
	WriteXML(in interface{}) error
	// WriteContent sets the custom body type based on the contentType to the Message structure
	WriteContent(in interface{}, contentType string) error
	// ReadBody reads the Reader body from the Message structure
	ReadBody() io.Reader
	// ReadBytes reads the []byte body from the Message structure
	ReadBytes() []byte
	// ReadAsStr reads the string body from the Message structure
	ReadAsStr() string
	// ReadJSON reads the JSON body from the Message structure
	ReadJSON(out interface{}) error
	// ReadXML reads the XML body from the Message structure
	ReadXML(out interface{}) error
	// ReadContent reads the content body based on the contentType from the Message structure
	ReadContent(out interface{}, contentType string) error
}
Body defines the methods a message must provide for writing and reading its body.
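The WriteJSON and ReadJSON pair suggests a round trip through the message body. The following is a minimal sketch of that idea, assuming the body is held in an in-memory buffer. The `memBody` and `order` types are hypothetical stand-ins written for this example; they are not the package's `BaseMessage`, whose internals are not shown here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// memBody is a hypothetical stand-in for a message body. It only illustrates
// the WriteJSON/ReadJSON/ReadAsStr trio from the Body interface.
type memBody struct {
	buf bytes.Buffer
}

// WriteJSON replaces the body with the JSON encoding of in.
func (b *memBody) WriteJSON(in interface{}) error {
	data, err := json.Marshal(in)
	if err != nil {
		return err
	}
	b.buf.Reset()
	_, err = b.buf.Write(data)
	return err
}

// ReadJSON decodes the stored body into out without consuming it.
func (b *memBody) ReadJSON(out interface{}) error {
	return json.Unmarshal(b.buf.Bytes(), out)
}

// ReadAsStr returns the stored body as a string.
func (b *memBody) ReadAsStr() string {
	return b.buf.String()
}

// order is an example payload type, made up for this sketch.
type order struct {
	ID  int    `json:"id"`
	SKU string `json:"sku"`
}

func main() {
	var body memBody
	if err := body.WriteJSON(order{ID: 7, SKU: "A-1"}); err != nil {
		panic(err)
	}

	var got order
	if err := body.ReadJSON(&got); err != nil {
		panic(err)
	}
	fmt.Println(body.ReadAsStr(), got.ID, got.SKU)
}
```

Whether the real implementation lets the body be read more than once is not stated in this documentation; the sketch assumes it does.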
type Header ¶
type Header interface {
	// SetHeader sets the byte header value for the Message header
	SetHeader(key string, value []byte)
	// SetStrHeader sets the string header value for the Message header
	SetStrHeader(key string, value string)
	// SetBoolHeader sets the boolean header value for the Message header
	SetBoolHeader(key string, value bool)
	// SetIntHeader sets the int header value for the Message header
	SetIntHeader(key string, value int)
	// SetInt8Header sets the int8 header value for the Message header
	SetInt8Header(key string, value int8)
	// SetInt16Header sets the int16 header value for the Message header
	SetInt16Header(key string, value int16)
	// SetInt32Header sets the int32 header value for the Message header
	SetInt32Header(key string, value int32)
	// SetInt64Header sets the int64 header value for the Message header
	SetInt64Header(key string, value int64)
	// SetFloatHeader sets the float32 header value for the Message header
	SetFloatHeader(key string, value float32)
	// SetFloat64Header sets the float64 header value for the Message header
	SetFloat64Header(key string, value float64)
	// GetHeader returns the value of the key set in the headers if exists in the byte[] value
	GetHeader(key string) (value []byte, exists bool)
	// GetStrHeader returns the value of the key set in the headers if exists in the string value
	GetStrHeader(key string) (value string, exists bool)
	// GetBoolHeader returns the value of the key set in the headers if exists in the bool value
	GetBoolHeader(key string) (value bool, exists bool)
	// GetIntHeader returns the value of the key set in the headers if exists in the int value
	GetIntHeader(key string) (value int, exists bool)
	// GetInt8Header returns the value of the key set in the headers if exists in the int8 value
	GetInt8Header(key string) (value int8, exists bool)
	// GetInt16Header returns the value of the key set in the headers if exists in the int16 value
	GetInt16Header(key string) (value int16, exists bool)
	// GetInt32Header returns the value of the key set in the headers if exists in the int32 value
	GetInt32Header(key string) (value int32, exists bool)
	// GetInt64Header returns the value of the key set in the headers if exists in the int64 value
	GetInt64Header(key string) (value int64, exists bool)
	// GetFloatHeader returns the value of the key set in the headers if exists in the float32 value
	GetFloatHeader(key string) (value float32, exists bool)
	// GetFloat64Header returns the value of the key set in the headers if exists in the float64 value
	GetFloat64Header(key string) (value float64, exists bool)
}
Header defines the methods a message must provide for setting and reading typed headers.
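The typed setters and getters can be read as thin wrappers over the raw `[]byte` pair. The sketch below illustrates that layering under stated assumptions: the `headers` map type is hypothetical, and the 8-byte big-endian encoding for int64 values is a choice made for this example, since the documentation does not say how the package encodes typed headers.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// headers is a hypothetical header store. Raw values are byte slices,
// mirroring the SetHeader/GetHeader pair in the Header interface.
type headers map[string][]byte

func (h headers) SetHeader(key string, value []byte) { h[key] = value }

func (h headers) GetHeader(key string) (value []byte, exists bool) {
	value, exists = h[key]
	return
}

// SetStrHeader stores the string as its raw bytes.
func (h headers) SetStrHeader(key, value string) { h.SetHeader(key, []byte(value)) }

// GetStrHeader returns the stored bytes as a string.
func (h headers) GetStrHeader(key string) (string, bool) {
	v, ok := h.GetHeader(key)
	return string(v), ok
}

// SetInt64Header stores the value as 8 big-endian bytes (an assumed encoding).
func (h headers) SetInt64Header(key string, value int64) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(value))
	h.SetHeader(key, buf)
}

// GetInt64Header reports exists=false when the key is absent or the stored
// value is not exactly 8 bytes long.
func (h headers) GetInt64Header(key string) (int64, bool) {
	v, ok := h.GetHeader(key)
	if !ok || len(v) != 8 {
		return 0, false
	}
	return int64(binary.BigEndian.Uint64(v)), true
}

func main() {
	h := headers{}
	h.SetStrHeader("content-type", "application/json")
	h.SetInt64Header("attempt", 3)

	ct, _ := h.GetStrHeader("content-type")
	n, ok := h.GetInt64Header("attempt")
	fmt.Println(ct, n, ok)
}
```

Returning `exists` as a second value lets callers tell a missing header apart from a zero value, which is the same convention the interface signatures use.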
type LocalMessage ¶
type LocalMessage struct {
*BaseMessage
}
func NewLocalMessage ¶
func NewLocalMessage() *LocalMessage
type LocalProvider ¶
type LocalProvider struct {
// contains filtered or unexported fields
}
LocalProvider is an implementation of the Provider interface
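func (*LocalProvider) AddListener ¶
func (lp *LocalProvider) AddListener(url *url.URL, listener func(msg Message), options ...Option) (err error)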
func (*LocalProvider) NewMessage ¶
func (lp *LocalProvider) NewMessage(scheme string, options ...Option) (msg Message, err error)
func (*LocalProvider) ReceiveBatch ¶
func (lp *LocalProvider) ReceiveBatch(url *url.URL, options ...Option) (msgs []Message, err error)
func (*LocalProvider) Schemes ¶
func (lp *LocalProvider) Schemes() (schemes []string)
func (*LocalProvider) Setup ¶
func (lp *LocalProvider) Setup()
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the known messaging providers. It includes a mutex to guard concurrent access to them.
func (*Manager) AddListener ¶
func (m *Manager) AddListener(u *url.URL, listener func(msg Message), options ...Option) (err error)
AddListener registers a listener for the message using the appropriate provider
func (*Manager) NewMessage ¶
func (m *Manager) NewMessage(scheme string, options ...Option) (msg Message, err error)
NewMessage creates a new message using the appropriate provider
func (*Manager) ReceiveBatch ¶
func (m *Manager) ReceiveBatch(u *url.URL, options ...Option) (msgs []Message, err error)
ReceiveBatch receives a batch of messages using the appropriate provider
func (*Manager) Send ¶
func (m *Manager) Send(u *url.URL, msg Message, options ...Option) (err error)
Send is a helper function that sends a message using the appropriate provider
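The Manager entries say each call uses "the appropriate provider" without saying how it is chosen. Because providers report `Schemes()` and every call takes a `*url.URL`, one plausible reading is a lookup by URL scheme. The sketch below shows that pattern with a mutex-guarded map. All names in it (`sender`, `registry`, `chanSender`, the `chan` scheme, `mustParse`) are hypothetical and written for this example; the real Manager may work differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"sync"
)

// sender is a hypothetical, trimmed-down provider: it reports the schemes it
// serves and sends a string payload.
type sender interface {
	Schemes() []string
	Send(u *url.URL, payload string) error
}

// registry maps URL schemes to providers and guards the map with a mutex.
type registry struct {
	mu        sync.RWMutex
	providers map[string]sender
}

// Register makes the provider reachable under each scheme it reports.
func (r *registry) Register(p sender) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.providers == nil {
		r.providers = make(map[string]sender)
	}
	for _, s := range p.Schemes() {
		r.providers[s] = p
	}
}

var errNoProvider = errors.New("no provider registered for scheme")

// Send looks up the provider by the URL scheme and delegates to it.
func (r *registry) Send(u *url.URL, payload string) error {
	r.mu.RLock()
	p, ok := r.providers[u.Scheme]
	r.mu.RUnlock()
	if !ok {
		return errNoProvider
	}
	return p.Send(u, payload)
}

// chanSender is an in-memory provider for the made-up "chan" scheme.
type chanSender struct {
	sent []string
}

func (c *chanSender) Schemes() []string { return []string{"chan"} }

func (c *chanSender) Send(u *url.URL, payload string) error {
	c.sent = append(c.sent, u.Host+":"+payload)
	return nil
}

// mustParse parses a URL and panics on malformed input.
func mustParse(raw string) *url.URL {
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	return u
}

func main() {
	var r registry
	local := &chanSender{}
	r.Register(local)

	if err := r.Send(mustParse("chan://orders"), "hello"); err != nil {
		panic(err)
	}
	fmt.Println(local.sent)

	// No provider serves the "kafka" scheme in this sketch.
	fmt.Println(r.Send(mustParse("kafka://orders"), "hello"))
}
```

The read lock is released before delegating so that a slow provider does not block concurrent `Register` calls.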
type Message ¶
type Message interface {
	Header
	Body
	// Rsvp function provides a facade to acknowledge the message to the provider indicating the acceptance or rejection
	// as mentioned by the first bool parameter.
	// Additional options can be set for indicating further actions.
	// This functionality is purely dependent on the capability of the provider to accept an acknowledgement.
	Rsvp(bool, ...Option) error
}
Message interface will be implemented by all third-party implementations, such as AWS (SNS, SQS), GCP (Pub/Sub, GCM), and messaging systems (AMQP, Kafka).
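A common way to use an acknowledgement method like `Rsvp` is to accept the message after successful processing and reject it otherwise. The sketch below illustrates that flow. The `acker` interface is a trimmed-down, hypothetical slice of `Message` (the real `Rsvp` also takes variadic options), and `fakeMessage`, `handle`, and `validate` are names invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is the part of the Message interface this sketch needs. The real
// Rsvp also accepts variadic options, which are omitted here.
type acker interface {
	ReadAsStr() string
	Rsvp(accept bool) error
}

// fakeMessage records the acknowledgement it was given.
type fakeMessage struct {
	body     string
	acked    bool
	accepted bool
}

func (m *fakeMessage) ReadAsStr() string { return m.body }

// Rsvp stores the decision and refuses a second acknowledgement.
func (m *fakeMessage) Rsvp(accept bool) error {
	if m.acked {
		return errors.New("message already acknowledged")
	}
	m.acked, m.accepted = true, accept
	return nil
}

var errBadPayload = errors.New("bad payload")

// validate is a placeholder processing step: empty payloads are rejected.
func validate(payload string) error {
	if payload == "" {
		return errBadPayload
	}
	return nil
}

// handle runs process and acknowledges the message according to the outcome.
func handle(msg acker, process func(string) error) error {
	if err := process(msg.ReadAsStr()); err != nil {
		// What a provider does with a rejection is provider-specific.
		if rsvpErr := msg.Rsvp(false); rsvpErr != nil {
			return rsvpErr
		}
		return err
	}
	return msg.Rsvp(true)
}

func main() {
	good := &fakeMessage{body: "order-1"}
	fmt.Println(handle(good, validate), good.accepted)

	bad := &fakeMessage{}
	fmt.Println(handle(bad, validate), bad.accepted)
}
```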
type Messaging ¶
Messaging interface defines an abstraction for messaging providers that can be registered
type OptionsBuilder ¶
type OptionsBuilder struct {
// contains filtered or unexported fields
}
func NewOptionsBuilder ¶
func NewOptionsBuilder() *OptionsBuilder
TODO check if we can pool this for performance
func (*OptionsBuilder) Add ¶
func (ob *OptionsBuilder) Add(key string, value interface{}) *OptionsBuilder
TODO check if you need to pool this for performance
func (*OptionsBuilder) AddCircuitBreaker ¶
func (ob *OptionsBuilder) AddCircuitBreaker(failureThreshold, successThreshold uint64, maxHalfOpen, timeout uint32) *OptionsBuilder
func (*OptionsBuilder) AddRetryHandler ¶
func (ob *OptionsBuilder) AddRetryHandler(maxRetries, wait int) *OptionsBuilder
func (*OptionsBuilder) Build ¶
func (ob *OptionsBuilder) Build() []Option
type OptionsResolver ¶
type OptionsResolver struct {
// contains filtered or unexported fields
}
func NewOptionsResolver ¶
func NewOptionsResolver(options ...Option) (optsResolver *OptionsResolver)
func (*OptionsResolver) Get ¶
func (or *OptionsResolver) Get(key string) (value interface{}, has bool)
func (*OptionsResolver) GetCircuitBreaker ¶
func (or *OptionsResolver) GetCircuitBreaker() (breakerInfo *clients.BreakerInfo, has bool)
func (*OptionsResolver) GetRetryInfo ¶
func (or *OptionsResolver) GetRetryInfo() (retryInfo *clients.RetryInfo, has bool)
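OptionsBuilder and OptionsResolver look like two halves of one pattern: a fluent builder produces a slice of options, and a resolver on the receiving side looks them up by key. The sketch below illustrates that pattern only. The `option` struct is hypothetical, since the shape of the package's `Option` type is not shown in this documentation, and the "later option wins" rule and the `priority` key are assumptions made for the example.

```go
package main

import "fmt"

// option is a hypothetical key/value pair. The package's Option type is not
// shown in this documentation, so its real shape may differ.
type option struct {
	key   string
	value interface{}
}

// builder accumulates options through chained calls.
type builder struct {
	opts []option
}

func newBuilder() *builder { return &builder{} }

// Add appends a key/value pair and returns the builder for chaining.
func (b *builder) Add(key string, value interface{}) *builder {
	b.opts = append(b.opts, option{key, value})
	return b
}

// Build returns a copy so later Add calls do not alter the built slice.
func (b *builder) Build() []option {
	out := make([]option, len(b.opts))
	copy(out, b.opts)
	return out
}

// resolver indexes options by key. A later option with the same key wins.
type resolver struct {
	byKey map[string]interface{}
}

func newResolver(opts ...option) *resolver {
	r := &resolver{byKey: make(map[string]interface{}, len(opts))}
	for _, o := range opts {
		r.byKey[o.key] = o.value
	}
	return r
}

// Get returns the value stored under key and whether it was present.
func (r *resolver) Get(key string) (value interface{}, has bool) {
	value, has = r.byKey[key]
	return
}

// send is a placeholder for any call that accepts variadic options.
func send(payload string, opts ...option) string {
	r := newResolver(opts...)
	if prio, ok := r.Get("priority"); ok {
		return fmt.Sprintf("%s (priority=%v)", payload, prio)
	}
	return payload
}

func main() {
	opts := newBuilder().Add("priority", 5).Add("trace", true).Build()
	fmt.Println(send("hello", opts...))
}
```

Passing options as a variadic slice keeps the method signatures stable while letting individual providers read only the keys they understand.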
type Producer ¶
type Producer interface {
	// Send function sends an individual message to the url
	Send(*url.URL, Message, ...Option) error
	// SendBatch sends a batch of messages to the url
	SendBatch(*url.URL, []Message, ...Option) error
}
Producer interface is used to send message(s) to a specific provider
type Provider ¶
type Provider interface {
	// Producer Interface included
	Producer
	// Receiver interface included
	Receiver
	// Schemes is array of URL schemes supported by this provider
	Schemes() []string
	// Setup method called
	Setup()
	// NewMessage function creates a new message that can be used by the clients. It expects the scheme to be provided
	NewMessage(string, ...Option) (Message, error)
}
Provider interface exposes methods for a messaging provider. It includes the Producer and Receiver interfaces, the Schemes method to get the supported schemes, the Setup method to perform initial setup, and the NewMessage method to create a new message.
type Receiver ¶
type Receiver interface {
	// Receive function performs on-demand receive of a single message.
	// This function may or may not wait for the messages to arrive. This is purely dependent on the implementation.
	Receive(*url.URL, ...Option) (Message, error)
	// ReceiveBatch function performs on-demand receive of a batch of messages.
	// This function may or may not wait for the messages to arrive. This is purely dependent on the implementation.
	ReceiveBatch(*url.URL, ...Option) ([]Message, error)
	// AddListener registers a listener for the message
	AddListener(*url.URL, func(msg Message), ...Option) error
}
Receiver interface provides the functions for receiving one or more messages.
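The Receiver interface offers two styles of consumption: on-demand calls (Receive, ReceiveBatch) and push delivery through AddListener. The sketch below contrasts the two with a hypothetical in-memory `queue` keyed by URL host. It uses plain string payloads instead of `Message`, never blocks in `Receive`, and hands a message to listeners instead of storing it when any are registered. All of these are choices made for the example, not documented behaviour of `LocalProvider`.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"sync"
)

// queue is a hypothetical in-memory provider keyed by URL host.
type queue struct {
	mu        sync.Mutex
	pending   map[string][]string
	listeners map[string][]func(string)
}

func newQueue() *queue {
	return &queue{
		pending:   make(map[string][]string),
		listeners: make(map[string][]func(string)),
	}
}

var errEmpty = errors.New("no message available")

// AddListener registers a callback for messages sent to u.
func (q *queue) AddListener(u *url.URL, fn func(string)) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.listeners[u.Host] = append(q.listeners[u.Host], fn)
}

// Send delivers to listeners when any exist, otherwise stores the payload.
func (q *queue) Send(u *url.URL, payload string) {
	q.mu.Lock()
	fns := q.listeners[u.Host]
	if len(fns) == 0 {
		q.pending[u.Host] = append(q.pending[u.Host], payload)
	}
	q.mu.Unlock()

	// Call listeners outside the lock so they may call back into the queue.
	for _, fn := range fns {
		fn(payload)
	}
}

// Receive returns the oldest pending payload without blocking.
func (q *queue) Receive(u *url.URL) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	msgs := q.pending[u.Host]
	if len(msgs) == 0 {
		return "", errEmpty
	}
	q.pending[u.Host] = msgs[1:]
	return msgs[0], nil
}

// mustParse parses a URL and panics on malformed input.
func mustParse(raw string) *url.URL {
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	return u
}

func main() {
	q := newQueue()
	u := mustParse("chan://orders")

	// Pull style: the message waits until Receive is called.
	q.Send(u, "first")
	msg, err := q.Receive(u)
	fmt.Println(msg, err)

	// Push style: the listener is invoked as soon as a message is sent.
	q.AddListener(u, func(payload string) { fmt.Println("listener got", payload) })
	q.Send(u, "second")
}
```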