pulsarlib

package
v0.5.1
Warning

This package is not in the latest version of its module.

Published: Sep 20, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Cleanup

func Cleanup()

func CreateNamespace

func CreateNamespace(tenantID string, namespace string) error

func CreatePartitionedTopic

func CreatePartitionedTopic(tenantID string, namespace string, topic string, partitions int) error

func CreateSubscriptionOnTopic added in v0.2.1

func CreateSubscriptionOnTopic(tenantID, namespace, topic, subscription, position string) error

func CreateTenant

func CreateTenant(tenantID string, adminRoles []string, allowedClusters []string) error

func CreateTopic

func CreateTopic(tenantID string, namespace string, topic string) error

func DeleteNamespace

func DeleteNamespace(tenantID string, namespace string) error

func DeletePartitionedTopic added in v0.5.0

func DeletePartitionedTopic(tenantID string, namespace string, topic string) error

func DeleteSubscriptionOnTopic added in v0.2.2

func DeleteSubscriptionOnTopic(tenantID, namespace, topic, subscription string) error

func DeleteTenant

func DeleteTenant(tenantID string) error

func DeleteTopic

func DeleteTopic(tenantID string, namespace string, topic string) error

func GetStatsForPartitionedTopic added in v0.5.0

func GetStatsForPartitionedTopic(tenantID string, namespace string, topic string) (map[string]interface{}, error)

func GetSubscriptionsOnTopic added in v0.2.3

func GetSubscriptionsOnTopic(tenantID, namespace, topic string) (subscriptionsOnTopic []string, err error)

func InitMessaging

func InitMessaging(workerCount int, host string, dataPort int, httpPort int) error

This API will initialize the messaging channel and perform all connection initialization. workerCount is the number of message processing workers.
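As a rough illustration of the host, dataPort and httpPort parameters: a Pulsar broker conventionally exposes a binary data endpoint (pulsar://, default port 6650) and an HTTP admin endpoint (default port 8080). The helper below is hypothetical and not part of pulsarlib; whether InitMessaging builds its URLs this way is an assumption.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// serviceURLs is a hypothetical helper, not part of pulsarlib. It shows the
// two endpoints that a host/dataPort/httpPort triple conventionally maps to
// on a Pulsar broker: the binary protocol URL and the HTTP admin URL.
func serviceURLs(host string, dataPort, httpPort int) (dataURL, adminURL string) {
	dataURL = fmt.Sprintf("pulsar://%s", net.JoinHostPort(host, strconv.Itoa(dataPort)))
	adminURL = fmt.Sprintf("http://%s", net.JoinHostPort(host, strconv.Itoa(httpPort)))
	return dataURL, adminURL
}
```

With host "localhost", dataPort 6650 and httpPort 8080 this yields pulsar://localhost:6650 and http://localhost:8080.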

func ListNamespaces

func ListNamespaces(tenantID string) ([]string, error)

func ListPartitionedTopics added in v0.5.0

func ListPartitionedTopics(tenantID string, namespace string) ([]string, error)

func ListTenants

func ListTenants() ([]string, error)

func ListTopics

func ListTopics(tenantID string, namespace string) ([]string, error)

func ResetSubscriptionOnTopic added in v0.2.10

func ResetSubscriptionOnTopic(tenantID string, namespace string, topic string, subscription string, timestampMilliseconds int64) (err error)

func SetAutoSubscriptionCreationOnNamespace added in v0.3.4

func SetAutoSubscriptionCreationOnNamespace(tenantID, namespace string) error

func SetNamespaceRetention added in v0.2.7

func SetNamespaceRetention(tenantID string, namespace string, retentionInMB int64, retentionInMinutes int64) error

func UnloadNamespace

func UnloadNamespace(tenantID string, namespace string) error

Types

type Consumer

type Consumer interface {
	//This function will start the consumption of messages.
	Start() error
	//This function will flush any existing messages and stop the consumer client.
	Stop() error
	//This function will delete the subscription created by the consumer.
	Unsubscribe() error
	//This function will flush existing messages and pause further consumption.
	Pause()
	//This function will unpause the message consumption.
	Unpause()
	//This function will provide stats of the messages consumed.
	Stats() Stats
}

func CreateConsumer

func CreateConsumer(tenantID, namespace string, topics []string, handler Handler, opts ConsumerOpts) (Consumer, error)

This API will create a Consumer for the given topics. The handler passed should implement the Handler interface from this module. The consumer will create the subscription and remain in a passive state until Start() is called. The consumer can be paused and unpaused at any point. The commitInterval is used to commit messages after every n messages are consumed. The Pause() function will flush out the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if the subscription needs to be deleted. The Stats() function provides the stats for messages consumed.

Creating multiple instances of Consumer for the same topic will deliver each message to only one of the instances. In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.

func CreateRegexConsumer added in v0.2.8

func CreateRegexConsumer(tenantID, namespace, topicsPattern string, handler Handler, opts ConsumerOpts) (Consumer, error)

This API will create a Consumer for all topics matching the topics pattern. The handler passed should implement the Handler interface from this module. The consumer will create the subscription and remain in a passive state until Start() is called. The consumer can be paused and unpaused at any point. The Pause() function will flush out the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if the subscription needs to be deleted. The Stats() function provides the stats for messages consumed.

Creating multiple instances of Consumer for the same topic will deliver each message to only one of the instances. In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.

func CreateSingleTopicConsumer added in v0.3.3

func CreateSingleTopicConsumer(tenantID, namespace, topic string, handler Handler, opts ConsumerOpts) (Consumer, error)

This API will create a Consumer for a particular topic. The handler passed should implement the Handler interface from this module. The consumer will create the subscription and remain in a passive state until Start() is called. The consumer can be paused and unpaused at any point. The commitInterval is used to commit messages after every n messages are consumed. The Pause() function will flush out the already received messages and pause receiving any further messages. The Unpause() function will resume receiving messages. The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription. The Unsubscribe() function can be used if the subscription needs to be deleted. The Stats() function provides the stats for messages consumed.

Creating multiple instances of Consumer for the same topic will deliver each message to only one of the instances. In order to recreate a Consumer for the same topic, make sure Stop() is called on the old Consumer instance.

type ConsumerOpts added in v0.3.3

type ConsumerOpts struct {
	SubscriptionName string
	InitialPosition  InitialPosition
}

type Handler

type Handler interface {
	// HandleMessage handles a consumed Pulsar message. Returning a 'RetryMessage' negatively
	// acknowledges the message so that it is requeued and retried after some delay.
	HandleMessage(*Message) *RetryMessage
}

type InitialPosition added in v0.3.3

type InitialPosition int

const (
	// Latest starts consuming from the last message.
	Latest InitialPosition = iota

	// Earliest starts consuming from the first message.
	Earliest
)

type Message

type Message struct {
	Key        string
	Value      []byte
	Properties map[string]string
}

type Producer

type Producer interface {
	//This function publishes the messages to the topic.
	Publish([]*Message) error
	//This will close the producer client.
	Stop()
	//This function will provide stats of the messages produced.
	Stats() Stats
}

func CreateProducer

func CreateProducer(tenantID string, namespace string, topic string) (Producer, error)

This API will create a Producer for a particular topic. The Producer instance can be used to Publish messages to the topic.
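Publishing a batch can be sketched as follows. Message, Stats and Producer are copied from this page so that the example compiles on its own. memoryProducer is a hypothetical in-memory stand-in, and its behaviour (including the error after Stop) is an assumption made for the example, not a description of pulsarlib. publishLines and the "content-type" property are likewise illustrative.

```go
package main

import "errors"

// Message, Stats and Producer mirror the declarations on this page.
type Message struct {
	Key        string
	Value      []byte
	Properties map[string]string
}

type Stats struct {
	TotalMessages uint64
}

type Producer interface {
	Publish([]*Message) error
	Stop()
	Stats() Stats
}

// memoryProducer is a hypothetical stand-in that keeps messages in memory.
type memoryProducer struct {
	published []*Message
	stopped   bool
}

func (p *memoryProducer) Publish(batch []*Message) error {
	if p.stopped {
		return errors.New("producer is stopped")
	}
	p.published = append(p.published, batch...)
	return nil
}

func (p *memoryProducer) Stop() { p.stopped = true }

func (p *memoryProducer) Stats() Stats {
	return Stats{TotalMessages: uint64(len(p.published))}
}

// publishLines is a hypothetical helper that wraps each line in a Message
// sharing the same key and publishes them as one batch.
func publishLines(p Producer, key string, lines []string) error {
	batch := make([]*Message, 0, len(lines))
	for _, line := range lines {
		batch = append(batch, &Message{
			Key:        key,
			Value:      []byte(line),
			Properties: map[string]string{"content-type": "text/plain"},
		})
	}
	return p.Publish(batch)
}
```

With a real producer, the value returned by CreateProducer would be passed to publishLines, and Stop() would be called once publishing is finished.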

type RetryMessage added in v0.3.1

type RetryMessage struct {
	RetryAfter time.Duration
}

RetryMessage is the struct used to retry a consumed message after a processing failure. A client can return this struct to have the message requeued and retried after the given RetryAfter duration.

type Stats

type Stats struct {
	TotalMessages uint64
}

func (Stats) IncrementMessageCount

func (s Stats) IncrementMessageCount(messages uint64)
