connector

package
v0.0.61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 13 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildNonPersistentTopic

func BuildNonPersistentTopic(tenant, namespace string, topicName TopicName) string

func BuildPersistentTopic

func BuildPersistentTopic(tenant, namespace string, topicName TopicName) string

func BuildTopic

func BuildTopic(persistency TopicPersistency, tenant, namespace string, topicName TopicName) string

func GetTopicName added in v0.0.11

func GetTopicName(topic string) string

func NewDlq

func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32, retryTopic string) *pulsar.DLQPolicy

func ProduceMessage added in v0.0.5

func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOption) error

func Ptr added in v0.0.3

func Ptr[T any](v T) *T

func WithConfig added in v0.0.3

func WithConfig(config *config.PulsarConfig) func(*PulsarClientOptions)

func WithOperationTimeout added in v0.0.3

func WithOperationTimeout(operationTimeout time.Duration) func(*PulsarClientOptions)

func WithRetryAttempts added in v0.0.3

func WithRetryAttempts(retryAttempts int) func(*PulsarClientOptions)

func WithRetryMaxDelay added in v0.0.3

func WithRetryMaxDelay(maxDelay time.Duration) func(*PulsarClientOptions)

Types

type Client added in v0.0.7

// Client extends pulsar.Client with access to the connector configuration and
// with factory methods that return this package's Producer and Consumer types.
type Client interface {
	pulsar.Client
	// GetConfig returns a config.PulsarConfig value (presumably the
	// configuration the client was created with; confirm in the implementation).
	GetConfig() config.PulsarConfig
	// NewProducer creates a Producer configured by the given CreateProducerOption values.
	NewProducer(createProducerOption ...CreateProducerOption) (Producer, error)
	// NewConsumer creates a Consumer configured by the given CreateConsumerOption values.
	NewConsumer(createConsumerOpts ...CreateConsumerOption) (Consumer, error)
}

func NewClient added in v0.0.7

func NewClient(options ...func(*PulsarClientOptions)) (Client, error)

NewClient creates a new pulsar client

type Consumer added in v0.0.7

// Consumer extends pulsar.Consumer with DLQ-aware reconsume helpers.
type Consumer interface {
	pulsar.Consumer
	// ReconsumeLaterDLQSafe returns false if the message is not reconsumable
	// (i.e. it is out of redeliveries or out of time, so the next attempt
	// would go to the DLQ). It returns true if the message was sent for
	// reconsuming.
	ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool
	// IsReconsumable returns true if the message can be reconsumed, or false
	// if the next attempt will go to the DLQ.
	IsReconsumable(msg pulsar.Message) bool
}

type CreateConsumerOption added in v0.0.5

// CreateConsumerOption is a functional option that adjusts the unexported
// createConsumerOptions; values of this type are accepted by Client.NewConsumer.
type CreateConsumerOption func(*createConsumerOptions)

func WithBackoffPolicy added in v0.0.7

func WithBackoffPolicy(backoffPolicy pulsar.NackBackoffPolicy) CreateConsumerOption

func WithDLQ added in v0.0.7

func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption

WithDLQ sets maxDeliveryAttempts, the number of delivery attempts made before a message is sent to the DLQ. A value of 0 means no DLQ. By default, maxDeliveryAttempts is 5.

func WithDefaultBackoffPolicy added in v0.0.7

func WithDefaultBackoffPolicy() CreateConsumerOption

func WithFullTopics added in v0.0.31

func WithFullTopics(topics []TopicName) CreateConsumerOption

func WithMessageChannel added in v0.0.5

func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsumerOption

func WithName added in v0.0.45

func WithName(name string) CreateConsumerOption

func WithNamespace added in v0.0.7

func WithNamespace(tenant, namespace string) CreateConsumerOption

func WithRedeliveryDelay added in v0.0.7

func WithRedeliveryDelay(redeliveryDelay time.Duration) CreateConsumerOption

func WithRetryEnable added in v0.0.23

func WithRetryEnable(enable, forceDLQSafeRetry bool, retryDuration time.Duration) CreateConsumerOption

func WithSubscriptionName added in v0.0.5

func WithSubscriptionName(subscriptionName string) CreateConsumerOption

func WithSubscriptionType added in v0.0.41

func WithSubscriptionType(subscriptionType pulsar.SubscriptionType) CreateConsumerOption

func WithTopic added in v0.0.5

func WithTopic(topic TopicName) CreateConsumerOption

func WithTopics added in v0.0.5

func WithTopics(topics []TopicName) CreateConsumerOption

type CreateProducerOption added in v0.0.7

// CreateProducerOption is a functional option that adjusts the unexported
// createProducerOptions; values of this type are accepted by Client.NewProducer.
type CreateProducerOption func(*createProducerOptions)

func WithProducerFullTopic added in v0.0.13

func WithProducerFullTopic(topic TopicName) CreateProducerOption

func WithProducerNamespace added in v0.0.7

func WithProducerNamespace(tenant, namespace string) CreateProducerOption

func WithProducerTopic added in v0.0.7

func WithProducerTopic(topic TopicName) CreateProducerOption

type NackBackoffPolicy added in v0.0.7

// NackBackoffPolicy computes redelivery delays from a redelivery count (see
// Next). Construct it with NewNackBackoffPolicy, which rejects invalid parameters.
type NackBackoffPolicy struct {
	// contains filtered or unexported fields
}

NackBackoffPolicy implements pulsar's NackBackoffPolicy interface. It is a redelivery backoff mechanism that redelivers messages with different delays according to the number of times the message has been retried.

func NewNackBackoffPolicy added in v0.0.7

func NewNackBackoffPolicy(minRedeliveryDelayMultiplier, maxRedeliveryDelayMultiplier uint32, baseDelay time.Duration) (*NackBackoffPolicy, error)

NewNackBackoffPolicy creates a new NackBackoffPolicy or returns an error if the parameters are invalid

func (*NackBackoffPolicy) Next added in v0.0.7

func (nbp *NackBackoffPolicy) Next(redeliveryCount uint32) time.Duration

Next determines the next redelivery delay based on the given redelivery count.

func (*NackBackoffPolicy) Validate added in v0.0.7

func (nbp *NackBackoffPolicy) Validate() error

Validate checks the consistency of the backoff policy parameters.

type ProduceMessageOption added in v0.0.5

// ProduceMessageOption is a functional option that adjusts the unexported
// produceMessageOptions; values of this type are accepted by ProduceMessage.
type ProduceMessageOption func(*produceMessageOptions)

func WithContext added in v0.0.5

func WithContext(ctx context.Context) ProduceMessageOption

func WithDelay added in v0.0.61

func WithDelay(deliveryDelay time.Duration) ProduceMessageOption

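WithDelay requests that the message be delivered only after the given delay. Note: messages are only delivered with a delay when a consumer is consuming through a `SubscriptionType=Shared` subscription. With other subscription types, the messages will still be delivered immediately.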

func WithMessageKey added in v0.0.41

func WithMessageKey(key string) ProduceMessageOption

func WithMessageToSend added in v0.0.5

func WithMessageToSend(msgToSend interface{}) ProduceMessageOption

func WithProperties added in v0.0.5

func WithProperties(properties map[string]string) ProduceMessageOption

func WithPulsarClient added in v0.0.5

func WithPulsarClient(pulsarClient Client) ProduceMessageOption

type Producer added in v0.0.7

// Producer is this package's producer type. It embeds pulsar.Producer and
// declares no additional methods.
type Producer interface {
	pulsar.Producer
}

type PulsarClientOptions added in v0.0.3

// PulsarClientOptions collects the settings applied by the option functions
// passed to NewClient (WithConfig, WithOperationTimeout, WithRetryAttempts,
// WithRetryMaxDelay).
type PulsarClientOptions struct {
	// contains filtered or unexported fields
}

type TopicName

// TopicName is the string type used for topic names by the topic builders
// (BuildTopic and friends) and by the consumer/producer topic options.
type TopicName string

type TopicPersistency

// TopicPersistency is the persistency prefix of a topic, as accepted by BuildTopic.
type TopicPersistency string
const (
	// Topic persistency prefixes.
	TopicTypePersistent    TopicPersistency = "persistent"
	TopicTypeNonPersistent TopicPersistency = "non-persistent"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL