Documentation ¶
Index ¶
- func Cleanup()
- func InitMessaging(workerCount int, pulsarClientOptions *pulsar.ClientOptions) error
- type Consumer
- func CreateConsumer(tenantID, namespace string, topics []string, handler Handler, ...) (Consumer, error)
- func CreateRegexConsumer(tenantID, namespace, topicsPattern string, handler Handler, opts ConsumerOpts) (Consumer, error)
- func CreateSingleTopicConsumer(tenantID, namespace, topic string, handler Handler, opts ConsumerOpts) (Consumer, error)
- type ConsumerOpts
- type Handler
- type InitialPosition
- type Message
- type Producer
- type RetryMessage
- type Stats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitMessaging ¶
func InitMessaging(workerCount int, pulsarClientOptions *pulsar.ClientOptions) error
InitMessaging @Description: This API will initialize the messaging channel.
It will do all the connection initialization. workerCount is the number of message processing workers.
@Param workerCount
Types ¶
type Consumer ¶
type Consumer interface { // Start // @Description: This function will start the consumption of messages. Start() error // Stop // @Description: This function will flush any existing messages and stop the consumer client. Stop() error // Unsubscribe // @Description: This function will delete the subscription created by the consumer. Unsubscribe() error // Pause // @Description: This function will flush existing messages and pause further consumption. Pause() // Unpause // @Description: This function will unpause the message consumption. Unpause() // Stats // @Description: This function will provide stats of the messages consumed. Stats() Stats }
Consumer @Description: Interface for pulsar consumer.
func CreateConsumer ¶
func CreateConsumer(tenantID, namespace string, topics []string, handler Handler, opts ConsumerOpts) (Consumer, error)
CreateConsumer @Description: This API will create a Consumer for a particular topic.
The handler passed should implement the Handler interface from this module. The consumer will create the subscription and be in a passive state until Start() is called. The consumer can be Paused and Unpaused at any point. The commitInterval used to commit messages after every n messages are consumed. The Pause() function will flushout the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if subscription needs to be deleted. The Stats() function provides the stats for messages consumed. Creating multiple instances of Consumer for same topic will deliver message to only one of the instances. Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance.
@Param tenantID @Param namespace @Param topics @Param handler @Param opts
func CreateRegexConsumer ¶
func CreateRegexConsumer(tenantID, namespace, topicsPattern string, handler Handler, opts ConsumerOpts) (Consumer, error)
CreateRegexConsumer @Description: This API will create a Consumer for a topics matching the topics pattern.
The handler passed should implement the Handler interface from this module. The consumer will create the subscription and be in a passive state until Start() is called. The consumer can be Paused and Unpaused at any point. The Pause() function will flushout the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if subscription needs to be deleted. The Stats() function provides the stats for messages consumed. Creating multiple instances of Consumer for same topic will deliver message to only one of the instances. Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance.
@Param tenantID @Param namespace @Param topicsPattern @Param handler @Param opts
func CreateSingleTopicConsumer ¶
func CreateSingleTopicConsumer(tenantID, namespace, topic string, handler Handler, opts ConsumerOpts) (Consumer, error)
CreateSingleTopicConsumer @Description: This API will create a Consumer for a particular topic.
The handler passed should implement the Handler interface from this module. The consumer will create the subscription and be in a passive state until Start() is called. The consumer can be Paused and Unpaused at any point. The commitInterval used to commit messages after every n messages are consumed. The Pause() function will flushout the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if subscription needs to be deleted. The Stats() function provides the stats for messages consumed. Creating multiple instances of Consumer for same topic will deliver message to only one of the instances. Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance. retryEnabled will let the consumer retry message in case of HandleMessage return `RetryMessage` struct.
@Param tenantID @Param namespace @Param topic @Param handler @Param opts
type ConsumerOpts ¶
type ConsumerOpts struct { SubscriptionName string RetryEnabled bool InitialPosition InitialPosition }
type Handler ¶
type Handler interface { // HandleMessage // @Description: Handle consumed pulsar message. 'RetryMessage' is used to negatively ack message and requeued to retry after some delay. // @Param *Message HandleMessage(*Message) *RetryMessage }
Handler @Description: Interface for handling consumed pulsar messages.
type InitialPosition ¶
type InitialPosition int
const ( // Latest position which means the start consuming position will be the last message Latest InitialPosition = iota // Earliest position which means the start consuming position will be the first message Earliest )
type Producer ¶
type Producer interface { // Publish // @Description: This function publishes the messages to the topic. // @Param []*Message Publish([]*Message) error // PublishOne // @Description: This function publishes the message to the topic. // @Param *Message PublishOne(*Message) error // Stop // @Description: This will close the producer client. Stop() // Stats // @Description: This function will provide stats of the messages produced. Stats() Stats }
Producer @Description: Interface for pulsar producer.
func CreateProducer ¶
CreateProducer @Description: This API will create a Producer for a particular topic. The Producer instance can be used to Publish messages to the topic. @Param tenantID @Param namespace @Param topic
type RetryMessage ¶
RetryMessage @Description: Struct for retrying consumed message due to failure. Client can return this struct to ensure the message will be enqueued for a given RetryAfter duration.
type Stats ¶
type Stats struct {
TotalMessages uint64
}
Stats @Description: Struct for pulsar message stats.
func (Stats) IncrementMessageCount ¶
IncrementMessageCount @Description: This function will increment the total messages consumed. @Param messages