queues

package
v1.0.0-13
Published: Jun 9, 2022 License: MIT Imports: 18 Imported by: 12

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CallbackMessageReceiver

type CallbackMessageReceiver struct {
	Callback func(message *MessageEnvelope, queue IMessageQueue) error
}

CallbackMessageReceiver wraps a message callback into an IMessageReceiver.

func NewCallbackMessageReceiver

func NewCallbackMessageReceiver(callback func(message *MessageEnvelope, queue IMessageQueue) error) *CallbackMessageReceiver

func (*CallbackMessageReceiver) ReceiveMessage

func (c *CallbackMessageReceiver) ReceiveMessage(message *MessageEnvelope, queue IMessageQueue) (err error)
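
The wrapper follows the common Go function-adapter pattern. The sketch below is a minimal, self-contained illustration that uses simplified stand-in types; `envelope`, `queue`, `namedQueue` and `callbackReceiver` are hypothetical names, not the package's own types, and returning an error for a missing callback is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// envelope and queue are simplified stand-ins for MessageEnvelope and
// IMessageQueue; they exist only to keep this sketch self-contained.
type envelope struct{ Message []byte }

type queue interface{ Name() string }

type namedQueue string

func (q namedQueue) Name() string { return string(q) }

// callbackReceiver mirrors the documented shape of CallbackMessageReceiver:
// a struct that holds a single callback field.
type callbackReceiver struct {
	Callback func(message *envelope, q queue) error
}

func newCallbackReceiver(cb func(*envelope, queue) error) *callbackReceiver {
	return &callbackReceiver{Callback: cb}
}

// ReceiveMessage forwards the call to the wrapped callback.
// Rejecting a nil callback is an assumption of this sketch.
func (c *callbackReceiver) ReceiveMessage(message *envelope, q queue) error {
	if c.Callback == nil {
		return errors.New("callback is not set")
	}
	return c.Callback(message, q)
}

func main() {
	r := newCallbackReceiver(func(m *envelope, q queue) error {
		fmt.Printf("got %q from %s\n", m.Message, q.Name())
		return nil
	})
	_ = r.ReceiveMessage(&envelope{Message: []byte("ABC")}, namedQueue("myqueue"))
}
```

The adapter lets a plain function be passed wherever a receiver object is expected, without declaring a new type for every handler.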

type IMessageQueue

type IMessageQueue interface {
	crun.IOpenable

	// Name gets the queue name.
	// Returns the queue name.
	Name() string

	// Capabilities gets the queue capabilities.
	// Returns the queue's capabilities object.
	Capabilities() *MessagingCapabilities

	// ReadMessageCount reads the current number of messages in the queue to be delivered.
	// Returns: number of messages or error.
	ReadMessageCount() (count int64, err error)

	// Send sends a message into the queue.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - envelope          a message envelope to be sent.
	// Returns: error or nil for success.
	Send(ctx context.Context, correlationId string, envelope *MessageEnvelope) error

	// SendAsObject sends an object into the queue.
	// Before sending, the object is converted into a JSON string and wrapped in a MessageEnvelope.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - messageType       a message type
	//  - value             an object value to be sent
	// Returns: error or nil for success.
	// See Send
	SendAsObject(ctx context.Context, correlationId string, messageType string, value interface{}) error

	// Peek peeks a single incoming message from the queue without removing it.
	// If there are no messages available in the queue it returns nil.
	//  - ctx       operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	// Returns: received message or error.
	Peek(ctx context.Context, correlationId string) (result *MessageEnvelope, err error)

	// PeekBatch peeks multiple incoming messages from the queue without removing them.
	// If there are no messages available in the queue it returns an empty list.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - messageCount      a maximum number of messages to peek.
	// Returns: list with messages or error.
	PeekBatch(ctx context.Context, correlationId string, messageCount int64) (result []*MessageEnvelope, err error)

	// Receive receives an incoming message and removes it from the queue.
	//  - ctx       operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - waitTimeout       a timeout in milliseconds to wait for a message to come.
	// Returns: a message or error.
	Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (result *MessageEnvelope, err error)

	// RenewLock renews a lock on a message that makes it invisible to other receivers in the queue.
	// This method is usually used to extend the message processing time.
	//  - ctx       operation context
	//  - message       a message to extend its lock.
	//  - lockTimeout   a locking timeout in milliseconds.
	// Returns:      error or nil for success.
	RenewLock(ctx context.Context, message *MessageEnvelope, lockTimeout time.Duration) error

	// Complete permanently removes a message from the queue.
	// This method is usually used to remove the message after successful processing.
	//  - ctx       operation context
	//  - message   a message to remove.
	// Returns: error or nil for success.
	Complete(ctx context.Context, message *MessageEnvelope) error

	// Abandon returns a message into the queue and makes it available for all subscribers to receive it again.
	// This method is usually used to return a message which could not be processed at the moment
	// to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently
	// and/or sent to the dead letter queue.
	//  - ctx       operation context
	//  - message   a message to return.
	// Returns: error or nil for success.
	Abandon(ctx context.Context, message *MessageEnvelope) error

	// MoveToDeadLetter permanently removes a message from the queue and sends it to the dead letter queue.
	//  - ctx       operation context
	//  - message   a message to be removed.
	// Returns: error or nil for success.
	MoveToDeadLetter(ctx context.Context, message *MessageEnvelope) error

	// Listen listens for incoming messages and blocks the current thread until the queue is closed.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - receiver          a receiver to receive incoming messages.
	// See IMessageReceiver
	// See Receive
	Listen(ctx context.Context, correlationId string, receiver IMessageReceiver) error

	// BeginListen listens for incoming messages without blocking the current thread.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - receiver          a receiver to receive incoming messages.
	// See Listen
	// See IMessageReceiver
	BeginListen(ctx context.Context, correlationId string, receiver IMessageReceiver)

	// EndListen ends listening for incoming messages.
	// When this method is called, Listen unblocks the thread and execution continues.
	//  - ctx       operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	EndListen(ctx context.Context, correlationId string)
}

IMessageQueue is the interface for asynchronous message queues.

Not all queues implement all the methods. An attempt to call an unsupported method results in a NotImplemented error. To check whether a specific method is supported, consult MessagingCapabilities.

See MessageEnvelope. See MessagingCapabilities.
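
Consulting the capabilities before calling an optional method could look roughly like the sketch below. It is self-contained and uses stand-in types: `capabilities`, `peeker`, `fakeQueue`, `safePeek` and `errNotSupported` are hypothetical names introduced for illustration, and the simplified `Peek` signature omits the context and correlation id of the real interface.

```go
package main

import (
	"errors"
	"fmt"
)

// capabilities is a stand-in for MessagingCapabilities that carries a single flag.
type capabilities struct{ canPeek bool }

func (c *capabilities) CanPeek() bool { return c.canPeek }

// peeker is a hypothetical, reduced view of IMessageQueue.
type peeker interface {
	Capabilities() *capabilities
	Peek() (string, error)
}

// errNotSupported is this sketch's own error value, not one defined by the package.
var errNotSupported = errors.New("peek is not supported by this queue")

// safePeek consults the capabilities first and only then calls Peek.
func safePeek(q peeker) (string, error) {
	if caps := q.Capabilities(); caps == nil || !caps.CanPeek() {
		return "", errNotSupported
	}
	return q.Peek()
}

// fakeQueue is a trivial queue used to exercise safePeek.
type fakeQueue struct {
	caps *capabilities
	head string
}

func (q *fakeQueue) Capabilities() *capabilities { return q.caps }
func (q *fakeQueue) Peek() (string, error)       { return q.head, nil }

func main() {
	q := &fakeQueue{caps: &capabilities{canPeek: true}, head: "ABC"}
	msg, err := safePeek(q)
	fmt.Println(msg, err)
}
```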

type IMessageQueueOverrides

type IMessageQueueOverrides interface {
	IMessageQueue

	// OpenWithParams opens the component with given connection and credential parameters.
	//  - ctx               operation context
	//  - correlationId     (optional) transaction id to trace execution through call chain.
	//  - connections       connection parameters
	//  - credential        credential parameters
	// Returns: error or nil if no errors occurred.
	OpenWithParams(ctx context.Context, correlationId string, connections []*cconn.ConnectionParams, credential *cauth.CredentialParams) error
}

type IMessageReceiver

type IMessageReceiver interface {

	// ReceiveMessage receives an incoming message from the queue.
	//   - ctx       operation context
	//   - envelope  an incoming message
	//   - queue     a queue where the message comes from
	// See: MessageEnvelope
	// See: IMessageQueue
	ReceiveMessage(ctx context.Context, envelope *MessageEnvelope, queue IMessageQueue) (err error)
}

IMessageReceiver is a callback interface to receive incoming messages. Example:

    type MyMessageReceiver struct{}

    func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *MessageEnvelope, queue IMessageQueue) (err error) {
        fmt.Println("Received message: " + envelope.GetMessageAsString())
        return nil
    }

    messageQueue := NewMemoryMessageQueue("myqueue")
    opnErr := messageQueue.Open(context.Background(), "123")
    if opnErr == nil {
        // BeginListen is used here because Listen blocks the current thread.
        messageQueue.BeginListen(context.Background(), "123", &MyMessageReceiver{})
        messageQueue.Send(context.Background(), "123", NewMessageEnvelope("", "mymessage", []byte("ABC"))) // Output in console: "Received message: ABC"
    }

type LockedMessage

type LockedMessage struct {
	Message        *MessageEnvelope `json:"message" bson:"message"`                 // The incoming message.
	ExpirationTime time.Time        `json:"expiration_time" bson:"expiration_time"` // The expiration time for the message lock. If it is null then the message is not locked.
	Timeout        time.Duration    `json:"timeout" bson:"timeout"`                 // The lock timeout in milliseconds.
}

LockedMessage is a data object used to store and lock incoming messages in MemoryMessageQueue. See MemoryMessageQueue.
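
How an expiration time and a timeout can express a message lock is sketched below. This is an illustration under stated assumptions, not the package's implementation: `lockedMessage`, `lock`, `isLocked` and `lockedAfter` are hypothetical names, and a zero `ExpirationTime` is assumed to mean "not locked" (the field comment calls this case null).

```go
package main

import (
	"fmt"
	"time"
)

// lockedMessage is a stand-in that keeps only the two lock-related fields.
type lockedMessage struct {
	ExpirationTime time.Time
	Timeout        time.Duration
}

// lock sets the expiration to now plus the lock timeout.
func (m *lockedMessage) lock(now time.Time) {
	m.ExpirationTime = now.Add(m.Timeout)
}

// isLocked reports whether the lock is still in force at the given moment.
// Assumption: a zero ExpirationTime means the message was never locked.
func (m *lockedMessage) isLocked(now time.Time) bool {
	return !m.ExpirationTime.IsZero() && now.Before(m.ExpirationTime)
}

// lockedAfter locks a message at a fixed start time and checks it again
// after the given delay; a fixed start keeps the example deterministic.
func lockedAfter(timeout, elapsed time.Duration) bool {
	start := time.Date(2022, time.June, 9, 12, 0, 0, 0, time.UTC)
	m := &lockedMessage{Timeout: timeout}
	m.lock(start)
	return m.isLocked(start.Add(elapsed))
}

func main() {
	fmt.Println(lockedAfter(500*time.Millisecond, 100*time.Millisecond))
	fmt.Println(lockedAfter(500*time.Millisecond, 600*time.Millisecond))
}
```

While the lock holds, the message would stay hidden from other receivers; once the expiration passes it could be handed out again.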

type MemoryMessageQueue

type MemoryMessageQueue struct {
	MessageQueue
	// contains filtered or unexported fields
}

MemoryMessageQueue is a message queue that sends and receives messages within the same process by using shared memory. This queue is typically used for testing to mock real queues.

Configuration parameters:

  • name: name of the message queue

References:

  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements

See MessageQueue. See MessagingCapabilities.

Example:

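    queue := NewMemoryMessageQueue("myqueue")
    queue.Send(context.Background(), "123", NewMessageEnvelope("", "mymessage", []byte("ABC")))
    // The wait timeout below is an illustrative value.
    message, err := queue.Receive(context.Background(), "123", 10*time.Second)
    if err == nil && message != nil {
        // ...
        queue.Complete(context.Background(), message)
    }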

func NewMemoryMessageQueue

func NewMemoryMessageQueue(name string) *MemoryMessageQueue

NewMemoryMessageQueue creates a new instance of the message queue.

  • name (optional) a queue name.

Returns: *MemoryMessageQueue. See MessagingCapabilities.

func (*MemoryMessageQueue) Abandon

func (c *MemoryMessageQueue) Abandon(ctx context.Context, message *MessageEnvelope) (err error)

Abandon returns a message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently and/or sent to the dead letter queue.

  • message a message to return.

Returns: error or nil for success.
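
The choice between Complete, Abandon and MoveToDeadLetter after processing a received message can be sketched as follows. The sketch is self-contained and uses stand-ins: `envelope`, `settler`, `settle`, `recorder`, `lastCall`, `errTemporary` and `errUnrecoverable` are hypothetical names, and the way unrecoverable failures are marked is an assumption of the sketch rather than something the package prescribes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type envelope struct{ Message []byte }

// settler is a hypothetical subset of IMessageQueue with the three
// methods that decide the fate of a received message.
type settler interface {
	Complete(ctx context.Context, message *envelope) error
	Abandon(ctx context.Context, message *envelope) error
	MoveToDeadLetter(ctx context.Context, message *envelope) error
}

// Sentinel errors used only by this sketch.
var (
	errTemporary     = errors.New("temporary failure")
	errUnrecoverable = errors.New("unrecoverable failure")
)

// settle picks the follow-up call from the outcome of processing:
// success removes the message, unrecoverable errors dead-letter it,
// anything else returns it to the queue for another attempt.
func settle(ctx context.Context, q settler, message *envelope, processErr error) error {
	switch {
	case processErr == nil:
		return q.Complete(ctx, message)
	case errors.Is(processErr, errUnrecoverable):
		return q.MoveToDeadLetter(ctx, message)
	default:
		return q.Abandon(ctx, message)
	}
}

// recorder is a fake queue that remembers which method was called last.
type recorder struct{ last string }

func (r *recorder) Complete(context.Context, *envelope) error {
	r.last = "complete"
	return nil
}

func (r *recorder) Abandon(context.Context, *envelope) error {
	r.last = "abandon"
	return nil
}

func (r *recorder) MoveToDeadLetter(context.Context, *envelope) error {
	r.last = "deadletter"
	return nil
}

// lastCall runs settle against a recorder and reports the chosen method.
func lastCall(processErr error) string {
	r := &recorder{}
	_ = settle(context.Background(), r, &envelope{}, processErr)
	return r.last
}

func main() {
	fmt.Println(lastCall(nil))
	fmt.Println(lastCall(errTemporary))
	fmt.Println(lastCall(fmt.Errorf("bad payload: %w", errUnrecoverable)))
}
```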

func (*MemoryMessageQueue) Clear

func (c *MemoryMessageQueue) Clear(ctx context.Context, correlationId string) (err error)

Clear clears the component state.

  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*MemoryMessageQueue) Close

func (c *MemoryMessageQueue) Close(ctx context.Context, correlationId string) (err error)

Close closes the component and frees used resources.

  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*MemoryMessageQueue) Complete

func (c *MemoryMessageQueue) Complete(ctx context.Context, message *MessageEnvelope) (err error)

Complete permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

  • message a message to remove.

Returns: error or nil for success.

func (*MemoryMessageQueue) EndListen

func (c *MemoryMessageQueue) EndListen(ctx context.Context, correlationId string)

EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.

  • correlationId (optional) transaction id to trace execution through call chain.

func (*MemoryMessageQueue) IsOpen

func (c *MemoryMessageQueue) IsOpen() bool

IsOpen checks if the component is opened. Returns true if the component has been opened and false otherwise.

func (*MemoryMessageQueue) Listen

func (c *MemoryMessageQueue) Listen(ctx context.Context, correlationId string, receiver IMessageReceiver) error

Listen listens for incoming messages and blocks the current thread until the queue is closed.

  • correlationId (optional) transaction id to trace execution through call chain.
  • receiver a receiver to receive incoming messages.

See IMessageReceiver. See Receive.

func (*MemoryMessageQueue) MoveToDeadLetter

func (c *MemoryMessageQueue) MoveToDeadLetter(ctx context.Context, message *MessageEnvelope) (err error)

MoveToDeadLetter permanently removes a message from the queue and sends it to the dead letter queue.

  • message a message to be removed.

Returns: error or nil for success.

func (*MemoryMessageQueue) Open

func (c *MemoryMessageQueue) Open(ctx context.Context, correlationId string) (err error)

Open opens the component.

  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*MemoryMessageQueue) Peek

func (c *MemoryMessageQueue) Peek(ctx context.Context, correlationId string) (result *MessageEnvelope, err error)

Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns nil.

  • correlationId (optional) transaction id to trace execution through call chain.

Returns: a message or error.

func (*MemoryMessageQueue) PeekBatch

func (c *MemoryMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) (result []*MessageEnvelope, err error)

PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

  • correlationId (optional) transaction id to trace execution through call chain.
  • messageCount a maximum number of messages to peek.

Returns: a list with messages or error.

func (*MemoryMessageQueue) ReadMessageCount

func (c *MemoryMessageQueue) ReadMessageCount() (count int64, err error)

ReadMessageCount reads the current number of messages in the queue to be delivered. Returns: number of messages or error.

func (*MemoryMessageQueue) Receive

func (c *MemoryMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*MessageEnvelope, error)

Receive receives an incoming message and removes it from the queue.

  • correlationId (optional) transaction id to trace execution through call chain.
  • waitTimeout a timeout in milliseconds to wait for a message to come.

Returns: a message or error.
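
Waiting for a message with an upper bound on the wait can be sketched with a channel and a timer. This is a generic Go illustration, not the way MemoryMessageQueue is implemented; `receiveWithTimeout` and the string-typed channel are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithTimeout waits for the next message on the channel, but gives
// up after waitTimeout. The boolean result tells whether a message arrived.
func receiveWithTimeout(messages <-chan string, waitTimeout time.Duration) (string, bool) {
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	select {
	case m := <-messages:
		return m, true
	case <-timer.C:
		return "", false
	}
}

func main() {
	messages := make(chan string, 1)
	messages <- "ABC"

	// A message is already buffered, so this call returns without waiting.
	m, ok := receiveWithTimeout(messages, time.Second)
	fmt.Println(m, ok)

	// The channel is now empty, so this call gives up after the timeout.
	m, ok = receiveWithTimeout(messages, 10*time.Millisecond)
	fmt.Println(m, ok)
}
```

Passing a `time.Duration` built from a unit constant, such as `500 * time.Millisecond`, keeps the intended unit explicit at the call site.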

func (*MemoryMessageQueue) RenewLock

func (c *MemoryMessageQueue) RenewLock(ctx context.Context, message *MessageEnvelope, lockTimeout time.Duration) (err error)

RenewLock renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time.

  • message a message to extend its lock.
  • lockTimeout a locking timeout in milliseconds.

Returns: error or nil for success.

func (*MemoryMessageQueue) Send

func (c *MemoryMessageQueue) Send(ctx context.Context, correlationId string, envelope *MessageEnvelope) (err error)

Send sends a message into the queue.

  • correlationId (optional) transaction id to trace execution through call chain.
  • envelope a message envelope to be sent.

Returns: error or nil for success.

type MessageEnvelope

type MessageEnvelope struct {
	CorrelationId string    `json:"correlation_id"` // The unique business transaction id that is used to trace calls across components.
	MessageId     string    `json:"message_id"`     // The message's auto-generated ID.
	MessageType   string    `json:"message_type"`   // String value that defines the stored message's type.
	SentTime      time.Time `json:"sent_time"`      // The time at which the message was sent.
	Message       []byte    `json:"message"`        // The stored message.
	// contains filtered or unexported fields
}

MessageEnvelope allows adding additional information to messages. A correlation id, message id, and a message type are added to the data being sent/received. Additionally, a MessageEnvelope can reference a lock token. Side note: a MessageEnvelope's message is stored as a buffer, so strings are converted using UTF-8 conversions.
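
Storing strings and objects in a byte buffer can be sketched with the standard library alone. The stand-in below is not the package's MessageEnvelope; `envelope`, its lower-case methods, `order` and `roundTrip` are hypothetical names used to show the conversions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a stand-in that keeps only the message buffer.
type envelope struct{ Message []byte }

// setString stores the UTF-8 bytes of the string.
func (e *envelope) setString(value string) { e.Message = []byte(value) }

// getString reads the buffer back as a string.
func (e *envelope) getString() string { return string(e.Message) }

// setObject converts the value to JSON and stores the result.
// The buffer is left unchanged if the value cannot be marshaled.
func (e *envelope) setObject(value interface{}) error {
	data, err := json.Marshal(value)
	if err != nil {
		return err
	}
	e.Message = data
	return nil
}

// getObject decodes the stored JSON into target, which must be a pointer.
func (e *envelope) getObject(target interface{}) error {
	return json.Unmarshal(e.Message, target)
}

// order is an example payload.
type order struct {
	ID  string `json:"id"`
	Qty int    `json:"qty"`
}

// roundTrip stores an order and reads it back, also returning the raw JSON.
func roundTrip(in order) (order, string, error) {
	e := &envelope{}
	if err := e.setObject(in); err != nil {
		return order{}, "", err
	}
	var out order
	err := e.getObject(&out)
	return out, e.getString(), err
}

func main() {
	out, raw, err := roundTrip(order{ID: "A1", Qty: 2})
	fmt.Println(out, raw, err)
}
```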

func NewEmptyMessageEnvelope

func NewEmptyMessageEnvelope() *MessageEnvelope

NewEmptyMessageEnvelope creates an empty MessageEnvelope. Returns: *MessageEnvelope new instance.

func NewMessageEnvelope

func NewMessageEnvelope(correlationId string, messageType string, message []byte) *MessageEnvelope

NewMessageEnvelope creates a new MessageEnvelope, which adds a correlation id, message id, and a type to the data being sent/received.

  • correlationId (optional) transaction id to trace execution through call chain.
  • messageType a string value that defines the message's type.
  • message the data being sent/received.

Returns: *MessageEnvelope new instance

func (*MessageEnvelope) GetMessageAs

func (c *MessageEnvelope) GetMessageAs(value interface{}) interface{}

GetMessageAs returns the value that was stored in this message as an object. See SetMessageAsObject.

func (*MessageEnvelope) GetMessageAsJson

func (c *MessageEnvelope) GetMessageAsJson() interface{}

GetMessageAsJson returns the value that was stored in this message as a JSON string. See SetMessageAsJson.

func (*MessageEnvelope) GetMessageAsString

func (c *MessageEnvelope) GetMessageAsString() string

GetMessageAsString returns the information stored in this message as a string.

func (*MessageEnvelope) GetReference

func (c *MessageEnvelope) GetReference() interface{}

GetReference returns the lock token that this MessageEnvelope references.

func (*MessageEnvelope) MarshalJSON

func (c *MessageEnvelope) MarshalJSON() ([]byte, error)

func (*MessageEnvelope) SetMessageAsJson

func (c *MessageEnvelope) SetMessageAsJson(value interface{})

SetMessageAsJson stores the given value as a JSON string.

  • value the value to convert to JSON and store in this message.

See GetMessageAsJson

func (*MessageEnvelope) SetMessageAsObject

func (c *MessageEnvelope) SetMessageAsObject(value interface{})

SetMessageAsObject stores the given value as a JSON string.

  • value the value to convert to JSON and store in this message.

See GetMessageAs

func (*MessageEnvelope) SetMessageAsString

func (c *MessageEnvelope) SetMessageAsString(value string)

SetMessageAsString stores the given string.

  • value the string to set. Will be converted to a buffer.

func (*MessageEnvelope) SetReference

func (c *MessageEnvelope) SetReference(value interface{})

SetReference sets a lock token reference for this MessageEnvelope.

  • value the lock token to reference.

func (*MessageEnvelope) String

func (c *MessageEnvelope) String() string

String converts this MessageEnvelope to a string, using the following format: <correlation_id>,<MessageType>,<message.toString>. If any of the values are nil, they will be replaced with ---. Returns the generated string.
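
The described format can be sketched as a small helper. `formatEnvelope` and `orPlaceholder` are hypothetical names; treating an empty string or a nil buffer as the "missing" case is an assumption of the sketch, since Go strings cannot be nil.

```go
package main

import "fmt"

// orPlaceholder substitutes "---" for a missing (empty) value.
func orPlaceholder(s string) string {
	if s == "" {
		return "---"
	}
	return s
}

// formatEnvelope joins the three parts with commas, following the
// <correlation_id>,<MessageType>,<message> layout described above.
func formatEnvelope(correlationID, messageType string, message []byte) string {
	body := "---"
	if message != nil {
		body = string(message)
	}
	return orPlaceholder(correlationID) + "," + orPlaceholder(messageType) + "," + body
}

func main() {
	fmt.Println(formatEnvelope("123", "mymessage", []byte("ABC")))
	fmt.Println(formatEnvelope("", "", nil))
}
```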

func (*MessageEnvelope) UnmarshalJSON

func (c *MessageEnvelope) UnmarshalJSON(data []byte) error

type MessageQueue

type MessageQueue struct {
	Overrides          IMessageQueueOverrides
	Logger             *clog.CompositeLogger
	Counters           *ccount.CompositeCounters
	ConnectionResolver *cconn.ConnectionResolver
	CredentialResolver *cauth.CredentialResolver
	Lock               sync.Mutex
	// contains filtered or unexported fields
}

MessageQueue is a message queue that is used as a basis for specific message queue implementations.

Configuration parameters:

  • name: name of the message queue
  • connection(s):
      • discovery_key: key to retrieve parameters from discovery service
      • protocol: connection protocol like http, https, tcp, udp
      • host: host name or IP address
      • port: port number
      • uri: resource URI or connection string with all parameters in it
  • credential(s):
      • store_key: key to retrieve parameters from credential store
      • username: user name
      • password: user password
      • access_id: application access id
      • access_key: application secret key

References:

  • *:Logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:Counters:*:*:1.0 (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 (optional) IDiscovery components to discover connection(s)
  • *:credential-store:*:*:1.0 (optional) ICredentialStore components to look up credential(s)

func InheritMessageQueue

func InheritMessageQueue(overrides IMessageQueueOverrides, name string, capabilities *MessagingCapabilities) *MessageQueue

InheritMessageQueue creates a new instance of the message queue.

  • overrides a message queue overrides
  • name (optional) a queue name
  • capabilities (optional) capabilities of this message queue

func (*MessageQueue) BeginListen

func (c *MessageQueue) BeginListen(ctx context.Context, correlationId string, receiver IMessageReceiver)

BeginListen listens for incoming messages without blocking the current thread.

  • ctx operation context
  • correlationId (optional) transaction id to trace execution through call chain.
  • receiver a receiver to receive incoming messages.

See Listen. See IMessageReceiver.
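
The relationship between blocking and non-blocking listening can be sketched with a goroutine and a stop channel. This is a generic illustration, not the package's implementation; `miniQueue` and its methods are hypothetical, and messages are plain strings to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical in-process queue used only for this sketch.
type miniQueue struct {
	messages chan string
	stop     chan struct{}
	stopOnce sync.Once
}

func newMiniQueue() *miniQueue {
	return &miniQueue{messages: make(chan string, 16), stop: make(chan struct{})}
}

func (q *miniQueue) send(m string) { q.messages <- m }

// listen blocks the caller, handing each message to receive,
// until endListen is called.
func (q *miniQueue) listen(receive func(string)) {
	for {
		select {
		case m := <-q.messages:
			receive(m)
		case <-q.stop:
			return
		}
	}
}

// beginListen runs listen in its own goroutine and returns immediately.
// The returned channel is closed once the listening loop has exited.
func (q *miniQueue) beginListen(receive func(string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		q.listen(receive)
	}()
	return done
}

// endListen unblocks listen; calling it more than once is safe.
func (q *miniQueue) endListen() {
	q.stopOnce.Do(func() { close(q.stop) })
}

// demo sends one message, waits for the receiver to see it, then stops listening.
func demo() string {
	q := newMiniQueue()
	got := make(chan string, 1)
	done := q.beginListen(func(m string) { got <- m })
	q.send("ABC")
	m := <-got
	q.endListen()
	<-done
	return m
}

func main() {
	fmt.Println(demo())
}
```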

func (*MessageQueue) Capabilities

func (c *MessageQueue) Capabilities() *MessagingCapabilities

Capabilities gets the queue capabilities. Returns the queue's capabilities object.

func (*MessageQueue) CheckOpen

func (c *MessageQueue) CheckOpen(correlationId string) error

CheckOpen checks if the message queue has been opened.

  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil for success.

func (*MessageQueue) Configure

func (c *MessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)

Configure configures the component by passing configuration parameters.

  • config configuration parameters to be set.

func (*MessageQueue) Name

func (c *MessageQueue) Name() string

Name gets the queue name. Returns the queue name.

func (*MessageQueue) Open

func (c *MessageQueue) Open(ctx context.Context, correlationId string) error

Open opens the component.

  • ctx operation context
  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*MessageQueue) OpenWithParams

func (c *MessageQueue) OpenWithParams(ctx context.Context, correlationId string, connections []*cconn.ConnectionParams,
	credential *cauth.CredentialParams) error

OpenWithParams opens the component with given connection and credential parameters.

  • ctx operation context
  • correlationId (optional) transaction id to trace execution through call chain.
  • connections connection parameters
  • credential credential parameters

Returns: error or nil if no errors occurred.

func (*MessageQueue) SendAsObject

func (c *MessageQueue) SendAsObject(ctx context.Context, correlationId string, messageType string, message interface{}) (err error)

SendAsObject sends an object into the queue. Before sending, the object is converted into a JSON string and wrapped in a MessageEnvelope.

  • ctx operation context
  • correlationId (optional) transaction id to trace execution through call chain.
  • messageType a message type
  • value an object value to be sent

Returns: error or nil for success. See Send.

func (*MessageQueue) SetReferences

func (c *MessageQueue) SetReferences(ctx context.Context, references cref.IReferences)

SetReferences sets references to dependent components.

  • ctx operation context
  • references references to locate the component dependencies.

func (*MessageQueue) String

func (c *MessageQueue) String() string

String gets a string representation of the object. Returns a string representation of the object.

type MessagingCapabilities

type MessagingCapabilities struct {
	// contains filtered or unexported fields
}

MessagingCapabilities is a data object that contains the supported capabilities of a message queue. If a certain capability is not supported, the queue returns a NotImplemented error.

func NewMessagingCapabilities

func NewMessagingCapabilities(canMessageCount bool, canSend bool, canReceive bool,
	canPeek bool, canPeekBatch bool, canRenewLock bool, canAbandon bool,
	canDeadLetter bool, canClear bool) *MessagingCapabilities

NewMessagingCapabilities creates a new instance of the capabilities object.

  • canMessageCount true if queue supports reading message count.
  • canSend true if queue is able to send messages.
  • canReceive true if queue is able to receive messages.
  • canPeek true if queue is able to peek messages.
  • canPeekBatch true if queue is able to peek multiple messages in one batch.
  • canRenewLock true if queue is able to renew message lock.
  • canAbandon true if queue is able to abandon messages.
  • canDeadLetter true if queue is able to send messages to dead letter queue.
  • canClear true if queue can be cleared.

Returns: *MessagingCapabilities

func (*MessagingCapabilities) CanAbandon

func (c *MessagingCapabilities) CanAbandon() bool

CanAbandon informs if the queue is able to abandon messages. Returns: true if queue is able to abandon.

func (*MessagingCapabilities) CanClear

func (c *MessagingCapabilities) CanClear() bool

CanClear informs if the queue can be cleared. Returns: true if queue can be cleared.

func (*MessagingCapabilities) CanDeadLetter

func (c *MessagingCapabilities) CanDeadLetter() bool

CanDeadLetter informs if the queue is able to send messages to the dead letter queue. Returns: true if queue is able to send messages to the dead letter queue.

func (*MessagingCapabilities) CanMessageCount

func (c *MessagingCapabilities) CanMessageCount() bool

CanMessageCount informs if the queue is able to read the number of messages. Returns: true if queue supports reading message count.

func (*MessagingCapabilities) CanPeek

func (c *MessagingCapabilities) CanPeek() bool

CanPeek informs if the queue is able to peek messages. Returns: true if queue is able to peek messages.

func (*MessagingCapabilities) CanPeekBatch

func (c *MessagingCapabilities) CanPeekBatch() bool

CanPeekBatch informs if the queue is able to peek multiple messages in one batch. Returns: true if queue is able to peek multiple messages in one batch.

func (*MessagingCapabilities) CanReceive

func (c *MessagingCapabilities) CanReceive() bool

CanReceive informs if the queue is able to receive messages. Returns: true if queue is able to receive messages.

func (*MessagingCapabilities) CanRenewLock

func (c *MessagingCapabilities) CanRenewLock() bool

CanRenewLock informs if the queue is able to renew a message lock. Returns: true if queue is able to renew message lock.

func (*MessagingCapabilities) CanSend

func (c *MessagingCapabilities) CanSend() bool

CanSend informs if the queue is able to send messages. Returns: true if queue is able to send messages.
