wpcm

package
v2.0.1
Warning

This package is not in the latest version of its module.

Published: Mar 18, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Cleanup

func Cleanup()

func InitMessaging

func InitMessaging(workerCount int, pulsarClientOptions *pulsar.ClientOptions) error

InitMessaging @Description: This API initializes the messaging channel.

It performs all connection initialization.
workerCount is the number of message-processing workers.

@Param workerCount @Param pulsarClientOptions

Types

type Consumer

type Consumer interface {
	// Start
	// @Description: This function will start the consumption of messages.
	Start() error

	// Stop
	// @Description: This function will flush any existing messages and stop the consumer client.
	Stop() error

	// Unsubscribe
	// @Description: This function will delete the subscription created by the consumer.
	Unsubscribe() error

	// Pause
	// @Description: This function will flush existing messages and pause further consumption.
	Pause()

	// Unpause
	// @Description: This function will unpause the message consumption.
	Unpause()

	// Stats
	// @Description: This function will provide stats of the messages consumed.
	Stats() Stats
}

Consumer @Description: Interface for pulsar consumer.

func CreateConsumer

func CreateConsumer(tenantID, namespace string, topics []string, handler Handler, opts ConsumerOpts) (Consumer, error)

CreateConsumer @Description: This API will create a Consumer for the given topics.

The handler passed should implement the Handler interface from this module.
The consumer will create the subscription and stay in a passive state until Start() is called.
The consumer can be Paused and Unpaused at any point.
The commitInterval is used to commit messages after every n messages are consumed.
The Pause() function will flush out the already received messages and pause receiving any further messages.
The Unpause() function will resume receiving messages.
The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
The Unsubscribe() function can be used if the subscription needs to be deleted.
The Stats() function provides the stats for messages consumed.
Creating multiple Consumer instances for the same topic will deliver each message to only one of the instances.
In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.

@Param tenantID @Param namespace @Param topics @Param handler @Param opts

func CreateRegexConsumer

func CreateRegexConsumer(tenantID, namespace, topicsPattern string, handler Handler, opts ConsumerOpts) (Consumer, error)

CreateRegexConsumer @Description: This API will create a Consumer for all topics matching the topics pattern.

The handler passed should implement the Handler interface from this module.
The consumer will create the subscription and stay in a passive state until Start() is called.
The consumer can be Paused and Unpaused at any point.
The Pause() function will flush out the already received messages and pause receiving any further messages.
The Unpause() function will resume receiving messages.
The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
The Unsubscribe() function can be used if the subscription needs to be deleted.
The Stats() function provides the stats for messages consumed.
Creating multiple Consumer instances for the same topic will deliver each message to only one of the instances.
In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.

@Param tenantID @Param namespace @Param topicsPattern @Param handler @Param opts

func CreateSingleTopicConsumer

func CreateSingleTopicConsumer(tenantID, namespace, topic string, handler Handler, opts ConsumerOpts) (Consumer, error)

CreateSingleTopicConsumer @Description: This API will create a Consumer for a particular topic.

The handler passed should implement the Handler interface from this module.
The consumer will create the subscription and stay in a passive state until Start() is called.
The consumer can be Paused and Unpaused at any point.
The commitInterval is used to commit messages after every n messages are consumed.
The Pause() function will flush out the already received messages and pause receiving any further messages.
The Unpause() function will resume receiving messages.
The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
The Unsubscribe() function can be used if the subscription needs to be deleted.
The Stats() function provides the stats for messages consumed.
Creating multiple Consumer instances for the same topic will deliver each message to only one of the instances.
In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.
RetryEnabled lets the consumer retry a message when HandleMessage returns a `RetryMessage` struct.

@Param tenantID @Param namespace @Param topic @Param handler @Param opts

type ConsumerOpts

type ConsumerOpts struct {
	SubscriptionName string
	RetryEnabled     bool
	InitialPosition  InitialPosition
}

type Handler

type Handler interface {
	// HandleMessage
	// @Description: Handle a consumed pulsar message. Returning a 'RetryMessage' negatively acknowledges the message and requeues it for retry after some delay.
	// @Param *Message
	HandleMessage(*Message) *RetryMessage
}

Handler @Description: Interface for handling consumed pulsar messages.

type InitialPosition

type InitialPosition int

const (
	// Latest means consumption starts at the last message.
	Latest InitialPosition = iota

	// Earliest means consumption starts at the first message.
	Earliest
)
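Because Latest is declared first with iota, it has the value 0, so a zero-value ConsumerOpts selects Latest unless Earliest is set explicitly. The sketch below reproduces the two declarations locally to show this. The subscription name is a made-up placeholder, and how the package treats an empty SubscriptionName is not documented here.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type InitialPosition int

const (
	Latest InitialPosition = iota
	Earliest
)

type ConsumerOpts struct {
	SubscriptionName string
	RetryEnabled     bool
	InitialPosition  InitialPosition
}

func main() {
	// The zero value of ConsumerOpts carries InitialPosition == Latest.
	var defaults ConsumerOpts
	fmt.Println(defaults.InitialPosition == Latest) // true

	// Replaying from the first message has to be requested explicitly.
	replay := ConsumerOpts{
		SubscriptionName: "example-subscription", // hypothetical name
		InitialPosition:  Earliest,
	}
	fmt.Println(int(replay.InitialPosition)) // 1
}
```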

type Message

type Message struct {
	Payload      []byte
	Properties   map[string]string
	DeliverAfter time.Duration
}

Message @Description: Struct for pulsar message.

type Producer

type Producer interface {
	// Publish
	// @Description: This function publishes the messages to the topic.
	// @Param []*Message
	Publish([]*Message) error

	// PublishOne
	// @Description: This function publishes the message to the topic.
	// @Param *Message
	PublishOne(*Message) error

	// Stop
	// @Description: This will close the producer client.
	Stop()

	// Stats
	// @Description: This function will provide stats of the messages produced.
	Stats() Stats
}

Producer @Description: Interface for pulsar producer.

func CreateProducer

func CreateProducer(tenantID string, namespace string, topic string) (Producer, error)

CreateProducer @Description: This API will create a Producer for a particular topic.

The Producer instance can be used to Publish messages to the topic.

@Param tenantID @Param namespace @Param topic
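As a rough sketch of how Publish and PublishOne might be used together, the helper below wraps payloads in Message values and picks the call that matches the batch size. `publisher`, `memoryProducer`, and `publishEvents` are hypothetical names, and the `"source"` property key is an arbitrary example. The in-memory producer stands in for the value returned by CreateProducer so the sketch runs without a broker.

```go
package main

import (
	"fmt"
	"time"
)

// Local copy of the documented Message type.
type Message struct {
	Payload      []byte
	Properties   map[string]string
	DeliverAfter time.Duration
}

// publisher is the subset of the documented Producer interface used here.
type publisher interface {
	Publish([]*Message) error
	PublishOne(*Message) error
}

// memoryProducer is a hypothetical stand-in that keeps messages in memory.
type memoryProducer struct{ sent []*Message }

func (p *memoryProducer) Publish(ms []*Message) error {
	p.sent = append(p.sent, ms...)
	return nil
}

func (p *memoryProducer) PublishOne(m *Message) error {
	p.sent = append(p.sent, m)
	return nil
}

// publishEvents tags each payload with its source and publishes the result,
// using PublishOne for a single message and Publish for several.
func publishEvents(p publisher, source string, payloads ...string) error {
	if len(payloads) == 0 {
		return nil // nothing to send
	}
	batch := make([]*Message, 0, len(payloads))
	for _, body := range payloads {
		batch = append(batch, &Message{
			Payload:    []byte(body),
			Properties: map[string]string{"source": source},
		})
	}
	if len(batch) == 1 {
		return p.PublishOne(batch[0])
	}
	return p.Publish(batch)
}

func main() {
	p := &memoryProducer{}
	if err := publishEvents(p, "billing", "created", "paid"); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println(len(p.sent)) // 2
}
```

With a real Producer, Stop() would be called once publishing is finished, since the interface documents it as closing the producer client.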

type RetryMessage

type RetryMessage struct {
	RetryAfter time.Duration
}

RetryMessage @Description: Struct for retrying a consumed message after a failure. A client can return this struct from HandleMessage so that the message is requeued and retried after the given RetryAfter duration.

type Stats

type Stats struct {
	TotalMessages uint64
}

Stats @Description: Struct for pulsar message stats.

func (Stats) IncrementMessageCount

func (s Stats) IncrementMessageCount(messages uint64)

IncrementMessageCount @Description: This function will increment the total messages consumed.

@Param messages
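One point to be aware of: the signature above uses a value receiver (`s Stats`). In Go a value receiver operates on a copy, so a plain field addition inside such a method is not visible to the caller. The package's implementation is not shown on this page, so the sketch below only illustrates the language rule, using two hypothetical local types, `valueStats` and `pointerStats`, and assuming the method body is a simple `+=`.

```go
package main

import "fmt"

// valueStats mirrors the documented receiver form: func (s Stats) ...
type valueStats struct{ TotalMessages uint64 }

// The receiver is a copy, so this update is lost when the method returns.
func (s valueStats) IncrementMessageCount(messages uint64) {
	s.TotalMessages += messages
}

// pointerStats is a hypothetical variant with a pointer receiver.
type pointerStats struct{ TotalMessages uint64 }

// The receiver points at the caller's struct, so the update persists.
func (s *pointerStats) IncrementMessageCount(messages uint64) {
	s.TotalMessages += messages
}

func main() {
	v := valueStats{}
	v.IncrementMessageCount(3)
	fmt.Println(v.TotalMessages) // 0

	p := pointerStats{}
	p.IncrementMessageCount(3)
	fmt.Println(p.TotalMessages) // 3
}
```

Callers who need a running total are probably better served by reading the value returned from Stats() on a Consumer or Producer than by calling IncrementMessageCount on their own copy.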
