Documentation ¶
Index ¶
- type NatsAbstractMessageQueue
- func (c *NatsAbstractMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *NatsAbstractMessageQueue) CheckOpen(correlationId string) error
- func (c *NatsAbstractMessageQueue) Clear(ctx context.Context, correlationId string) error
- func (c *NatsAbstractMessageQueue) Close(ctx context.Context, correlationId string) (err error)
- func (c *NatsAbstractMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *NatsAbstractMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *NatsAbstractMessageQueue) FromMessage(message *cqueues.MessageEnvelope) (*nats.Msg, error)
- func (c *NatsAbstractMessageQueue) IsOpen() bool
- func (c *NatsAbstractMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *NatsAbstractMessageQueue) Open(ctx context.Context, correlationId string) (err error)
- func (c *NatsAbstractMessageQueue) OpenWithParams(ctx context.Context, correlationId string, ...) error
- func (c *NatsAbstractMessageQueue) ReadMessageCount() (int64, error)
- func (c *NatsAbstractMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, ...) (err error)
- func (c *NatsAbstractMessageQueue) Send(ctx context.Context, correlationId string, envelop *cqueues.MessageEnvelope) error
- func (c *NatsAbstractMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)
- func (c *NatsAbstractMessageQueue) SubscriptionSubject() string
- func (c *NatsAbstractMessageQueue) ToMessage(msg *nats.Msg) (*cqueues.MessageEnvelope, error)
- func (c *NatsAbstractMessageQueue) UnsetReferences()
- type NatsBareMessageQueue
- func (c *NatsBareMessageQueue) EndListen(ctx context.Context, correlationId string)
- func (c *NatsBareMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
- func (c *NatsBareMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
- func (c *NatsBareMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
- func (c *NatsBareMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
- type NatsMessageQueue
- func (c *NatsMessageQueue) Clear(ctx context.Context, correlationId string) (err error)
- func (c *NatsMessageQueue) Close(ctx context.Context, correlationId string) error
- func (c *NatsMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *NatsMessageQueue) EndListen(ctx context.Context, correlationId string)
- func (c *NatsMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
- func (c *NatsMessageQueue) OnMessage(msg *nats.Msg)
- func (c *NatsMessageQueue) Open(ctx context.Context, correlationId string) error
- func (c *NatsMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
- func (c *NatsMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
- func (c *NatsMessageQueue) ReadMessageCount() (count int64, err error)
- func (c *NatsMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type NatsAbstractMessageQueue ¶
type NatsAbstractMessageQueue struct { *cqueues.MessageQueue //The dependency resolver. DependencyResolver *cref.DependencyResolver //The logger. Logger *clog.CompositeLogger //The NATS connection component. Connection *connect.NatsConnection //The NATS connection object. Client *nats.Conn // SerializeEnvelop bool Subject string QueueGroup string // contains filtered or unexported fields }
Abstract NATS message queue with ability to connect to NATS server.
func InheritNatsAbstractMessageQueue ¶
func InheritNatsAbstractMessageQueue(overrides cqueues.IMessageQueueOverrides, name string, capabilities *cqueues.MessagingCapabilities) *NatsAbstractMessageQueue
Creates a new instance of the queue component.
- overrides a queue overrides
- name (optional) a queue name.
func (*NatsAbstractMessageQueue) Abandon ¶
func (c *NatsAbstractMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Abandon returns a message to the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to the dead letter queue. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - message *cqueues.MessageEnvelope a message to return.
Returns: error
error or nil for success.
func (*NatsAbstractMessageQueue) CheckOpen ¶
func (c *NatsAbstractMessageQueue) CheckOpen(correlationId string) error
func (*NatsAbstractMessageQueue) Clear ¶
func (c *NatsAbstractMessageQueue) Clear(ctx context.Context, correlationId string) error
Clear clears the component state.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
Returns: an error, or nil if no errors occurred.
func (*NatsAbstractMessageQueue) Close ¶
func (c *NatsAbstractMessageQueue) Close(ctx context.Context, correlationId string) (err error)
Closes component and frees used resources.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain. Returns: an error, or nil if no errors occurred.
func (*NatsAbstractMessageQueue) Complete ¶
func (c *NatsAbstractMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Complete permanently removes a message from the queue. This method is usually used to remove the message after successful processing. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - message *cqueues.MessageEnvelope a message to remove.
Returns: error error or nil for success.
func (*NatsAbstractMessageQueue) Configure ¶
func (c *NatsAbstractMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
Configures component by passing configuration parameters.
Parameters: - ctx context.Context operation context - config configuration parameters to be set.
func (*NatsAbstractMessageQueue) FromMessage ¶
func (c *NatsAbstractMessageQueue) FromMessage(message *cqueues.MessageEnvelope) (*nats.Msg, error)
Converts a MessageEnvelope to a NATS message structure.
Parameters: - message *cqueues.MessageEnvelope message object
Returns: the NATS message structure, or an error.
func (*NatsAbstractMessageQueue) IsOpen ¶
func (c *NatsAbstractMessageQueue) IsOpen() bool
Checks if the component is opened. Returns true if the component has been opened and false otherwise.
func (*NatsAbstractMessageQueue) MoveToDeadLetter ¶
func (c *NatsAbstractMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Permanently removes a message from the queue and sends it to dead letter queue. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - message *cqueues.MessageEnvelope a message to be removed.
Returns: error
error or nil for success.
func (*NatsAbstractMessageQueue) Open ¶
func (c *NatsAbstractMessageQueue) Open(ctx context.Context, correlationId string) (err error)
Opens the component.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain. Returns: an error, or nil if no errors occurred.
func (*NatsAbstractMessageQueue) OpenWithParams ¶
func (c *NatsAbstractMessageQueue) OpenWithParams(ctx context.Context, correlationId string, connections []*cconn.ConnectionParams, credential *cauth.CredentialParams) error
OpenWithParams opens the component with the given connection and credential parameters.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain. - connections connection parameters - credential credential parameters
Returns: an error, or nil if no errors occurred.
func (*NatsAbstractMessageQueue) ReadMessageCount ¶
func (c *NatsAbstractMessageQueue) ReadMessageCount() (int64, error)
ReadMessageCount reads the current number of messages in the queue to be delivered. Returns the number of messages or an error.
func (*NatsAbstractMessageQueue) RenewLock ¶
func (c *NatsAbstractMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)
RenewLock renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - message *cqueues.MessageEnvelope a message to extend its lock. - lockTimeout time.Duration a locking timeout in milliseconds.
Returns: error receives an error or nil for success.
func (*NatsAbstractMessageQueue) Send ¶
func (c *NatsAbstractMessageQueue) Send(ctx context.Context, correlationId string, envelop *cqueues.MessageEnvelope) error
Send sends a message into the queue.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain. - envelop *cqueues.MessageEnvelope a message envelope to be sent.
Returns: error or nil for success.
func (*NatsAbstractMessageQueue) SetReferences ¶
func (c *NatsAbstractMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)
Sets references to dependent components.
Parameters: - ctx context.Context operation context - references references to locate the component dependencies.
func (*NatsAbstractMessageQueue) SubscriptionSubject ¶
func (c *NatsAbstractMessageQueue) SubscriptionSubject() string
func (*NatsAbstractMessageQueue) ToMessage ¶
func (c *NatsAbstractMessageQueue) ToMessage(msg *nats.Msg) (*cqueues.MessageEnvelope, error)
Converts a NATS message structure to a MessageEnvelope.
Parameters: - msg *nats.Msg message object
Returns: the MessageEnvelope structure, or an error.
func (*NatsAbstractMessageQueue) UnsetReferences ¶
func (c *NatsAbstractMessageQueue) UnsetReferences()
Unsets (clears) previously set references to dependent components.
type NatsBareMessageQueue ¶
type NatsBareMessageQueue struct { *NatsAbstractMessageQueue // contains filtered or unexported fields }
NatsBareMessageQueue is a message queue that sends and receives messages via the NATS message broker.
Configuration parameters: - subject: name of NATS topic (subject) to subscribe - queue_group: name of NATS queue group - connection(s): - discovery_key: (optional) a key to retrieve the connection from IDiscovery - host: host name or IP address - port: port number - uri: resource URI or connection string with all parameters in it - credential(s): - store_key: (optional) a key to retrieve the credentials from ICredentialStore - username: user name - password: user password - options: - serialize_message: (optional) true to serialize entire message as JSON, false to send only message payload (default: true) - retry_connect: (optional) turns on/off automated reconnect when connection is lost (default: true) - max_reconnect: (optional) maximum reconnection attempts (default: 3) - reconnect_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 3000) - flush_timeout: (optional) number of milliseconds to wait on flushing messages (default: 3000) References: - *:logger:*:*:1.0 (optional) ILogger components to pass log messages - *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements - *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections - *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials - *:connection:nats:*:1.0 (optional) Shared connection to NATS service
See MessageQueue See MessagingCapabilities
Example: ctx := context.Background() queue := NewNatsBareMessageQueue("myqueue") queue.Configure(ctx, cconf.NewConfigParamsFromTuples( "subject", "mytopic", "queue_group", "mygroup", "connection.protocol", "nats", "connection.host", "localhost", "connection.port", 4222, )) _ = queue.Open(ctx, "123") _ = queue.Send(ctx, "123", NewMessageEnvelope("", "mymessage", "ABC")) message, err := queue.Receive(ctx, "123", 10000*time.Millisecond) if message != nil { ... queue.Complete(ctx, message) }
func NewNatsBareMessageQueue ¶
func NewNatsBareMessageQueue(name string) *NatsBareMessageQueue
NewNatsBareMessageQueue creates a new instance of the message queue. Parameters:
- name string (optional) a queue name.
func (*NatsBareMessageQueue) EndListen ¶
func (c *NatsBareMessageQueue) EndListen(ctx context.Context, correlationId string)
EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
func (*NatsBareMessageQueue) Listen ¶
func (c *NatsBareMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
Listens for incoming messages and blocks the current thread until queue is closed.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain. - receiver cqueues.IMessageReceiver a receiver to receive incoming messages.
See IMessageReceiver See receive
func (*NatsBareMessageQueue) Peek ¶
func (c *NatsBareMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
Returns: result *cqueues.MessageEnvelope, err error message or error.
func (*NatsBareMessageQueue) PeekBatch ¶
func (c *NatsBareMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain. - messageCount a maximum number of messages to peek.
Returns: a list with messages or an error.
func (*NatsBareMessageQueue) Receive ¶
func (c *NatsBareMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
Receive receives an incoming message and removes it from the queue.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain. - waitTimeout time.Duration a timeout in milliseconds to wait for a message to come.
Returns: result *cqueues.MessageEnvelope, err error receives a message or error.
type NatsMessageQueue ¶
type NatsMessageQueue struct { *NatsAbstractMessageQueue // contains filtered or unexported fields }
func NewNatsMessageQueue ¶
func NewNatsMessageQueue(name string) *NatsMessageQueue
NewNatsMessageQueue creates a new instance of the message queue.
Parameters: - name string (optional) a queue name.
func (*NatsMessageQueue) Clear ¶
func (c *NatsMessageQueue) Clear(ctx context.Context, correlationId string) (err error)
Clear clears the component state.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
Returns: an error, or nil if no errors occurred.
func (*NatsMessageQueue) Close ¶
func (c *NatsMessageQueue) Close(ctx context.Context, correlationId string) error
Close closes the component and frees used resources.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
Returns: an error, or nil if no errors occurred.
func (*NatsMessageQueue) Configure ¶
func (c *NatsMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
Configures component by passing configuration parameters.
Parameters: - ctx context.Context operation context - config configuration parameters to be set.
func (*NatsMessageQueue) EndListen ¶
func (c *NatsMessageQueue) EndListen(ctx context.Context, correlationId string)
EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
func (*NatsMessageQueue) Listen ¶
func (c *NatsMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
Listens for incoming messages and blocks the current thread until queue is closed.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain. - receiver cqueues.IMessageReceiver a receiver to receive incoming messages.
See IMessageReceiver See receive
func (*NatsMessageQueue) OnMessage ¶
func (c *NatsMessageQueue) OnMessage(msg *nats.Msg)
OnMessage processes incoming messages.
Parameters: - msg *nats.Msg message received from NATS
func (*NatsMessageQueue) Open ¶
func (c *NatsMessageQueue) Open(ctx context.Context, correlationId string) error
Opens the component with given connection and credential parameters.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain.
Returns: an error, or nil if no errors occurred.
func (*NatsMessageQueue) Peek ¶
func (c *NatsMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain.
Returns: result *cqueues.MessageEnvelope, err error message or error.
func (*NatsMessageQueue) PeekBatch ¶
func (c *NatsMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list. Important: This method is not supported by NATS.
Parameters: - ctx context.Context operation context - correlationId (optional) transaction id to trace execution through call chain. - messageCount a maximum number of messages to peek.
Returns: a list with messages or an error.
func (*NatsMessageQueue) ReadMessageCount ¶
func (c *NatsMessageQueue) ReadMessageCount() (count int64, err error)
ReadMessageCount reads the current number of messages in the queue to be delivered. Returns the number of messages or an error.
func (*NatsMessageQueue) Receive ¶
func (c *NatsMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
Receive receives an incoming message and removes it from the queue.
Parameters: - ctx context.Context operation context - correlationId string (optional) transaction id to trace execution through call chain. - waitTimeout time.Duration a timeout in milliseconds to wait for a message to come.
Returns: result *cqueues.MessageEnvelope, err error receives a message or error.