messaging

package
v1.2.109 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2024 License: MIT Imports: 7 Imported by: 10

Documentation

Index

Constants

View Source
const EntityMessageTopic = "ENTITY"

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseMessage

type BaseMessage struct {
	MsgTopic     string `json:"topic"`     // Message topic (channel)
	MsgOpCode    int    `json:"opCode"`    // Message op code
	MsgVersion   string `json:"version"`   // Message op code
	MsgAddressee string `json:"addressee"` // Message final addressee
	MsgSessionId string `json:"sessionId"` // Session id shared across all messages related to the same session
}

BaseMessage base implementation of IMessage interface

func (*BaseMessage) Addressee

func (m *BaseMessage) Addressee() string

func (*BaseMessage) OpCode

func (m *BaseMessage) OpCode() int

func (*BaseMessage) Payload

func (m *BaseMessage) Payload() any

func (*BaseMessage) SessionId

func (m *BaseMessage) SessionId() string

func (*BaseMessage) Topic

func (m *BaseMessage) Topic() string

func (*BaseMessage) Version added in v1.2.52

func (m *BaseMessage) Version() string

type EntityMessage added in v1.2.43

type EntityMessage struct {
	BaseMessage
	MsgPayload any `json:"payload"` // Payload
}

EntityMessage general entity message implementation of IMessage interface

func (*EntityMessage) Payload added in v1.2.43

func (m *EntityMessage) Payload() any

type IMessage

type IMessage interface {
	// Topic name (also known as channel or queue)
	Topic() string

	// OpCode message operational code
	OpCode() int

	// Addressee message final addressee (recipient) - optional field
	Addressee() string

	// SessionId identifies a message exchange session which is shared across all messages related to the same session
	SessionId() string

	// Version identifies a message version
	Version() string

	// Payload is the message body
	Payload() any
}

IMessage General message interface

func GetMessage added in v1.2.57

func GetMessage[T any](topic string, payload T) IMessage

func NewEntityMessage added in v1.2.44

func NewEntityMessage() IMessage

NewEntityMessage is a message factory

func NewMessage added in v1.2.57

func NewMessage[T any]() IMessage

type IMessageBus

type IMessageBus interface {

	// Closer includes method Close()
	io.Closer

	// Ping Test connectivity for retries number of time with time interval (in seconds) between retries
	Ping(retries uint, intervalInSeconds uint) error

	// CloneMessageBus Returns a clone (copy) of the instance
	CloneMessageBus() (IMessageBus, error)

	// Publish messages to a channel (topic)
	Publish(messages ...IMessage) error

	// Subscribe on topics and return subscriberId
	Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (string, error)

	// Unsubscribe with the given subscriber id
	Unsubscribe(subscriptionId string) bool

	// Push Append one or multiple messages to a queue
	Push(messages ...IMessage) error

	// Pop Remove and get the last message in a queue or block until timeout expires
	Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)

	// CreateProducer creates message producer for a specific topic
	CreateProducer(topic string) (IMessageProducer, error)

	// CreateConsumer creates message consumer for a specific topic
	CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)
}

IMessageBus Message bus interface

func NewInMemoryMessageBus

func NewInMemoryMessageBus() (mq IMessageBus, err error)

NewInMemoryMessageBus Factory method

type IMessageConsumer added in v1.2.47

type IMessageConsumer interface {
	// Closer includes method Close()
	io.Closer

	// Read message from topic, blocks until a new message arrive or until timeout expires
	// Use 0 instead of time.Duration for unlimited time
	// The standard way to use Read is by using infinite loop:
	//
	//	for {
	//		if msg, err := consumer.Read(time.Second * 5); err != nil {
	//			// Handle error
	//		} else {
	//			// Process message in a dedicated go routine
	//			go processTisMessage(msg)
	//		}
	//	}
	//
	Read(timeout time.Duration) (IMessage, error)
}

IMessageConsumer Message bus consumer (reader) interface

type IMessageProducer added in v1.2.2

type IMessageProducer interface {

	// Closer includes method Close()
	io.Closer

	// Publish messages to a producer channel (topic)
	Publish(messages ...IMessage) error
}

IMessageProducer Message bus producer interface

type InMemoryMessageBus

type InMemoryMessageBus struct {
	// contains filtered or unexported fields
}

InMemoryMessageBus represents in memory implementation of IMessageBus interface topics is a map ot topic -> array of channels (channel per subscriber)

func (*InMemoryMessageBus) CloneMessageBus added in v1.2.62

func (m *InMemoryMessageBus) CloneMessageBus() (IMessageBus, error)

CloneMessageBus Returns a clone (copy) of the instance

func (*InMemoryMessageBus) Close added in v1.2.1

func (m *InMemoryMessageBus) Close() error

Close client and free resources

func (*InMemoryMessageBus) CreateConsumer added in v1.2.47

func (m *InMemoryMessageBus) CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)

CreateConsumer creates message consumer for a specific topic

func (*InMemoryMessageBus) CreateProducer added in v1.2.2

func (m *InMemoryMessageBus) CreateProducer(topic string) (IMessageProducer, error)

CreateProducer creates message producer for specific topic

func (*InMemoryMessageBus) Ping

func (m *InMemoryMessageBus) Ping(retries uint, intervalInSeconds uint) error

Ping Test connectivity for retries number of time with time interval (in seconds) between retries

func (*InMemoryMessageBus) Pop

func (m *InMemoryMessageBus) Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)

Pop Remove and get the last message in a queue or block until timeout expires

func (*InMemoryMessageBus) Publish

func (m *InMemoryMessageBus) Publish(messages ...IMessage) error

Publish messages to a channel (topic)

func (*InMemoryMessageBus) Push

func (m *InMemoryMessageBus) Push(messages ...IMessage) error

Push Append one or multiple messages to a queue

func (*InMemoryMessageBus) Subscribe

func (m *InMemoryMessageBus) Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (subscriptionId string, error error)

Subscribe on topics

func (*InMemoryMessageBus) Unsubscribe

func (m *InMemoryMessageBus) Unsubscribe(subscriptionId string) (success bool)

Unsubscribe with the given subscriber id

type InMemoryMessageConsumer added in v1.2.47

type InMemoryMessageConsumer struct {
	// contains filtered or unexported fields
}

func (*InMemoryMessageConsumer) Close added in v1.2.47

func (m *InMemoryMessageConsumer) Close() error

Close client and free resources

func (*InMemoryMessageConsumer) Read added in v1.2.47

func (m *InMemoryMessageConsumer) Read(timeout time.Duration) (IMessage, error)

Read message from topic, blocks until a new message arrive or until timeout

type Message added in v1.2.42

type Message[T any] struct {
	BaseMessage
	MsgPayload T `json:"payload"` // Data model
}

Message is a generic message structure

type MessageFactory

type MessageFactory func() IMessage

MessageFactory is a factory method of any message

type SubscriptionCallback

type SubscriptionCallback func(msg IMessage) bool

SubscriptionCallback Message subscription callback function, return true for ack

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL