Documentation ¶
Index ¶
- func BuildNonPersistentTopic(tenant, namespace string, topicName TopicName) string
- func BuildPersistentTopic(tenant, namespace string, topicName TopicName) string
- func BuildTopic(persistency TopicPersistency, tenant, namespace string, topicName TopicName) string
- func GetTopicName(topic string) string
- func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32, ...) *pulsar.DLQPolicy
- func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOption) error
- func Ptr[T any](v T) *T
- func WithConfig(config *config.PulsarConfig) func(*PulsarClientOptions)
- func WithOperationTimeout(operationTimeout time.Duration) func(*PulsarClientOptions)
- func WithRetryAttempts(retryAttempts int) func(*PulsarClientOptions)
- func WithRetryMaxDelay(maxDelay time.Duration) func(*PulsarClientOptions)
- type Client
- type Consumer
- type CreateConsumerOption
- func WithBackoffPolicy(backoffPolicy pulsar.NackBackoffPolicy) CreateConsumerOption
- func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption
- func WithDefaultBackoffPolicy() CreateConsumerOption
- func WithFullTopics(topics []TopicName) CreateConsumerOption
- func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsumerOption
- func WithName(name string) CreateConsumerOption
- func WithNamespace(tenant, namespace string) CreateConsumerOption
- func WithRedeliveryDelay(redeliveryDelay time.Duration) CreateConsumerOption
- func WithRetryEnable(enable, forceDLQSafeRetry bool, retryDuration time.Duration) CreateConsumerOption
- func WithSubscriptionName(subscriptionName string) CreateConsumerOption
- func WithSubscriptionType(subscriptionType pulsar.SubscriptionType) CreateConsumerOption
- func WithTopic(topic TopicName) CreateConsumerOption
- func WithTopics(topics []TopicName) CreateConsumerOption
- type CreateProducerOption
- type NackBackoffPolicy
- type ProduceMessageOption
- func WithContext(ctx context.Context) ProduceMessageOption
- func WithDelay(deliveryDelay time.Duration) ProduceMessageOption
- func WithMessageKey(key string) ProduceMessageOption
- func WithMessageToSend(msgToSend interface{}) ProduceMessageOption
- func WithProperties(properties map[string]string) ProduceMessageOption
- func WithPulsarClient(pulsarClient Client) ProduceMessageOption
- type Producer
- type PulsarClientOptions
- type TopicName
- type TopicPersistency
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildNonPersistentTopic ¶
func BuildPersistentTopic ¶
func BuildTopic ¶
func BuildTopic(persistency TopicPersistency, tenant, namespace string, topicName TopicName) string
func GetTopicName ¶ added in v0.0.11
func ProduceMessage ¶ added in v0.0.5
func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOption) error
func WithConfig ¶ added in v0.0.3
func WithConfig(config *config.PulsarConfig) func(*PulsarClientOptions)
func WithOperationTimeout ¶ added in v0.0.3
func WithOperationTimeout(operationTimeout time.Duration) func(*PulsarClientOptions)
func WithRetryAttempts ¶ added in v0.0.3
func WithRetryAttempts(retryAttempts int) func(*PulsarClientOptions)
func WithRetryMaxDelay ¶ added in v0.0.3
func WithRetryMaxDelay(maxDelay time.Duration) func(*PulsarClientOptions)
Types ¶
type Client ¶ added in v0.0.7
type Client interface { pulsar.Client GetConfig() config.PulsarConfig NewProducer(createProducerOption ...CreateProducerOption) (Producer, error) NewConsumer(createConsumerOpts ...CreateConsumerOption) (Consumer, error) }
func NewClient ¶ added in v0.0.7
func NewClient(options ...func(*PulsarClientOptions)) (Client, error)
NewClient creates a new pulsar client
type Consumer ¶ added in v0.0.7
type Consumer interface {
	pulsar.Consumer
	// ReconsumeLaterDLQSafe returns false if the message is not reconsumable (i.e. it is out of redeliveries or out of time, so the next attempt will go to the DLQ).
	// If the message was sent for reconsuming, the return value is true.
	ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool
	// IsReconsumable returns true if the message can be reconsumed, or false if the next attempt will go to the DLQ.
	IsReconsumable(msg pulsar.Message) bool
}
type CreateConsumerOption ¶ added in v0.0.5
type CreateConsumerOption func(*createConsumerOptions)
func WithBackoffPolicy ¶ added in v0.0.7
func WithBackoffPolicy(backoffPolicy pulsar.NackBackoffPolicy) CreateConsumerOption
func WithDLQ ¶ added in v0.0.7
func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption
WithDLQ sets maxDeliveryAttempts, the number of delivery attempts made before a message is sent to the DLQ. A value of 0 means no DLQ. By default, maxDeliveryAttempts is 5.
func WithDefaultBackoffPolicy ¶ added in v0.0.7
func WithDefaultBackoffPolicy() CreateConsumerOption
func WithFullTopics ¶ added in v0.0.31
func WithFullTopics(topics []TopicName) CreateConsumerOption
func WithMessageChannel ¶ added in v0.0.5
func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsumerOption
func WithName ¶ added in v0.0.45
func WithName(name string) CreateConsumerOption
func WithNamespace ¶ added in v0.0.7
func WithNamespace(tenant, namespace string) CreateConsumerOption
func WithRedeliveryDelay ¶ added in v0.0.7
func WithRedeliveryDelay(redeliveryDelay time.Duration) CreateConsumerOption
func WithRetryEnable ¶ added in v0.0.23
func WithRetryEnable(enable, forceDLQSafeRetry bool, retryDuration time.Duration) CreateConsumerOption
func WithSubscriptionName ¶ added in v0.0.5
func WithSubscriptionName(subscriptionName string) CreateConsumerOption
func WithSubscriptionType ¶ added in v0.0.41
func WithSubscriptionType(subscriptionType pulsar.SubscriptionType) CreateConsumerOption
func WithTopic ¶ added in v0.0.5
func WithTopic(topic TopicName) CreateConsumerOption
func WithTopics ¶ added in v0.0.5
func WithTopics(topics []TopicName) CreateConsumerOption
type CreateProducerOption ¶ added in v0.0.7
type CreateProducerOption func(*createProducerOptions)
func WithProducerFullTopic ¶ added in v0.0.13
func WithProducerFullTopic(topic TopicName) CreateProducerOption
func WithProducerNamespace ¶ added in v0.0.7
func WithProducerNamespace(tenant, namespace string) CreateProducerOption
func WithProducerTopic ¶ added in v0.0.7
func WithProducerTopic(topic TopicName) CreateProducerOption
type NackBackoffPolicy ¶ added in v0.0.7
type NackBackoffPolicy struct {
// contains filtered or unexported fields
}
NackBackoffPolicy implements pulsar's NackBackoffPolicy interface. It is a redelivery backoff mechanism that redelivers messages with different delays according to the number of times the message has been retried.
func NewNackBackoffPolicy ¶ added in v0.0.7
func NewNackBackoffPolicy(minRedeliveryDelayMultiplier, maxRedeliveryDelayMultiplier uint32, baseDelay time.Duration) (*NackBackoffPolicy, error)
NewNackBackoffPolicy creates a new NackBackoffPolicy or returns an error if the parameters are invalid
func (*NackBackoffPolicy) Next ¶ added in v0.0.7
func (nbp *NackBackoffPolicy) Next(redeliveryCount uint32) time.Duration
Next determines the next redelivery delay based on the given redelivery count.
func (*NackBackoffPolicy) Validate ¶ added in v0.0.7
func (nbp *NackBackoffPolicy) Validate() error
Validate checks the consistency of the backoff policy parameters.
type ProduceMessageOption ¶ added in v0.0.5
type ProduceMessageOption func(*produceMessageOptions)
func WithContext ¶ added in v0.0.5
func WithContext(ctx context.Context) ProduceMessageOption
func WithDelay ¶ added in v0.0.61
func WithDelay(deliveryDelay time.Duration) ProduceMessageOption
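WithDelay requests that the message be delivered only after the specified delay. Note: messages are only delivered with delay when a consumer is consuming through a `SubscriptionType=Shared` subscription. With other subscription types, the messages will still be delivered immediately.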
func WithMessageKey ¶ added in v0.0.41
func WithMessageKey(key string) ProduceMessageOption
func WithMessageToSend ¶ added in v0.0.5
func WithMessageToSend(msgToSend interface{}) ProduceMessageOption
func WithProperties ¶ added in v0.0.5
func WithProperties(properties map[string]string) ProduceMessageOption
func WithPulsarClient ¶ added in v0.0.5
func WithPulsarClient(pulsarClient Client) ProduceMessageOption
type PulsarClientOptions ¶ added in v0.0.3
type PulsarClientOptions struct {
// contains filtered or unexported fields
}
type TopicPersistency ¶
type TopicPersistency string
const (
	// topic persistency prefix
	TopicTypePersistent    TopicPersistency = "persistent"
	TopicTypeNonPersistent TopicPersistency = "non-persistent"
)