managers

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultMaxWaitToSend = 5 * time.Second
)

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager implements a concept which allows separation of actual network transport protocol management from the actual pubsub operation of subscription and message delivery.

It uses a TransportManager to support multiple subscribers per topic (which somewhat limits the capacity of tracking if each subscriber received all messages and redelivery) but allows us use multiple protocols to talk to the same mailer transportManager to handle message delivery to and from these protocols.

func NewManager

func NewManager(config ManagerConfig) *Manager

func (*Manager) Codec

func (gp *Manager) Codec() sabuhp.Codec

func (*Manager) Config

func (gp *Manager) Config() ServiceConfig

func (*Manager) Conn

func (gp *Manager) Conn() sabuhp.Conn

func (*Manager) Ctx

func (gp *Manager) Ctx() context.Context

func (*Manager) HandleSocketBytesMessage

func (gp *Manager) HandleSocketBytesMessage(message []byte, socket sabuhp.Socket) error

func (*Manager) HandleSocketBytesMessageFromOverriding

func (gp *Manager) HandleSocketBytesMessageFromOverriding(message []byte, socket sabuhp.Socket, overridingTransport sabuhp.Transport) error

func (*Manager) HandleSocketMessage

func (gp *Manager) HandleSocketMessage(msg *sabuhp.Message, socket sabuhp.Socket) error

func (*Manager) Listen

func (gp *Manager) Listen(topic string, handler sabuhp.TransportResponse) sabuhp.Channel

Listen creates a local subscription for listening to an underline message for a giving topic.

func (*Manager) ManageSocketClosed

func (gp *Manager) ManageSocketClosed(socket sabuhp.Socket)

func (*Manager) ManageSocketOpened

func (gp *Manager) ManageSocketOpened(socket sabuhp.Socket)

func (*Manager) SendToAll

func (gp *Manager) SendToAll(data *sabuhp.Message, timeout time.Duration) error

SendToAll delivers to all listeners the provided message within specific timeout.

func (*Manager) SendToOne

func (gp *Manager) SendToOne(data *sabuhp.Message, timeout time.Duration) error

SendToOne selects a random recipient which will receive the message to be delivered for processing.

func (*Manager) SocketListenToTopic

func (gp *Manager) SocketListenToTopic(topic string, socket sabuhp.Socket)

func (*Manager) Stop

func (gp *Manager) Stop()

func (*Manager) Wait

func (gp *Manager) Wait()

type ManagerConfig

type ManagerConfig struct {
	ID            nxid.ID
	Transport     sabuhp.Transport
	Codec         sabuhp.Codec
	Ctx           context.Context
	Logger        sabuhp.Logger
	OnClosure     SocketNotification
	OnOpen        SocketNotification
	MaxWaitToSend time.Duration
}

type ServiceConfig

type ServiceConfig struct {
	Ctx       context.Context
	Handler   sabuhp.MessageHandler
	OnClosure SocketNotification
	OnOpen    SocketNotification
}

ServiceConfig provides attributes that a transport can use to deliver sockets to a Manager.

This intends allow other transport protocols (think http, tcp) take provided config and route their messages through the transportManager.

type SocketNotification

type SocketNotification func(socket sabuhp.Socket)

type TransportManager

type TransportManager struct {
	Transport sabuhp.Transport
	// contains filtered or unexported fields
}

TransportManager wraps a transport object exposes a wrapper that underline manages multiple subscriptions on a per topic basis.

func NewTransportManager

func NewTransportManager(ctx context.Context, transport sabuhp.Transport, logger sabuhp.Logger) *TransportManager

func (*TransportManager) Conn

func (tm *TransportManager) Conn() sabuhp.Conn

func (*TransportManager) Listen

func (tm *TransportManager) Listen(topic string, handler sabuhp.TransportResponse) sabuhp.Channel

func (*TransportManager) ListenWithId

func (tm *TransportManager) ListenWithId(id nxid.ID, topic string, handler sabuhp.TransportResponse) sabuhp.Channel

ListenWithId creates a listener using specified id, this allows the unique id to represent a possible subscription.

func (*TransportManager) Send

func (tm *TransportManager) Send(message *sabuhp.Message, transport sabuhp.Transport) sabuhp.MessageErr

func (*TransportManager) SendToAll

func (tm *TransportManager) SendToAll(data *sabuhp.Message, timeout time.Duration) error

SendToAll implements the Transport interface but handles things in a different way. If the message has an overriding transport from its originator then that is used to send the message first before attempting to deliver message to all other channels matching giving topic.

func (*TransportManager) SendToOne

func (tm *TransportManager) SendToOne(data *sabuhp.Message, timeout time.Duration) error

SendToOne implements the Transport interface but handles things in a different way. If the message has an overriding transport from its originator then that is used to send the message, which bypasses all internal channels or subscriptions.

func (*TransportManager) SendWithTimeout

func (tm *TransportManager) SendWithTimeout(message *sabuhp.Message, transport sabuhp.Transport, timeout time.Duration) sabuhp.MessageErr

func (*TransportManager) UnlistenAllWithId

func (tm *TransportManager) UnlistenAllWithId(id nxid.ID)

UnlistenAllWithId sends a signal to remove possible handler with giving id from all topics.

func (*TransportManager) UnlistenWithId

func (tm *TransportManager) UnlistenWithId(topic string, id nxid.ID)

UnlistenWithId sends a signal to remove possible handler with giving id from specific topic.

func (*TransportManager) Wait

func (tm *TransportManager) Wait()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL