Documentation ¶
Overview ¶
Package pulsar is the implementation of the iterator interface and the publisher interface for pulsar.
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultIteratorSettings = IteratorSettings{ ConnectionTimeout: 5 * time.Second, OperationTimeout: 30 * time.Second, NackRedeliveryDelay: 30 * time.Second, MaxConnectionsPerBroker: 1, SubscriptionType: Exclusive, ReceiverQueueSize: 1000, MaxReconnectToBroker: nil, TLSTrustCertsFilePath: "", TLSAllowInsecureConnection: false, }
View Source
var DefaultPublisherSettings = PublisherSettings{ ConnectionTimeout: 5 * time.Second, OperationTimeout: 30 * time.Second, SendTimeout: 30 * time.Second, MaxConnectionsPerBroker: 1, DisableBlockIfQueueFull: false, MaxPendingMessages: 1, MaxReconnectToBroker: nil, TLSTrustCertsFilePath: "", TLSAllowInsecureConnection: false, }
DefaultPublisherSettings stores the default values for ProducerSettings.
Functions ¶
This section is empty.
Types ¶
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
func NewIterator ¶
func NewIterator(config IteratorConfig, settings IteratorSettings) (*Iterator, error)
type IteratorConfig ¶
type IteratorConfig struct { // ServiceURL the URL of the Pulsar service. ServiceURL string // Topic name of the topic. Topic string // Subscription name of the subscription. Subscription string // TLSConfig tls configuration for the Pulsar client. // // If nil, the iterator will not use tls. TLSConfig *tls.Config }
type IteratorSettings ¶
type IteratorSettings struct { // ConnectionTimeout timeout for the establishment of a TCP connection. ConnectionTimeout time.Duration // OperationTimeout timeout for creating the iterator. // // After this duration, it will return an error. OperationTimeout time.Duration // NackRedeliveryDelay delay after which to redeliver the messages that failed to be processed. NackRedeliveryDelay time.Duration // MaxConnectionsPerBroker max number of connections to a single broker that will be kept in the pool. MaxConnectionsPerBroker int // SubscriptionType determines the mode in which messages are dispatched and consumed for this subscription. SubscriptionType SubscriptionType // ReceiverQueueSize sets the size of the consumer receive queue. // // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the // application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer // throughput at the expense of bigger memory utilization. // Default value is `1000` messages and should be good for most use cases. ReceiverQueueSize int // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. // // If nil, the client will retry forever. MaxReconnectToBroker *uint // TLSTrustCertsFilePath path to trusted certificate file. // // If this is not set, TLSAllowInsecureConnection should be set to `true`. TLSTrustCertsFilePath string // TLSAllowInsecureConnection controls if the client accepts untrusted TLS certificate from the broker. // // If this is set to `true`, TLSTrustCertsFilePath is not needed. TLSAllowInsecureConnection bool }
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher models a Pulsar producer.
func NewPublisher ¶
func NewPublisher(config PublisherConfig, settings PublisherSettings) (*Publisher, error)
type PublisherConfig ¶
type PublisherConfig struct { // ServiceURL the URL of the Pulsar service. ServiceURL string // TLSConfig tls configuration for the Pulsar client. // // If nil, the publisher will not use tls. TLSConfig *tls.Config }
PublisherConfig defines the configuration properties required for initializing a Pulsar producer.
type PublisherSettings ¶
type PublisherSettings struct { // ConnectionTimeout timeout for the establishment of a TCP connection. ConnectionTimeout time.Duration // OperationTimeout timeout for creating the publisher. // // After this duration, it will return an error. OperationTimeout time.Duration // SendTimeout timeout for a published message to be acknowledged by the broker. // // After timing out, error is returned. // If set to negative value, such as -1, the timeout is disabled. SendTimeout time.Duration // MaxConnectionsPerBroker max number of connections to a single broker that will be kept in the pool. MaxConnectionsPerBroker int // DisableBlockIfQueueFull controls whether publishing blocks if producer's message queue is full. // Default is false, if set to true then Publish returns error when queue is full. DisableBlockIfQueueFull bool // MaxPendingMessages specifies the max size of the queue holding messages waiting an acknowledgment from the broker. MaxPendingMessages int // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. // // If nil, the client will retry forever. MaxReconnectToBroker *uint // TLSTrustCertsFilePath path to trusted certificate file. // // If this is not set, TLSAllowInsecureConnection should be set to `true`. TLSTrustCertsFilePath string // TLSAllowInsecureConnection controls if the client accepts untrusted TLS certificate from the broker. // // If this is set to `true`, TLSTrustCertsFilePath is not needed. TLSAllowInsecureConnection bool }
PublisherSettings the optional settings for a Pulsar producer.
type SubscriptionType ¶
type SubscriptionType int
const ( // Exclusive there can be only 1 consumer on the same topic with the same subscription name. Exclusive SubscriptionType = iota // // Multiple consumer are able to use the same subscription name and messages are dispatched according to // a round-robin rotation between the connected consumers. Shared // Failover subscription mode. // // Multiple consumer are able to use the same subscription name but only 1 consumer receives the messages. // If that consumer disconnects, one of the other connected consumers will start receiving messages. Failover // // Multiple consumer are able to use the same subscription name // and all messages with the same key are dispatched to the same consumer. KeyShared )
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
func (*Topic) BatchPublish ¶
Click to show internal directories.
Click to hide internal directories.