queues

package
v0.0.1-2
Published: Oct 24, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type NatsAbstractMessageQueue

type NatsAbstractMessageQueue struct {
	*cqueues.MessageQueue

	//The dependency resolver.
	DependencyResolver *cref.DependencyResolver
	//The logger.
	Logger *clog.CompositeLogger
	//The NATS connection component.
	Connection *connect.NatsConnection
	//The NATS connection object.
	Client *nats.Conn

	// SerializeEnvelop bool
	Subject    string
	QueueGroup string
	// contains filtered or unexported fields
}

Abstract NATS message queue with ability to connect to NATS server.

func InheritNatsAbstractMessageQueue

func InheritNatsAbstractMessageQueue(overrides cqueues.IMessageQueueOverrides, name string, capabilities *cqueues.MessagingCapabilities) *NatsAbstractMessageQueue

Creates a new instance of the queue component.

  • overrides: queue overrides
  • name: (optional) queue name
  • capabilities: messaging capabilities of the queue

func (*NatsAbstractMessageQueue) Abandon

func (c *NatsAbstractMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Abandon returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- message *cqueues.MessageEnvelope  a message to return.

Returns: error or nil for success.

func (*NatsAbstractMessageQueue) CheckOpen

func (c *NatsAbstractMessageQueue) CheckOpen(traceId string) error

func (*NatsAbstractMessageQueue) Clear

Clear clears the component state.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsAbstractMessageQueue) Close

func (c *NatsAbstractMessageQueue) Close(ctx context.Context) (err error)

Closes the component and frees used resources.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsAbstractMessageQueue) Complete

func (c *NatsAbstractMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Complete permanently removes a message from the queue. This method is usually used to remove the message after successful processing. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- message *cqueues.MessageEnvelope	a message to remove.

Returns: error or nil for success.

func (*NatsAbstractMessageQueue) Configure

func (c *NatsAbstractMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)

Configures the component by passing configuration parameters.

Parameters:
	- ctx context.Context	operation context
	- config *cconf.ConfigParams	configuration parameters to be set.

func (*NatsAbstractMessageQueue) FromMessage

func (c *NatsAbstractMessageQueue) FromMessage(message *cqueues.MessageEnvelope) (*nats.Msg, error)

Converts a MessageEnvelope to a NATS message structure.

Parameters:
	- message *cqueues.MessageEnvelope	message object

Returns: NATS message structure or error.

func (*NatsAbstractMessageQueue) IsOpen

func (c *NatsAbstractMessageQueue) IsOpen() bool

Checks if the component is opened. Returns true if the component has been opened and false otherwise.

func (*NatsAbstractMessageQueue) MoveToDeadLetter

func (c *NatsAbstractMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Permanently removes a message from the queue and sends it to the dead letter queue. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- message *cqueues.MessageEnvelope	a message to be removed.

Returns: error or nil for success.

func (*NatsAbstractMessageQueue) Open

func (c *NatsAbstractMessageQueue) Open(ctx context.Context) (err error)

Opens the component.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsAbstractMessageQueue) OpenWithParams

func (c *NatsAbstractMessageQueue) OpenWithParams(ctx context.Context, connections []*cconn.ConnectionParams,
	credential *cauth.CredentialParams) error

OpenWithParams opens the component with the given connection and credential parameters.

Parameters:
	- ctx context.Context	operation context
	- connections []*cconn.ConnectionParams	connection parameters
	- credential *cauth.CredentialParams	credential parameters

Returns: error or nil if no errors occurred.

func (*NatsAbstractMessageQueue) ReadMessageCount

func (c *NatsAbstractMessageQueue) ReadMessageCount() (int64, error)

ReadMessageCount reads the current number of messages in the queue that are waiting to be delivered. Returns the number of messages or an error.

func (*NatsAbstractMessageQueue) RenewLock

func (c *NatsAbstractMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)

RenewLock renews the lock on a message, which keeps it invisible to other receivers in the queue. This method is usually used to extend the message processing time. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- message *cqueues.MessageEnvelope	a message to extend its lock.
	- lockTimeout time.Duration	a locking timeout.

Returns: error or nil for success.

func (*NatsAbstractMessageQueue) Send

Send sends a message into the queue.

Parameters:
	- ctx context.Context	operation context
	- envelope *cqueues.MessageEnvelope	a message envelope to be sent.

Returns: error or nil for success.

func (*NatsAbstractMessageQueue) SetReferences

func (c *NatsAbstractMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)

Sets references to dependent components.

Parameters:
	- ctx context.Context	operation context
	- references 	references to locate the component dependencies.

func (*NatsAbstractMessageQueue) SubscriptionSubject

func (c *NatsAbstractMessageQueue) SubscriptionSubject() string

func (*NatsAbstractMessageQueue) ToMessage

func (c *NatsAbstractMessageQueue) ToMessage(msg *nats.Msg) (*cqueues.MessageEnvelope, error)

Converts a NATS message structure to a MessageEnvelope.

Parameters:
	- msg *nats.Msg	message object

Returns: MessageEnvelope structure or error.
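
The serialize_message option documented for the queues below suggests two wire formats: the whole envelope as JSON, or only the payload. The following round trip sketches that idea with the standard library alone. The Envelope type, its field names and JSON tags are assumptions made for illustration; they are not the actual cqueues.MessageEnvelope layout, and the real FromMessage and ToMessage may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a hypothetical stand-in for cqueues.MessageEnvelope.
type Envelope struct {
	MessageId   string `json:"message_id"`
	MessageType string `json:"message_type"`
	Payload     []byte `json:"payload"`
}

// Encode produces the bytes that would travel as the message body.
// With serializeEnvelope=false only the payload is sent.
func Encode(env Envelope, serializeEnvelope bool) ([]byte, error) {
	if !serializeEnvelope {
		return env.Payload, nil
	}
	return json.Marshal(env)
}

// Decode reverses Encode. Without envelope serialization the metadata
// (id, type) cannot be recovered, so only the payload is filled in.
func Decode(data []byte, serializeEnvelope bool) (Envelope, error) {
	if !serializeEnvelope {
		return Envelope{Payload: data}, nil
	}
	var env Envelope
	err := json.Unmarshal(data, &env)
	return env, err
}

func main() {
	in := Envelope{MessageId: "1", MessageType: "mymessage", Payload: []byte("ABC")}

	wire, _ := Encode(in, true)
	out, _ := Decode(wire, true)
	fmt.Println(out.MessageType, string(out.Payload))

	raw, _ := Encode(in, false)
	fmt.Println(string(raw))
}
```

The trade-off is visible in Decode: sending only the payload keeps messages small and readable by non-envelope-aware consumers, but the message type and id are lost in transit.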

func (*NatsAbstractMessageQueue) UnsetReferences

func (c *NatsAbstractMessageQueue) UnsetReferences()

Unsets (clears) previously set references to dependent components.

type NatsBareMessageQueue

type NatsBareMessageQueue struct {
	*NatsAbstractMessageQueue
	// contains filtered or unexported fields
}

NatsBareMessageQueue is a message queue that sends and receives messages via the NATS message broker.

Configuration parameters:

	- subject:                       name of NATS topic (subject) to subscribe
	- queue_group:                   name of NATS queue group
	- connection(s):
		- discovery_key:               (optional) a key to retrieve the connection from  IDiscovery
		- host:                        host name or IP address
		- port:                        port number
		- uri:                         resource URI or connection string with all parameters in it
	- credential(s):
		- store_key:                   (optional) a key to retrieve the credentials from  ICredentialStore
		- username:                    user name
		- password:                    user password
	- options:
		- serialize_message:    (optional) true to serialize entire message as JSON, false to send only message payload (default: true)
		- retry_connect:        (optional) turns on/off automated reconnect when connection is lost (default: true)
		- max_reconnect:        (optional) maximum reconnection attempts (default: 3)
		- reconnect_timeout:    (optional) number of milliseconds to wait on each reconnection attempt (default: 3000)
		- flush_timeout:        (optional) number of milliseconds to wait on flushing messages (default: 3000)

References:

	- *:logger:*:*:1.0             (optional)  ILogger components to pass log messages
	- *:counters:*:*:1.0           (optional)  ICounters components to pass collected measurements
	- *:discovery:*:*:1.0          (optional)  IDiscovery services to resolve connections
	- *:credential-store:*:*:1.0   (optional) Credential stores to resolve credentials
	- *:connection:nats:*:1.0      (optional) Shared connection to NATS service

See MessageQueue, MessagingCapabilities

	Example:
		ctx := context.Background()
		queue := NewNatsBareMessageQueue("myqueue")
		queue.Configure(ctx, cconf.NewConfigParamsFromTuples(
			"subject", "mytopic",
			"queue_group", "mygroup",
			"connection.protocol", "nats",
			"connection.host", "localhost",
			"connection.port", 4222, // default NATS client port
		))

		_ = queue.Open(ctx)

		_ = queue.Send(ctx, cqueues.NewMessageEnvelope("", "mymessage", []byte("ABC")))

		message, err := queue.Receive(ctx, 10000*time.Millisecond)
		if err == nil && message != nil {
			...
			_ = queue.Complete(ctx, message)
		}

func NewNatsBareMessageQueue

func NewNatsBareMessageQueue(name string) *NatsBareMessageQueue

NewNatsBareMessageQueue creates a new instance of the message queue.

Parameters:

  • name string (optional) a queue name.

func (*NatsBareMessageQueue) EndListen

func (c *NatsBareMessageQueue) EndListen(ctx context.Context)

EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.

Parameters:
	- ctx context.Context	operation context

func (*NatsBareMessageQueue) Listen

Listens for incoming messages and blocks the current thread until the queue is closed.

Parameters:
	- ctx context.Context	operation context
	- receiver cqueues.IMessageReceiver	a receiver to receive incoming messages.

See IMessageReceiver, Receive
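
The blocking contract between Listen and EndListen can be sketched with channels. The code below is an in-memory illustration only: the Listener type and its fields are hypothetical, it does not talk to NATS, and the real queue may implement the hand-off differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Listener is a hypothetical in-memory stand-in for a queue.
type Listener struct {
	messages chan string
	done     chan struct{}
	once     sync.Once
}

func NewListener() *Listener {
	return &Listener{
		messages: make(chan string, 16),
		done:     make(chan struct{}),
	}
}

// Listen blocks the calling goroutine, passing each message to receive,
// until EndListen is called.
func (l *Listener) Listen(receive func(string)) {
	for {
		select {
		case m := <-l.messages:
			receive(m)
		case <-l.done:
			return
		}
	}
}

// EndListen unblocks Listen. It is safe to call more than once.
func (l *Listener) EndListen() {
	l.once.Do(func() { close(l.done) })
}

func main() {
	l := NewListener()
	l.messages <- "hello"

	finished := make(chan struct{})
	go func() {
		l.Listen(func(m string) {
			fmt.Println("received:", m)
			l.EndListen() // stop after the first message
		})
		close(finished)
	}()

	<-finished
	fmt.Println("listen returned")
}
```

Since Listen blocks, callers typically run it in its own goroutine and call EndListen (or Close) from elsewhere during shutdown.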

func (*NatsBareMessageQueue) Peek

Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil.

Parameters:
	- ctx context.Context	operation context

Returns: result *cqueues.MessageEnvelope, err error	a message or error.

func (*NatsBareMessageQueue) PeekBatch

func (c *NatsBareMessageQueue) PeekBatch(ctx context.Context, messageCount int64) ([]*cqueues.MessageEnvelope, error)

PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- messageCount int64	a maximum number of messages to peek.

Returns: a list of messages or error.

func (*NatsBareMessageQueue) Receive

func (c *NatsBareMessageQueue) Receive(ctx context.Context, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)

Receive receives an incoming message and removes it from the queue.

Parameters:
	- ctx context.Context	operation context
	- waitTimeout time.Duration	how long to wait for a message to arrive.

Returns: result *cqueues.MessageEnvelope, err error	a message or error.

type NatsMessageQueue

type NatsMessageQueue struct {
	*NatsAbstractMessageQueue
	// contains filtered or unexported fields
}

func NewNatsMessageQueue

func NewNatsMessageQueue(name string) *NatsMessageQueue

NewNatsMessageQueue creates a new instance of the message queue.

Parameters:
	- name string	(optional) a queue name.

func (*NatsMessageQueue) Clear

func (c *NatsMessageQueue) Clear(ctx context.Context) (err error)

Clear clears the component state.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsMessageQueue) Close

func (c *NatsMessageQueue) Close(ctx context.Context) error

Close closes the component and frees used resources.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsMessageQueue) Configure

func (c *NatsMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)

Configures the component by passing configuration parameters.

Parameters:
	- ctx context.Context	operation context
	- config *cconf.ConfigParams	configuration parameters to be set.

func (*NatsMessageQueue) EndListen

func (c *NatsMessageQueue) EndListen(ctx context.Context)

EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.

Parameters:
	- ctx context.Context	operation context

func (*NatsMessageQueue) Listen

func (c *NatsMessageQueue) Listen(ctx context.Context, receiver cqueues.IMessageReceiver) error

Listens for incoming messages and blocks the current thread until the queue is closed.

Parameters:
	- ctx context.Context	operation context
	- receiver cqueues.IMessageReceiver	a receiver to receive incoming messages.

See IMessageReceiver, Receive

func (*NatsMessageQueue) OnMessage

func (c *NatsMessageQueue) OnMessage(msg *nats.Msg)

OnMessage processes incoming messages.

Parameters:
	- msg *nats.Msg	a message received from NATS

func (*NatsMessageQueue) Open

func (c *NatsMessageQueue) Open(ctx context.Context) error

Opens the component.

Parameters:
	- ctx context.Context	operation context

Returns: error or nil if no errors occurred.

func (*NatsMessageQueue) Peek

Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil.

Parameters:
	- ctx context.Context	operation context

Returns: result *cqueues.MessageEnvelope, err error	a message or error.

func (*NatsMessageQueue) PeekBatch

func (c *NatsMessageQueue) PeekBatch(ctx context.Context, messageCount int64) ([]*cqueues.MessageEnvelope, error)

PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list. Important: This method is not supported by NATS.

Parameters:
	- ctx context.Context	operation context
	- messageCount int64	a maximum number of messages to peek.

Returns: a list of messages or error.

func (*NatsMessageQueue) ReadMessageCount

func (c *NatsMessageQueue) ReadMessageCount() (count int64, err error)

ReadMessageCount reads the current number of messages in the queue that are waiting to be delivered. Returns the number of messages or an error.

func (*NatsMessageQueue) Receive

func (c *NatsMessageQueue) Receive(ctx context.Context, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)

Receive receives an incoming message and removes it from the queue.

Parameters:
	- ctx context.Context	operation context
	- waitTimeout time.Duration	how long to wait for a message to arrive.

Returns: result *cqueues.MessageEnvelope, err error	a message or error.
