pulsar

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package pulsar provides the Pulsar implementations of the iterator interface and the publisher interface.

Index

Constants

This section is empty.

Variables

View Source
// DefaultIteratorSettings stores the default values for IteratorSettings.
//
// MaxReconnectToBroker is nil, which IteratorSettings documents as retrying
// forever. No trusted certificate file is configured and insecure TLS
// connections are not allowed.
var DefaultIteratorSettings = IteratorSettings{
	ConnectionTimeout:          5 * time.Second,
	OperationTimeout:           30 * time.Second,
	NackRedeliveryDelay:        30 * time.Second,
	MaxConnectionsPerBroker:    1,
	SubscriptionType:           Exclusive,
	ReceiverQueueSize:          1000,
	MaxReconnectToBroker:       nil,
	TLSTrustCertsFilePath:      "",
	TLSAllowInsecureConnection: false,
}
View Source
// DefaultPublisherSettings stores the default values for PublisherSettings.
//
// MaxReconnectToBroker is nil, which PublisherSettings documents as retrying
// forever. DisableBlockIfQueueFull is false and MaxPendingMessages is 1, so
// the pending-message queue holds a single message by default.
var DefaultPublisherSettings = PublisherSettings{
	ConnectionTimeout:          5 * time.Second,
	OperationTimeout:           30 * time.Second,
	SendTimeout:                30 * time.Second,
	MaxConnectionsPerBroker:    1,
	DisableBlockIfQueueFull:    false,
	MaxPendingMessages:         1,
	MaxReconnectToBroker:       nil,
	TLSTrustCertsFilePath:      "",
	TLSAllowInsecureConnection: false,
}

DefaultPublisherSettings stores the default values for PublisherSettings.

Functions

This section is empty.

Types

type Iterator

// Iterator is the Pulsar implementation of the iterator interface.
//
// It is created with NewIterator, yields messages through NextMessage and is
// released with Close.
type Iterator struct {
	// contains filtered or unexported fields
}

func NewIterator

func NewIterator(config IteratorConfig, settings IteratorSettings) (*Iterator, error)

func (*Iterator) Close

func (it *Iterator) Close() error

func (*Iterator) NextMessage

func (it *Iterator) NextMessage(ctx context.Context) (broker.Message, error)

type IteratorConfig

// IteratorConfig holds the connection properties passed to NewIterator.
type IteratorConfig struct {
	// ServiceURL is the URL of the Pulsar service.
	ServiceURL string

	// Topic is the name of the topic.
	Topic string

	// Subscription is the name of the subscription.
	Subscription string

	// TLSConfig is the TLS configuration for the Pulsar client.
	//
	// If nil, the iterator will not use TLS.
	TLSConfig *tls.Config
}

type IteratorSettings

// IteratorSettings holds the tunable settings passed to NewIterator.
//
// DefaultIteratorSettings provides a ready-made set of values.
type IteratorSettings struct {
	// ConnectionTimeout is the timeout for the establishment of a TCP connection.
	ConnectionTimeout time.Duration

	// OperationTimeout is the timeout for creating the iterator.
	//
	// After this duration, it will return an error.
	OperationTimeout time.Duration

	// NackRedeliveryDelay is the delay after which messages that failed to be processed are redelivered.
	NackRedeliveryDelay time.Duration

	// MaxConnectionsPerBroker is the max number of connections to a single broker that will be kept in the pool.
	MaxConnectionsPerBroker int

	// SubscriptionType determines the mode in which messages are dispatched and consumed for this subscription.
	SubscriptionType SubscriptionType

	// ReceiverQueueSize sets the size of the consumer receive queue.
	//
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// The default value is `1000` messages, which should be good for most use cases.
	ReceiverQueueSize int

	// MaxReconnectToBroker specifies the maximum number of attempts to reconnect to the broker.
	//
	// If nil, the client will retry forever.
	MaxReconnectToBroker *uint

	// TLSTrustCertsFilePath is the path to the trusted certificate file.
	//
	// If this is not set, TLSAllowInsecureConnection should be set to `true`.
	TLSTrustCertsFilePath string

	// TLSAllowInsecureConnection controls whether the client accepts an untrusted TLS certificate from the broker.
	//
	// If this is set to `true`, TLSTrustCertsFilePath is not needed.
	TLSAllowInsecureConnection bool
}

type Publisher

// Publisher models a Pulsar producer.
//
// It is created with NewPublisher, hands out topics through Topic and is
// released with Close.
type Publisher struct {
	// contains filtered or unexported fields
}

Publisher models a Pulsar producer.

func NewPublisher

func NewPublisher(config PublisherConfig, settings PublisherSettings) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Topic

func (p *Publisher) Topic(topicID string) (broker.Topic, error)

type PublisherConfig

// PublisherConfig defines the configuration properties required for initializing a Pulsar producer.
type PublisherConfig struct {
	// ServiceURL is the URL of the Pulsar service.
	ServiceURL string

	// TLSConfig is the TLS configuration for the Pulsar client.
	//
	// If nil, the publisher will not use TLS.
	TLSConfig *tls.Config
}

PublisherConfig defines the configuration properties required for initializing a Pulsar producer.

type PublisherSettings

// PublisherSettings holds the optional settings for a Pulsar producer.
//
// DefaultPublisherSettings provides a ready-made set of values.
type PublisherSettings struct {
	// ConnectionTimeout is the timeout for the establishment of a TCP connection.
	ConnectionTimeout time.Duration

	// OperationTimeout is the timeout for creating the publisher.
	//
	// After this duration, it will return an error.
	OperationTimeout time.Duration

	// SendTimeout is the timeout for a published message to be acknowledged by the broker.
	//
	// After timing out, an error is returned.
	// If set to a negative value, such as -1, the timeout is disabled.
	SendTimeout time.Duration

	// MaxConnectionsPerBroker is the max number of connections to a single broker that will be kept in the pool.
	MaxConnectionsPerBroker int

	// DisableBlockIfQueueFull controls whether publishing blocks if the producer's message queue is full.
	// The default is false. If set to true, Publish returns an error when the queue is full.
	DisableBlockIfQueueFull bool

	// MaxPendingMessages specifies the max size of the queue holding messages awaiting an acknowledgment from the broker.
	MaxPendingMessages int

	// MaxReconnectToBroker specifies the maximum number of attempts to reconnect to the broker.
	//
	// If nil, the client will retry forever.
	MaxReconnectToBroker *uint

	// TLSTrustCertsFilePath is the path to the trusted certificate file.
	//
	// If this is not set, TLSAllowInsecureConnection should be set to `true`.
	TLSTrustCertsFilePath string

	// TLSAllowInsecureConnection controls whether the client accepts an untrusted TLS certificate from the broker.
	//
	// If this is set to `true`, TLSTrustCertsFilePath is not needed.
	TLSAllowInsecureConnection bool
}

PublisherSettings holds the optional settings for a Pulsar producer.

type SubscriptionType

// SubscriptionType determines the mode in which messages are dispatched and
// consumed for a subscription. Its zero value is Exclusive.
type SubscriptionType int
const (
	// Exclusive means there can be only one consumer on the same topic with the same subscription name.
	Exclusive SubscriptionType = iota

	// Shared subscription mode.
	//
	// Multiple consumers are able to use the same subscription name and messages are dispatched according to
	// a round-robin rotation between the connected consumers.
	Shared

	// Failover subscription mode.
	//
	// Multiple consumers are able to use the same subscription name but only one consumer receives the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode.
	//
	// Multiple consumers are able to use the same subscription name
	// and all messages with the same key are dispatched to the same consumer.
	KeyShared
)

type Topic

// Topic exposes Publish and BatchPublish for sending outbound messages, and
// Close for releasing it.
//
// NOTE(review): presumably this is the concrete type behind the broker.Topic
// returned by Publisher.Topic — confirm against the implementation.
type Topic struct {
	// contains filtered or unexported fields
}

func (*Topic) BatchPublish

func (t *Topic) BatchPublish(ctx context.Context, messages ...broker.OutboundMessage) error

func (*Topic) Close

func (t *Topic) Close() error

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, message broker.OutboundMessage) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL