messaging

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2024 License: MIT Imports: 9 Imported by: 0

README

MESSAGING Client

This is a flexible and extensible messaging client designed to provide a unified interface for producing and consuming messages across various messaging platforms. It allows developers to seamlessly integrate messaging functionality into their applications without being tied to a specific messaging service.



Features

  • General producer interface for sending messages to different messaging platforms.
  • General consumer interface for receiving and processing messages from different messaging platforms.
  • Supports QualityOfService features such as
    • CircuitBreaker
    • Retry
  • Easy-to-use message abstraction for consistent handling of messages across platforms.
  • Can be extended to multiple messaging platforms with easily pluggable interfaces, including:
    • AMQP (Advanced Message Queuing Protocol)
    • Apache Kafka
    • AWS SNS (Simple Notification Service)
    • AWS SQS (Simple Queue Service)
    • GCM (Google Cloud Messaging)
    • GCP Pub/Sub (Google Cloud Pub/Sub)

Installation

To install the messaging client, use the following command:

go get oss.nandlabs.io/golly/clients/messaging

Usage

  1. Import the library into your Go project:
    import "oss.nandlabs.io/golly/clients/messaging"
    
  2. Initialize the messaging provider for a specific platform. For example, to use the AMQP extension:
    type AMQPProvider struct {} // implements the Provider interface defined under the library
    amqpProvider := &AMQPProvider{}
    
    manager := messaging.Get()
    manager.Register(amqpProvider)
    
  3. Send a message
    message := &messaging.Message{
      Body: []byte("Hello, World!"), 
      /// Add any additional properties or metadata
    }
    destination := url.Parse("amqp://guest:password@localhost:5672/myvhost")
    err := manager.Send(destination, message)
    if err != nil {
      // Handle error
    }
    
  4. Receive a message
    // Define the onReceive function
    onReceive := func(msg Message) error {
        // Process the message
        // ...
    
     return nil
    }
    // Start receiving messages from the channel
    manager.Receive(receiverUrl, onReceive)
    
  5. Repeat steps 2-4 for other messaging platforms by initializing the respective clients.

Extending the library

To add support for additional messaging platforms, you can create new extensions by implementing the producer, consumer, and message interfaces defined in the library. These interfaces provide a consistent way to interact with different messaging systems.

You can refer to the existing extensions, such as amqp, kafka, etc., as examples for creating your own extensions. Ensure that your extension adheres to the interface definitions and follows the library's conventions for consistency.

Documentation

Overview

Package messaging provides a set of utilities for working with messaging systems. It includes functionality for sending and receiving messages, as well as managing message queues. This package supports various messaging protocols, including AMQP and MQTT. It provides a simple and consistent API for interacting with different messaging systems. The `Sender` type is used for sending messages, while the `Receiver` type is used for receiving messages. Both types provide methods for connecting to a messaging server, sending/receiving messages, and closing the connection.

Note: This package requires a messaging server to be running in order to send/receive messages. Please refer to the documentation of the specific messaging protocol for more information on how to set up a server.

Index

Constants

View Source
const (
	CircuitBreakerOpts = "CircuitBreakerOption"
	RetryOpts          = "CircuitBreakerOption"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseMessage

type BaseMessage struct {
	// contains filtered or unexported fields
}

func (*BaseMessage) GetBoolHeader

func (bm *BaseMessage) GetBoolHeader(key string) (value bool, exists bool)

func (*BaseMessage) GetFloat64Header

func (bm *BaseMessage) GetFloat64Header(key string) (value float64, exists bool)

func (*BaseMessage) GetFloatHeader

func (bm *BaseMessage) GetFloatHeader(key string) (value float32, exists bool)

func (*BaseMessage) GetHeader

func (bm *BaseMessage) GetHeader(key string) (value []byte, exists bool)

func (*BaseMessage) GetInt16Header

func (bm *BaseMessage) GetInt16Header(key string) (value int16, exists bool)

func (*BaseMessage) GetInt32Header

func (bm *BaseMessage) GetInt32Header(key string) (value int32, exists bool)

func (*BaseMessage) GetInt64Header

func (bm *BaseMessage) GetInt64Header(key string) (value int64, exists bool)

func (*BaseMessage) GetInt8Header

func (bm *BaseMessage) GetInt8Header(key string) (value int8, exists bool)

func (*BaseMessage) GetIntHeader

func (bm *BaseMessage) GetIntHeader(key string) (value int, exists bool)

func (*BaseMessage) GetStrHeader

func (bm *BaseMessage) GetStrHeader(key string) (value string, exists bool)

func (*BaseMessage) ReadAsStr

func (bm *BaseMessage) ReadAsStr() string

func (*BaseMessage) ReadBody

func (bm *BaseMessage) ReadBody() io.Reader

func (*BaseMessage) ReadBytes

func (bm *BaseMessage) ReadBytes() []byte

func (*BaseMessage) ReadContent

func (bm *BaseMessage) ReadContent(out interface{}, contentType string) (err error)

func (*BaseMessage) ReadJSON

func (bm *BaseMessage) ReadJSON(out interface{}) (err error)

func (*BaseMessage) ReadXML

func (bm *BaseMessage) ReadXML(out interface{}) (err error)

func (*BaseMessage) SetBodyBytes

func (bm *BaseMessage) SetBodyBytes(input []byte) (n int, err error)

func (*BaseMessage) SetBodyStr

func (bm *BaseMessage) SetBodyStr(input string) (n int, err error)

func (*BaseMessage) SetBoolHeader

func (bm *BaseMessage) SetBoolHeader(key string, value bool)

func (*BaseMessage) SetFloat64Header

func (bm *BaseMessage) SetFloat64Header(key string, value float64)

func (*BaseMessage) SetFloatHeader

func (bm *BaseMessage) SetFloatHeader(key string, value float32)

func (*BaseMessage) SetFrom

func (bm *BaseMessage) SetFrom(content io.Reader) (n int64, err error)

func (*BaseMessage) SetHeader

func (bm *BaseMessage) SetHeader(key string, value []byte)

func (*BaseMessage) SetInt16Header

func (bm *BaseMessage) SetInt16Header(key string, value int16)

func (*BaseMessage) SetInt32Header

func (bm *BaseMessage) SetInt32Header(key string, value int32)

func (*BaseMessage) SetInt64Header

func (bm *BaseMessage) SetInt64Header(key string, value int64)

func (*BaseMessage) SetInt8Header

func (bm *BaseMessage) SetInt8Header(key string, value int8)

func (*BaseMessage) SetIntHeader

func (bm *BaseMessage) SetIntHeader(key string, value int)

func (*BaseMessage) SetStrHeader

func (bm *BaseMessage) SetStrHeader(key string, value string)

func (*BaseMessage) WriteContent

func (bm *BaseMessage) WriteContent(input interface{}, contentType string) (err error)

func (*BaseMessage) WriteJSON

func (bm *BaseMessage) WriteJSON(input interface{}) (err error)

func (*BaseMessage) WriteXML

func (bm *BaseMessage) WriteXML(input interface{}) (err error)

type Body

type Body interface {
	// SetBodyStr sets the string body to the Message structure
	SetBodyStr(in string) (int, error)
	// SetBodyBytes sets the byte[] body to the Message structure
	SetBodyBytes(int []byte) (int, error)
	// SetFrom sets the Reader body to the Message structure
	SetFrom(content io.Reader) (int64, error)
	// WriteJSON sets the JSON body to the Message structure
	WriteJSON(int interface{}) error
	// WriteXML sets the XML body to the Message structure
	WriteXML(in interface{}) error
	// WriteContent sets the custom body type based on the contentType to the Message structure
	WriteContent(in interface{}, contentType string) error

	// ReadBody reads the Reader body from the Message structure
	ReadBody() io.Reader
	// ReadBytes reads the []byte body from the Message structure
	ReadBytes() []byte
	// ReadAsStr reads the string body from the Message structure
	ReadAsStr() string
	// ReadJSON reads the JSON body from the Message structure
	ReadJSON(out interface{}) error
	// ReadXML reads the XML body from the Message structure
	ReadXML(out interface{}) error
	// ReadContent reads the content body based on the contentType from the Message structure
	ReadContent(out interface{}, contentType string) error
}

Body defines all the body interfaces required by the body of the messaging client

type Header interface {
	// SetHeader sets the byte header value for the Message header
	SetHeader(key string, value []byte)
	// SetStrHeader sets the string header value for the Message header
	SetStrHeader(key string, value string)
	// SetBoolHeader sets the boolean header value for the Message header
	SetBoolHeader(key string, value bool)
	// SetIntHeader sets the int header value for the Message header
	SetIntHeader(key string, value int)
	// SetInt8Header sets the int8 header value for the Message header
	SetInt8Header(key string, value int8)
	// SetInt16Header sets the int16 header value for the Message header
	SetInt16Header(key string, value int16)
	// SetInt32Header sets the int32 header value for the Message header
	SetInt32Header(key string, value int32)
	// SetInt64Header sets the int64 header value for the Message header
	SetInt64Header(key string, value int64)
	// SetFloatHeader sets the float32 header value for the Message header
	SetFloatHeader(key string, value float32)
	// SetFloat64Header sets the float64 header value for the Message header
	SetFloat64Header(key string, value float64)

	// GetHeader returns the value of the key set in the headers if exists in the byte[] value
	GetHeader(key string) (value []byte, exists bool)
	// GetStrHeader returns the value of the key set in the headers if exists in the string value
	GetStrHeader(key string) (value string, exists bool)
	// GetBoolHeader returns the value of the key set in the headers if exists in the bool value
	GetBoolHeader(key string) (value bool, exists bool)
	// GetIntHeader returns the value of the key set in the headers if exists in the int value
	GetIntHeader(key string) (value int, exists bool)
	// GetInt8Header returns the value of the key set in the headers if exists in the int8 value
	GetInt8Header(key string) (value int8, exists bool)
	// GetInt16Header returns the value of the key set in the headers if exists in the int16 value
	GetInt16Header(key string) (value int16, exists bool)
	// GetInt32Header returns the value of the key set in the headers if exists in the int32 value
	GetInt32Header(key string) (value int32, exists bool)
	// GetInt64Header returns the value of the key set in the headers if exists in the int64 value
	GetInt64Header(key string) (value int64, exists bool)
	// GetFloatHeader returns the value of the key set in the headers if exists in the float32 value
	GetFloatHeader(key string) (value float32, exists bool)
	// GetFloat64Header returns the value of the key set in the headers if exists in the float64 value
	GetFloat64Header(key string) (value float64, exists bool)
}

Header defines all the header interfaces required by the messaging clients

type LocalMessage

type LocalMessage struct {
	*BaseMessage
}

func NewLocalMessage

func NewLocalMessage() *LocalMessage

func (*LocalMessage) Rsvp

func (lm *LocalMessage) Rsvp(yes bool, options ...Option) (err error)

type LocalProvider

type LocalProvider struct {
	// contains filtered or unexported fields
}

LocalProvider is an implementation of the Provider interface

func (*LocalProvider) AddListener

func (lp *LocalProvider) AddListener(url *url.URL, listener func(msg Message), options ...Option) (err error)

func (*LocalProvider) NewMessage

func (lp *LocalProvider) NewMessage(scheme string, options ...Option) (msg Message, err error)

func (*LocalProvider) Receive

func (lp *LocalProvider) Receive(url *url.URL, options ...Option) (msg Message, err error)

func (*LocalProvider) ReceiveBatch

func (lp *LocalProvider) ReceiveBatch(url *url.URL, options ...Option) (msgs []Message, err error)

func (*LocalProvider) Schemes

func (lp *LocalProvider) Schemes() (schemes []string)

func (*LocalProvider) Send

func (lp *LocalProvider) Send(url *url.URL, msg Message, options ...Option) (err error)

func (*LocalProvider) SendBatch

func (lp *LocalProvider) SendBatch(url *url.URL, msgs []Message, options ...Option) (err error)

func (*LocalProvider) Setup

func (lp *LocalProvider) Setup()

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager struct is used to manage the known Messaging providers. It includes a mutex to handle concurrent access to the known providers

func (*Manager) AddListener

func (m *Manager) AddListener(u *url.URL, listener func(msg Message), options ...Option) (err error)

AddListener registers a listener for the message using the appropriate provider

func (*Manager) NewMessage

func (m *Manager) NewMessage(scheme string, options ...Option) (msg Message, err error)

NewMessage creates a new message using the appropriate provider

func (*Manager) Receive

func (m *Manager) Receive(u *url.URL, options ...Option) (msg Message, err error)

Receive receives a single message using the appropriate provider

func (*Manager) ReceiveBatch

func (m *Manager) ReceiveBatch(u *url.URL, options ...Option) (msgs []Message, err error)

ReceiveBatch receives a batch of messages using the appropriate provider

func (*Manager) Register

func (m *Manager) Register(provider Provider)

Register registers a messaging provider with the manager

func (*Manager) Schemes

func (m *Manager) Schemes() (schemes []string)

Schemes returns the supported URL schemes by the known providers

func (*Manager) Send

func (m *Manager) Send(u *url.URL, msg Message, options ...Option) (err error)

Send is a helper function that sends a message using the appropriate provider

func (*Manager) SendBatch

func (m *Manager) SendBatch(u *url.URL, msgs []Message, options ...Option) (err error)

SendBatch sends a batch of messages using the appropriate provider

func (*Manager) Setup

func (m *Manager) Setup()

Setup performs the initial setup of the messaging manager

type Message

type Message interface {
	Header
	Body
	// Rsvp function provides a facade to acknowledge the message to the provider indicating the acceptance or rejection
	//as mentioned by the first bool parameter.
	//Additional options can be set for indicating further actions.
	//This functionality is purely dependent on the capability of the provider to accept an acknowledgement.
	Rsvp(bool, ...Option) error
}

Message interface wil be implemented by all third party implementation such as aws - sns, sqs, gcp -> pub/sub, gcm, messaging -> amqp, kafka

type Messaging

type Messaging interface {
	Provider
	Register(Provider)
}

Messaging interface defines an abstraction for messaging providers that can be registered

func Get

func Get() Messaging

Get returns the facade messaging instance

type Option

type Option struct {
	Key   string
	Value interface{}
}

type OptionsBuilder

type OptionsBuilder struct {
	// contains filtered or unexported fields
}

func NewOptionsBuilder

func NewOptionsBuilder() *OptionsBuilder

TODO check if we can pool this for performance

func (*OptionsBuilder) Add

func (ob *OptionsBuilder) Add(key string, value interface{}) *OptionsBuilder

TODO check if you need to pool this for performance

func (*OptionsBuilder) AddCircuitBreaker

func (ob *OptionsBuilder) AddCircuitBreaker(failureThreshold, successThreshold uint64, maxHalfOpen,
	timeout uint32) *OptionsBuilder

func (*OptionsBuilder) AddRetryHandler

func (ob *OptionsBuilder) AddRetryHandler(maxRetries, wait int) *OptionsBuilder

func (*OptionsBuilder) Build

func (ob *OptionsBuilder) Build() []Option

type OptionsResolver

type OptionsResolver struct {
	// contains filtered or unexported fields
}

func NewOptionsResolver

func NewOptionsResolver(options ...Option) (optsResolver *OptionsResolver)

func (*OptionsResolver) Get

func (or *OptionsResolver) Get(key string) (value interface{}, has bool)

func (*OptionsResolver) GetCircuitBreaker

func (or *OptionsResolver) GetCircuitBreaker() (breakerInfo *clients.BreakerInfo, has bool)

func (*OptionsResolver) GetRetryInfo

func (or *OptionsResolver) GetRetryInfo() (retryInfo *clients.RetryInfo, has bool)

type Producer

type Producer interface {
	// Send function sends an individual message to the url
	Send(*url.URL, Message, ...Option) error
	// SendBatch sends a batch of messages to the url
	SendBatch(*url.URL, []Message, ...Option) error
}

Producer interface is used to send message(s) to a specific provider

type Provider

type Provider interface {
	// Producer Interface included
	Producer
	// Receiver interface included
	Receiver
	// Schemes is array of URL schemes supported by this provider
	Schemes() []string
	// Setup method called
	Setup()
	// NewMessage function creates a new message that can be used by the clients. It expects the scheme to be provided
	NewMessage(string, ...Option) (Message, error)
}

Provider interface exposes methods for a messaging provider It includes Producer and Receiver interfaces It also includes Schemes method to get the supported schemes, Setup method to perform initial setup and NewMessage method to create a new message

type Receiver

type Receiver interface {
	// Receive function performs on-demand receive of a single message.
	// This function may or may not wait for the messages to arrive. This is purely dependent on the implementation.
	Receive(*url.URL, ...Option) (Message, error)
	// ReceiveBatch function performs on-demand receive of a batch of messages.
	// This function may or may not wait for the messages to arrive. This is purely dependent on the implementation.
	ReceiveBatch(*url.URL, ...Option) ([]Message, error)
	// AddListener registers a listener for the message
	AddListener(*url.URL, func(msg Message), ...Option) error
}

Receiver interface provides the functions for receiving a message(s)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL