Documentation ¶
Index ¶
- type KafkaMessageQueue
- func (c *KafkaMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) error
- func (c *KafkaMessageQueue) Cleanup(kafka.ConsumerGroupSession) error
- func (c *KafkaMessageQueue) Clear(ctx context.Context, correlationId string) error
- func (c *KafkaMessageQueue) Close(ctx context.Context, correlationId string) (err error)
- func (c *KafkaMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) error
- func (c *KafkaMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *KafkaMessageQueue) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error
- func (c *KafkaMessageQueue) EndListen(ctx context.Context, correlationId string)
- func (c *KafkaMessageQueue) IsOpen() bool
- func (c *KafkaMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
- func (c *KafkaMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) error
- func (c *KafkaMessageQueue) OnMessage(ctx context.Context, msg *connect.KafkaMessage)
- func (c *KafkaMessageQueue) Open(ctx context.Context, correlationId string) (err error)
- func (c *KafkaMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
- func (c *KafkaMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
- func (c *KafkaMessageQueue) ReadMessageCount() (int64, error)
- func (c *KafkaMessageQueue) Ready() chan bool
- func (c *KafkaMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
- func (c *KafkaMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, ...) (err error)
- func (c *KafkaMessageQueue) Send(ctx context.Context, correlationId string, envelop *cqueues.MessageEnvelope) error
- func (c *KafkaMessageQueue) SetReady(chFlag chan bool)
- func (c *KafkaMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)
- func (c *KafkaMessageQueue) Setup(kafka.ConsumerGroupSession) error
- func (c *KafkaMessageQueue) UnsetReferences(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaMessageQueue ¶
type KafkaMessageQueue struct { *cqueues.MessageQueue // The dependency resolver. DependencyResolver *cref.DependencyResolver // The logger. Logger *clog.CompositeLogger // The Kafka connection component. Connection *connect.KafkaConnection // contains filtered or unexported fields }
KafkaMessageQueue are message queue that sends and receives messages via Kafka message broker.
Configuration parameters:
- topic: name of Kafka topic to subscribe
- group_id: (optional) consumer group id (default: default)
- from_beginning: (optional) restarts receiving messages from the beginning (default: false)
- read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
- autocommit: (optional) turns on/off autocommit (default: true)
- connection(s):
- discovery_key: (optional) a key to retrieve the connection from IDiscovery
- host: host name or IP address
- port: port number
- uri: resource URI or connection string with all parameters in it
- credential(s):
- store_key: (optional) a key to retrieve the credentials from ICredentialStore
- username: user name
- password: user password
- options:
- read_partitions: (optional) list of partition indexes to be read (default: all, set for example: "1;5;7")
- write_partition: (optional) list of partition indexes to be read (default: auto (-1))
- autosubscribe: (optional) true to automatically subscribe on option (default: false)
- log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
- connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
- max_retries: (optional) maximum retry attempts (default: 5)
- retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
- request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
References:
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages
- *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
- *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
- *:connection:kafka:*:1.0 (optional) Shared connection to Kafka service
See MessageQueue See MessagingCapabilities
Example:
ctx := context.Context() queue := NewKafkaMessageQueue("myqueue") queue.Configure(ctx, cconf.NewConfigParamsFromTuples( "subject", "mytopic", "connection.protocol", "kafka", "connection.host", "localhost", "connection.port", 1883, )) _ = queue.Open(ctx, "123") _ = queue.Send(ctx, "123", NewMessageEnvelope("", "mymessage", "ABC")) message, err := queue.Receive(ctx, "123", 10000*time.Milliseconds) if (message != nil) { ... queue.Complete(ctx, message); }
func NewKafkaMessageQueue ¶
func NewKafkaMessageQueue(name string) *KafkaMessageQueue
Creates a new instance of the queue component. Parameters:
- name (optional) a queue name.
func (*KafkaMessageQueue) Abandon ¶
func (c *KafkaMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) error
Abandon method are returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue. Parameters: - ctx context.Context operation context - message *cqueues.MessageEnvelope a message to return. Returns: error error or nil for success.
func (*KafkaMessageQueue) Cleanup ¶
func (c *KafkaMessageQueue) Cleanup(kafka.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*KafkaMessageQueue) Clear ¶
func (c *KafkaMessageQueue) Clear(ctx context.Context, correlationId string) error
Clear method are clears component state. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
Returns error or nil no errors occured.
func (*KafkaMessageQueue) Close ¶
func (c *KafkaMessageQueue) Close(ctx context.Context, correlationId string) (err error)
Closes component and frees used resources.
- correlationId (optional) transaction id to trace execution through call chain.
- Returns error or nil no errors occured.
func (*KafkaMessageQueue) Complete ¶
func (c *KafkaMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) error
Complete method are permanently removes a message from the queue. This method is usually used to remove the message after successful processing. Parameters:
- ctx context.Context operation context
- message *cqueues.MessageEnvelope a message to remove.
Returns: error error or nil for success.
func (*KafkaMessageQueue) Configure ¶
func (c *KafkaMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
Configures component by passing configuration parameters. Parameters:
- ctx context.Context operation context
- config configuration parameters to be set.
func (*KafkaMessageQueue) ConsumeClaim ¶
func (c *KafkaMessageQueue) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*KafkaMessageQueue) EndListen ¶
func (c *KafkaMessageQueue) EndListen(ctx context.Context, correlationId string)
EndListen method are ends listening for incoming messages. When this method is call listen unblocks the thread and execution continues. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
func (*KafkaMessageQueue) IsOpen ¶
func (c *KafkaMessageQueue) IsOpen() bool
Checks if the component is opened. Returns true if the component has been opened and false otherwise.
func (*KafkaMessageQueue) Listen ¶
func (c *KafkaMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error
Listens for incoming messages and blocks the current thread until queue is closed. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
- receiver cqueues.IMessageReceiver a receiver to receive incoming messages.
See IMessageReceiver See receive
func (*KafkaMessageQueue) MoveToDeadLetter ¶
func (c *KafkaMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) error
Permanently removes a message from the queue and sends it to dead letter queue. Important: This method is not supported by Kafka. Parameters:
- ctx context.Context operation context
- message *cqueues.MessageEnvelope a message to be removed.
Returns: error error or nil for success.
func (*KafkaMessageQueue) OnMessage ¶
func (c *KafkaMessageQueue) OnMessage(ctx context.Context, msg *connect.KafkaMessage)
Callback for processing messages from kafka Parameters:
- ctx context.Context operation context
- msg *connect.KafkaMessage consumer message
func (*KafkaMessageQueue) Open ¶
func (c *KafkaMessageQueue) Open(ctx context.Context, correlationId string) (err error)
Opens the component. Parameters:
- ctx context.Context operation context
- correlationId (optional) transaction id to trace execution through call chain.
- Returns error or nil no errors occured.
func (*KafkaMessageQueue) Peek ¶
func (c *KafkaMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)
Peek method are peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns nil. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
Returns: result *cqueues.MessageEnvelope, err error message or error.
func (*KafkaMessageQueue) PeekBatch ¶
func (c *KafkaMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)
PeekBatch method are peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list. Important: This method is not supported by Kafka. Parameters:
- ctx context.Context operation context
- correlationId (optional) transaction id to trace execution through call chain.
- messageCount a maximum number of messages to peek.
Returns: callback function that receives a list with messages or error.
func (*KafkaMessageQueue) ReadMessageCount ¶
func (c *KafkaMessageQueue) ReadMessageCount() (int64, error)
ReadMessageCount method are reads the current number of messages in the queue to be delivered. Returns number of messages or error.
func (*KafkaMessageQueue) Ready ¶
func (c *KafkaMessageQueue) Ready() chan bool
Returns: channel with bool flag ready
func (*KafkaMessageQueue) Receive ¶
func (c *KafkaMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)
Receive method are receives an incoming message and removes it from the queue. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
- waitTimeout time.Duration a timeout in milliseconds to wait for a message to come.
Returns: result *cqueues.MessageEnvelope, err error receives a message or error.
func (*KafkaMessageQueue) RenewLock ¶
func (c *KafkaMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)
RenewLock method are renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time. Important: This method is not supported by Kafka. Parameters:
- ctx context.Context operation context
- message *cqueues.MessageEnvelope a message to extend its lock.
- lockTimeout time.Duration a locking timeout in milliseconds.
Returns: error receives an error or nil for success.
func (*KafkaMessageQueue) Send ¶
func (c *KafkaMessageQueue) Send(ctx context.Context, correlationId string, envelop *cqueues.MessageEnvelope) error
Send method are sends a message into the queue. Parameters:
- ctx context.Context operation context
- correlationId string (optional) transaction id to trace execution through call chain.
- envelope *cqueues.MessageEnvelope a message envelop to be sent.
Returns: error or nil for success.
func (*KafkaMessageQueue) SetReady ¶
func (c *KafkaMessageQueue) SetReady(chFlag chan bool)
Set bool channel with ready flag for consumer
Parameters: - chFlag bool channel
func (*KafkaMessageQueue) SetReferences ¶
func (c *KafkaMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)
Sets references to dependent components. Parameters:
- ctx context.Context
- references references to locate the component dependencies.
func (*KafkaMessageQueue) Setup ¶
func (c *KafkaMessageQueue) Setup(kafka.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim Send ready flag into channel Returns: error
func (*KafkaMessageQueue) UnsetReferences ¶
func (c *KafkaMessageQueue) UnsetReferences(ctx context.Context)
Unsets (clears) previously set references to dependent components. Parameters:
- ctx context.Context operation context