pulsar

package
v0.14.0-candidate-1
Published: Sep 23, 2024 License: Apache-2.0 Imports: 42 Imported by: 406

Documentation

Constants

const (
	DlqTopicSuffix    = "-DLQ"
	RetryTopicSuffix  = "-RETRY"
	MaxReconsumeTimes = 16

	SysPropertyDelayTime       = "DELAY_TIME"
	SysPropertyRealTopic       = "REAL_TOPIC"
	SysPropertyRetryTopic      = "RETRY_TOPIC"
	SysPropertyReconsumeTimes  = "RECONSUMETIMES"
	SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
	PropertyOriginMessageID    = "ORIGIN_MESSAGE_ID"
)
const (
	IoMaxSize = 1024
)
const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign"

TransactionCoordinatorAssign is the transaction coordinator topic, used to look up the broker where the transaction coordinator (TC) is located.
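
A brief sketch of how the coordinator comes into play: with EnableTransaction set on the client, NewTransaction returns a Transaction that can be attached to outgoing messages. The Commit and Abort calls below are assumptions about the Transaction interface's context-taking methods, and the broker must have the transaction coordinator enabled; names and URLs are placeholders.

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650", // placeholder URL
		EnableTransaction: true,                      // required before calling NewTransaction
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Fails with ErrTransactionCoordinatorNotEnabled if the broker has no TC.
	txn, err := client.NewTransaction(time.Minute)
	if err != nil {
		log.Fatal(err)
	}

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// The message is not visible to consumers until the transaction commits.
	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload:     []byte("hello"),
		Transaction: txn,
	}); err != nil {
		_ = txn.Abort(context.Background())
		log.Fatal(err)
	}
	if err := txn.Commit(context.Background()); err != nil {
		log.Fatal(err)
	}
}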

Variables

var (
	ErrFailAddToBatch               = newError(AddToBatchFailed, "message add to batch failed")
	ErrSendTimeout                  = newError(TimeoutError, "message send timeout")
	ErrSendQueueIsFull              = newError(ProducerQueueIsFull, "producer send queue is full")
	ErrContextExpired               = newError(TimeoutError, "message send context expired")
	ErrMessageTooLarge              = newError(MessageTooBig, "message size exceeds MaxMessageSize")
	ErrMetaTooLarge                 = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
	ErrProducerClosed               = newError(ProducerClosed, "producer already been closed")
	ErrMemoryBufferIsFull           = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
	ErrSchema                       = newError(SchemaFailure, "schema error")
	ErrTransaction                  = errors.New("transaction error")
	ErrInvalidMessage               = newError(InvalidMessage, "invalid message")
	ErrTopicNotfound                = newError(TopicNotFound, "topic not found")
	ErrTopicTerminated              = newError(TopicTerminated, "topic terminated")
	ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
	ErrProducerFenced               = newError(ProducerFenced, "producer fenced")
)
var (
	ErrInvalidAck = errors.New("invalid ack")
)
var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached")
var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+
	"the transaction coordinator, or the transaction coordinator has not initialized")

Functions

func GetBatcherBuilderProvider added in v0.4.0

func GetBatcherBuilderProvider(typ BatcherBuilderType) (
	internal.BatcherBuilderProvider, error,
)

func NewDefaultRouter added in v0.3.0

func NewDefaultRouter(
	hashFunc func(string) uint32,
	maxBatchingMessages uint,
	maxBatchingSize uint,
	maxBatchingDelay time.Duration,
	disableBatching bool) func(*ProducerMessage, uint32) int

NewDefaultRouter sets the message routing mode for the partitioned producer. The default routing mode is round-robin if no partition key is specified. If batching is enabled, it honors the batching thresholds (maximum batch size, maximum number of messages, and maximum delay to publish a batch): when one of these thresholds is reached, the next partition is used.
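
A minimal wiring sketch: the returned router takes the partition count as a uint32, while ProducerOptions.MessageRouter receives a TopicMetadata, so a small adapter is needed. TopicMetadata's NumPartitions() accessor is assumed here, and the FNV hash merely stands in for any func(string) uint32; names are placeholders.

import (
	"hash/fnv"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func newRoutedProducer(client pulsar.Client) (pulsar.Producer, error) {
	hash := func(key string) uint32 {
		h := fnv.New32a()
		_, _ = h.Write([]byte(key))
		return h.Sum32()
	}
	route := pulsar.NewDefaultRouter(
		hash,
		1000,                // maxBatchingMessages
		128*1024,            // maxBatchingSize, in bytes
		10*time.Millisecond, // maxBatchingDelay
		false,               // disableBatching
	)
	return client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-partitioned-topic",
		MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
			return route(msg, tm.NumPartitions())
		},
	})
}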

func NewSinglePartitionRouter added in v0.11.0

func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int

func ReadElements added in v0.3.0

func ReadElements(r io.Reader, elements ...interface{}) error

func WriteElements added in v0.3.0

func WriteElements(w io.Writer, elements ...interface{}) error

Types

type AckGroupingOptions added in v0.10.0

type AckGroupingOptions struct {
	// The maximum number of ACK requests to cache
	MaxSize uint32

	// The maximum time to cache ACK requests
	MaxTime time.Duration
}

AckGroupingOptions controls how ACK requests are grouped. If MaxSize is 0 or 1, any ACK request will be sent immediately. Otherwise, ACK requests will be cached until one of the following conditions is met: 1. There are `MaxSize` pending ACK requests. 2. `MaxTime` is greater than 1 microsecond and ACK requests have been cached for `MaxTime`. Specifically, for cumulative acknowledgment, only the latest ACK is cached and it will only be sent after `MaxTime`.
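
A short sketch of tuning these thresholds when subscribing (topic and subscription names are placeholders):

import (
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func subscribeWithAckGrouping(client pulsar.Client) (pulsar.Consumer, error) {
	return client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-sub",
		AckGroupingOptions: &pulsar.AckGroupingOptions{
			MaxSize: 500,                   // flush once 500 ACKs are pending
			MaxTime: 50 * time.Millisecond, // or after 50ms, whichever comes first
		},
	})
}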

type Authentication

type Authentication interface{}

Authentication is an opaque interface that represents the authentication credentials

func NewAuthentication

func NewAuthentication(name string, params string) (Authentication, error)

NewAuthentication Creates an authentication by name and params

func NewAuthenticationAthenz added in v0.1.1

func NewAuthenticationAthenz(authParams map[string]string) Authentication

NewAuthenticationAthenz Creates Athenz Authentication provider

func NewAuthenticationBasic added in v0.9.0

func NewAuthenticationBasic(username, password string) (Authentication, error)

NewAuthenticationBasic Creates Basic Authentication provider

func NewAuthenticationFromTLSCertSupplier added in v0.2.0

func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication

NewAuthenticationFromTLSCertSupplier creates a new Authentication provider with the specified TLS certificate supplier

func NewAuthenticationOAuth2 added in v0.2.0

func NewAuthenticationOAuth2(authParams map[string]string) Authentication

NewAuthenticationOAuth2 Creates OAuth2 Authentication provider

func NewAuthenticationTLS

func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication

NewAuthenticationTLS creates a new Authentication provider with the specified TLS certificate and private key

func NewAuthenticationToken

func NewAuthenticationToken(token string) Authentication

NewAuthenticationToken creates a new Authentication provider with the specified auth token

func NewAuthenticationTokenFromFile

func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication

NewAuthenticationTokenFromFile creates a new Authentication provider with the auth token read from the specified file

func NewAuthenticationTokenFromSupplier

func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication

NewAuthenticationTokenFromSupplier returns a token auth provider that gets the token data from a user-supplied function. The function is invoked each time the client library needs a token when talking to the Pulsar brokers.
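
For example, a client that re-reads a rotated token on demand might look like this sketch (the URL and token path are placeholders):

import (
	"os"
	"strings"

	"github.com/apache/pulsar-client-go/pulsar"
)

func newTokenClient() (pulsar.Client, error) {
	return pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
		Authentication: pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) {
			// Called each time a token is needed, so rotated tokens are picked up.
			b, err := os.ReadFile("/var/run/secrets/pulsar-token")
			return strings.TrimSpace(string(b)), err
		}),
	})
}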

type AvroSchema added in v0.3.0

type AvroSchema struct {
	Codec avro.Schema
	SchemaInfo
}

func NewAvroSchema added in v0.3.0

func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema

NewAvroSchema creates a new AvroSchema. Note: the function will panic if creation of the codec fails

func NewAvroSchemaWithValidation added in v0.9.0

func NewAvroSchemaWithValidation(avroSchemaDef string, properties map[string]string) (*AvroSchema, error)

NewAvroSchemaWithValidation creates a new AvroSchema, returning an error instead of panicking if codec creation fails
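
A hedged sketch of producing with an Avro schema; the struct-to-record mapping via avro struct tags is an assumption about the codec's conventions, and all names are placeholders:

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
)

const exampleAvroDef = `{
	"type": "record",
	"name": "Example",
	"fields": [
		{"name": "id", "type": "int"},
		{"name": "name", "type": "string"}
	]
}`

type Example struct {
	ID   int32  `avro:"id"`
	Name string `avro:"name"`
}

func produceAvro(client pulsar.Client) error {
	schema, err := pulsar.NewAvroSchemaWithValidation(exampleAvroDef, nil)
	if err != nil {
		return err // invalid schema definition
	}
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  "my-topic",
		Schema: schema,
	})
	if err != nil {
		return err
	}
	defer producer.Close()

	// Value (not Payload) carries schema-encoded messages.
	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: &Example{ID: 1, Name: "ada"},
	})
	return err
}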

func (*AvroSchema) Decode added in v0.3.0

func (as *AvroSchema) Decode(data []byte, v interface{}) error

func (*AvroSchema) Encode added in v0.3.0

func (as *AvroSchema) Encode(data interface{}) ([]byte, error)

func (*AvroSchema) GetSchemaInfo added in v0.3.0

func (as *AvroSchema) GetSchemaInfo() *SchemaInfo

func (*AvroSchema) Validate added in v0.3.0

func (as *AvroSchema) Validate(message []byte) error

type BatcherBuilderType added in v0.4.0

type BatcherBuilderType int
const (
	DefaultBatchBuilder BatcherBuilderType = iota
	KeyBasedBatchBuilder
)

type BinaryFreeList added in v0.3.0

type BinaryFreeList chan []byte
var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)

func (BinaryFreeList) Borrow added in v0.3.0

func (b BinaryFreeList) Borrow() (buf []byte)

func (BinaryFreeList) Float32 added in v0.3.0

func (b BinaryFreeList) Float32(buf []byte) (float32, error)

func (BinaryFreeList) Float64 added in v0.3.0

func (b BinaryFreeList) Float64(buf []byte) (float64, error)

func (BinaryFreeList) PutDouble added in v0.3.0

func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)

func (BinaryFreeList) PutFloat added in v0.3.0

func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)

func (BinaryFreeList) PutUint16 added in v0.3.0

func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error

func (BinaryFreeList) PutUint32 added in v0.3.0

func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error

func (BinaryFreeList) PutUint64 added in v0.3.0

func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error

func (BinaryFreeList) PutUint8 added in v0.3.0

func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error

func (BinaryFreeList) Return added in v0.3.0

func (b BinaryFreeList) Return(buf []byte)

func (BinaryFreeList) Uint16 added in v0.3.0

func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error)

func (BinaryFreeList) Uint32 added in v0.3.0

func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error)

func (BinaryFreeList) Uint64 added in v0.3.0

func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error)

func (BinaryFreeList) Uint8 added in v0.3.0

func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error)

type BytesSchema added in v0.3.0

type BytesSchema struct {
	SchemaInfo
}

func NewBytesSchema added in v0.3.0

func NewBytesSchema(properties map[string]string) *BytesSchema

func (*BytesSchema) Decode added in v0.3.0

func (bs *BytesSchema) Decode(data []byte, v interface{}) error

func (*BytesSchema) Encode added in v0.3.0

func (bs *BytesSchema) Encode(data interface{}) ([]byte, error)

func (*BytesSchema) GetSchemaInfo added in v0.3.0

func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo

func (*BytesSchema) Validate added in v0.3.0

func (bs *BytesSchema) Validate(message []byte) error

type Client

type Client interface {
	// CreateProducer Creates the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	// Subscribe Creates a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	// CreateReader Creates a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	// CreateTableView creates a table view instance.
	// This method will block until the table view is created successfully.
	CreateTableView(TableViewOptions) (TableView, error)

	// TopicPartitions Fetches the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create Reader,
	// Consumer or Producer instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// NewTransaction creates a new Transaction instance.
	//
	// This function is used to initiate a new transaction for performing
	// atomic operations on the message broker. It returns a Transaction
	// object that can be used to produce, consume and commit messages in a
	// transactional manner.
	//
	// In case of any errors while creating the transaction, an error will
	// be returned.
	NewTransaction(duration time.Duration) (Transaction, error)

	// Close Closes the Client and frees associated resources
	Close()
}

Client represents a pulsar client

func NewClient

func NewClient(options ClientOptions) (Client, error)

NewClient Creates a pulsar client instance
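
A minimal construction sketch (the service URL is a placeholder):

package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		ConnectionTimeout: 5 * time.Second,
		OperationTimeout:  30 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// ... create producers, consumers and readers from the client ...
}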

type ClientOptions

type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required
	URL string

	// Timeout for the establishment of a TCP connection (default: 5 seconds)
	ConnectionTimeout time.Duration

	// Set the operation timeout (default: 30 seconds)
	// Producer-create, subscribe and unsubscribe operations will be retried until this interval elapses, after
	// which the operation will be marked as failed
	OperationTimeout time.Duration

	// Configure the ping send and check interval, default to 30 seconds.
	KeepAliveInterval time.Duration

	// Configure the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	Authentication

	// Set the path to the TLS key file
	TLSKeyFilePath string

	// Set the path to the TLS certificate file
	TLSCertificateFile string

	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string

	// Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
	TLSAllowInsecureConnection bool

	// Configure whether the Pulsar client verifies the validity of the host name from the broker (default: false)
	TLSValidateHostname bool

	// TLSCipherSuites is a list of enabled TLS 1.0–1.2 cipher suites. See tls.Config CipherSuites for more information.
	TLSCipherSuites []uint16

	// TLSMinVersion contains the minimum TLS version that is acceptable. See tls.Config MinVersion for more information.
	TLSMinVersion uint16

	// TLSMaxVersion contains the maximum TLS version that is acceptable. See tls.Config MaxVersion for more information.
	TLSMaxVersion uint16

	// Configure the listener name for VPC users to connect to the Pulsar broker
	ListenerName string

	// Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection)
	MaxConnectionsPerBroker int

	// Configure the logger used by the client.
	// By default, a wrapped logrus.StandardLogger will be used, namely,
	// log.NewLoggerWithLogrus(logrus.StandardLogger())
	// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
	Logger log.Logger

	// Specify metric cardinality to the tenant, namespace or topic levels, or remove it completely.
	// Default: MetricsCardinalityNamespace
	MetricsCardinality MetricsCardinality

	// Add custom labels to all the metrics reported by this client instance
	CustomMetricsLabels map[string]string

	// Specify metric registerer used to register metrics.
	// Default prometheus.DefaultRegisterer
	MetricsRegisterer prometheus.Registerer

	// Release the connection if it is not used for more than ConnectionMaxIdleTime.
	// Default is 180 seconds, minimum is 60 seconds. A negative value such as -1 disables it.
	ConnectionMaxIdleTime time.Duration

	// EnableTransaction determines whether transactions are enabled for the client; it must be true before
	// calling Client.NewTransaction.
	EnableTransaction bool

	// Limit of client memory usage (in bytes). The 64MB default can guarantee high producer throughput.
	// A value less than 0 disables the memory limit.
	MemoryLimitBytes int64
}

ClientOptions is used to construct a Pulsar Client instance.

type CompressionLevel added in v0.2.0

type CompressionLevel int
const (
	// Default compression level
	Default CompressionLevel = iota

	// Faster compression, with lower compression ratio
	Faster

	// Higher compression rate, but slower
	Better
)

type CompressionType

type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
)

type Consumer

type Consumer interface {
	// Subscription gets the subscription name for the consumer
	Subscription() string

	// Unsubscribe the consumer
	//
	// Unsubscribing will cause the subscription to be deleted,
	// and all the retained data can potentially be deleted based on message retention and ttl policy.
	//
	// This operation will fail when performed on a shared subscription
	// where more than one consumer are currently connected.
	Unsubscribe() error

	// UnsubscribeForce forcefully unsubscribes the consumer by disconnecting the connected consumers.
	//
	// Unsubscribing will cause the subscription to be deleted,
	// and all the retained data can potentially be deleted based on message retention and ttl policy.
	//
	// This operation will fail when performed on a shared subscription
	// where more than one consumer are currently connected.
	UnsubscribeForce() error

	// GetLastMessageIDs gets the last message ID of each topic the consumer has subscribed to.
	//
	// Returns the list of MessageID instances for all subscribed topics.
	GetLastMessageIDs() ([]TopicMessageID, error)

	// Receive a single message.
	// This call blocks until a message is available.
	Receive(context.Context) (Message, error)

	// Chan returns a channel to consume messages from
	Chan() <-chan ConsumerMessage

	// Ack the consumption of a single message
	Ack(Message) error

	// AckID the consumption of a single message, identified by its MessageID
	AckID(MessageID) error

	// AckWithTxn the consumption of a single message with a transaction
	AckWithTxn(Message, Transaction) error

	// AckCumulative the reception of all the messages in the stream up to (and including)
	// the provided message.
	AckCumulative(msg Message) error

	// AckIDCumulative the reception of all the messages in the stream up to (and including)
	// the provided message, identified by its MessageID
	AckIDCumulative(msgID MessageID) error

	// ReconsumeLater marks a message for redelivery after a custom delay
	ReconsumeLater(msg Message, delay time.Duration)

	// ReconsumeLaterWithCustomProperties marks a message for redelivery after a custom delay, with custom properties
	ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration)

	// Nack acknowledges the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	Nack(Message)

	// NackID acknowledges the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	NackID(MessageID)

	// Close the consumer and stop the broker from pushing more messages
	Close()

	// Seek resets the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
	//       instead perform the seek() on the individual partitions.
	Seek(MessageID) error

	// SeekByTime resets the subscription associated with this consumer to a specific message publish time.
	//
	// The time argument is the message publish time at which to reposition the subscription.
	SeekByTime(time time.Time) error

	// Name returns the name of consumer.
	Name() string
}

Consumer is an interface that abstracts behavior of Pulsar's consumer
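
A receive/ack loop sketch for a shared subscription; pulsar.Shared is the shared SubscriptionType constant, and names are placeholders:

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func consume(client pulsar.Client) error {
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		return err
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			return err
		}
		if err := handle(msg); err != nil {
			consumer.Nack(msg) // redelivered after NackRedeliveryDelay
			continue
		}
		if err := consumer.Ack(msg); err != nil {
			log.Printf("ack failed: %v", err)
		}
	}
}

func handle(msg pulsar.Message) error {
	log.Printf("got %q from %s", msg.Payload(), msg.Topic())
	return nil
}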

type ConsumerEventListener added in v0.10.0

type ConsumerEventListener interface {
	BecameActive(consumer Consumer, topicName string, partition int32)
	BecameInactive(consumer Consumer, topicName string, partition int32)
}

type ConsumerInterceptor added in v0.2.0

type ConsumerInterceptor interface {
	// BeforeConsume This is called just before the message is sent to the Consumer's ConsumerMessage channel.
	BeforeConsume(message ConsumerMessage)

	// OnAcknowledge This is called when the consumer sends the acknowledgment to the broker.
	OnAcknowledge(consumer Consumer, msgID MessageID)

	// OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs.
	OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
}

type ConsumerInterceptors added in v0.2.0

type ConsumerInterceptors []ConsumerInterceptor

func (ConsumerInterceptors) BeforeConsume added in v0.2.0

func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage)

func (ConsumerInterceptors) OnAcknowledge added in v0.2.0

func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID)

func (ConsumerInterceptors) OnNegativeAcksSend added in v0.2.0

func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)

type ConsumerMessage

type ConsumerMessage struct {
	Consumer
	Message
}

ConsumerMessage represents a pair of a Consumer and Message.

type ConsumerOptions

type ConsumerOptions struct {
	// Topic specifies the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topic string

	// Topics specifies a list of topics this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topics []string

	// TopicsPattern specifies a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	TopicsPattern string

	// AutoDiscoveryPeriod specifies the interval in which to poll for new partitions or new topics
	// if using a TopicsPattern.
	AutoDiscoveryPeriod time.Duration

	// SubscriptionName specifies the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Properties represents a set of application defined properties for the consumer.
	// Those properties will be visible in the topic stats
	Properties map[string]string

	// SubscriptionProperties specify the subscription properties for this subscription.
	//
	// > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a
	// > subscription if they use different properties.
	SubscriptionProperties map[string]string

	// Type specifies the subscription type to be used when subscribing to a topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// SubscriptionInitialPosition is the initial position at which the cursor will be set when subscribe
	// Default is `Latest`
	SubscriptionInitialPosition

	// EventListener will be called when the active consumer changes (in the failover subscription type)
	EventListener ConsumerEventListener

	// DLQ represents the configuration for the Dead Letter Queue consumer policy,
	// e.g. route the message to topic X after N failed attempts at processing it.
	// Default is nil, meaning there is no DLQ.
	DLQ *DLQPolicy

	// KeySharedPolicy represents the configuration for Key Shared consumer policy.
	KeySharedPolicy *KeySharedPolicy

	// RetryEnable determines whether messages can be automatically routed to the retry topic filled in from the
	// DLQPolicy defaults.
	// Default is false
	RetryEnable bool

	// MessageChannel sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int

	// EnableZeroQueueConsumer, if enabled, sets the ReceiverQueueSize to 0.
	// Notice: only non-partitioned topics are supported.
	// Default is false.
	EnableZeroQueueConsumer bool

	// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
	// based on the consumer's actual throughput. ReceiverQueueSize then acts as the maximum size to
	// which the receive queue can be scaled.
	// Default is false.
	EnableAutoScaledReceiverQueueSize bool

	// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
	// processed. Default is 1 min. (See `Consumer.Nack()`)
	NackRedeliveryDelay time.Duration

	// Name specifies the consumer name.
	Name string

	// ReadCompacted, if enabled, the consumer will read messages from the compacted topic rather than reading the
	// full message backlog of the topic. This means that, if the topic has been compacted, the consumer will only
	// see the latest value for each key in the topic, up until the point in the topic message backlog that has been
	// compacted. Beyond that point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active
	// consumer (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to
	// non-persistent topics or on a shared subscription will lead to the subscription call throwing a
	// PulsarClientException.
	ReadCompacted bool

	// ReplicateSubscriptionState marks the subscription as replicated to keep it in sync across clusters
	ReplicateSubscriptionState bool

	// Interceptors is a chain of interceptors. These interceptors will be called at some points defined in
	// ConsumerInterceptor interface.
	Interceptors ConsumerInterceptors

	// Schema represents the schema implementation.
	Schema Schema

	// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: unlimited)
	MaxReconnectToBroker *uint

	// BackOffPolicyFunc parameterizes the reconnection logic, allowing users to customize it
	// (minBackoff, maxBackoff and jitterPercentage)
	BackOffPolicyFunc func() backoff.Policy

	// Decryption represents the encryption related fields required by the consumer to decrypt a message.
	Decryption *MessageDecryptionInfo

	// EnableDefaultNackBackoffPolicy, if enabled, the default implementation of NackBackoffPolicy will be
	// used to calculate the nack backoff delay. Default: false.
	EnableDefaultNackBackoffPolicy bool

	// NackBackoffPolicy is a redelivery backoff mechanism that allows redelivery with different
	// delays depending on the number of times the message has been retried.
	//
	// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
	// > because we are not able to get the redeliveryCount from the message ID.
	NackBackoffPolicy NackBackoffPolicy

	// AckWithResponse is a return value added to Ack Command, and its purpose is to confirm whether Ack Command
	// is executed correctly on the Broker side. When set to true, the error information returned by the Ack
	// method contains the return value of the Ack Command processed by the Broker side; when set to false, the
	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
	// Default: false
	AckWithResponse bool

	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
	MaxPendingChunkedMessage int

	// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
	ExpireTimeOfIncompleteChunk time.Duration

	// AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
	// be removed (e.g.the chunked message pending queue is full). (default: false)
	AutoAckIncompleteChunk bool

	// Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment
	// is enabled on the broker side. (default: false)
	EnableBatchIndexAcknowledgment bool

	// Controls how to group ACK requests, the default value is nil, which means:
	// MaxSize: 1000
	// MaxTime: 100*time.Millisecond
	// NOTE: This option does not work if AckWithResponse is true
	//	because there are only synchronous APIs for acknowledgment
	AckGroupingOptions *AckGroupingOptions

	// SubscriptionMode specifies the subscription mode to be used when subscribing to a topic.
	// Default is `Durable`
	SubscriptionMode SubscriptionMode

	// StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included.
	// Default is `false` and the consumer will start from the "next" message
	StartMessageIDInclusive bool
	// contains filtered or unexported fields
}

ConsumerOptions is used to configure and create instances of Consumer.

type DLQPolicy

type DLQPolicy struct {
	// MaxDeliveries specifies the maximum number of times that a message will be delivered before being
	// sent to the dead letter queue.
	MaxDeliveries uint32

	// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
	DeadLetterTopic string

	// ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic
	ProducerOptions ProducerOptions

	// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
	RetryLetterTopic string

	// InitialSubscriptionName is the name of the initial subscription of the dead letter topic.
	// If this field is not set, the initial subscription for the dead letter topic will not be created.
	// If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
	// will fail to be created.
	InitialSubscriptionName string
}

DLQPolicy represents the configuration for the Dead Letter Queue consumer policy.
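
A sketch of a subscription with both retry and dead-letter routing; the topic names follow the -RETRY/-DLQ suffix convention from the constants above but are otherwise placeholders:

import "github.com/apache/pulsar-client-go/pulsar"

func subscribeWithDLQ(client pulsar.Client) (pulsar.Consumer, error) {
	return client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
		RetryEnable:      true, // lets ReconsumeLater route via the retry topic
		DLQ: &pulsar.DLQPolicy{
			MaxDeliveries:    3, // dead-letter after 3 deliveries
			DeadLetterTopic:  "my-topic-DLQ",
			RetryLetterTopic: "my-topic-RETRY",
		},
	})
}

In the receive loop, a message that cannot be handled yet can then be deferred with consumer.ReconsumeLater(msg, time.Minute); after MaxDeliveries attempts it lands on the dead letter topic.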

type DoubleSchema added in v0.3.0

type DoubleSchema struct {
	SchemaInfo
}

func NewDoubleSchema added in v0.3.0

func NewDoubleSchema(properties map[string]string) *DoubleSchema

func (*DoubleSchema) Decode added in v0.3.0

func (ds *DoubleSchema) Decode(data []byte, v interface{}) error

func (*DoubleSchema) Encode added in v0.3.0

func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error)

func (*DoubleSchema) GetSchemaInfo added in v0.3.0

func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo

func (*DoubleSchema) Validate added in v0.3.0

func (ds *DoubleSchema) Validate(message []byte) error

type EncryptionContext added in v0.7.0

type EncryptionContext struct {
	Keys             map[string]EncryptionKey
	Param            []byte
	Algorithm        string
	CompressionType  CompressionType
	UncompressedSize int
	BatchSize        int
}

EncryptionContext is used to decrypt the message outside of this client

type EncryptionKey added in v0.7.0

type EncryptionKey struct {
	KeyValue []byte
	Metadata map[string]string
}

EncryptionKey Encryption key used to encrypt the message payload

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error implements the error interface; it is composed of two parts: msg and result.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Result

func (e *Error) Result() Result

Result gets the error's original result.

type FloatSchema added in v0.3.0

type FloatSchema struct {
	SchemaInfo
}

func NewFloatSchema added in v0.3.0

func NewFloatSchema(properties map[string]string) *FloatSchema

func (*FloatSchema) Decode added in v0.3.0

func (fs *FloatSchema) Decode(data []byte, v interface{}) error

func (*FloatSchema) Encode added in v0.3.0

func (fs *FloatSchema) Encode(value interface{}) ([]byte, error)

func (*FloatSchema) GetSchemaInfo added in v0.3.0

func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo

func (*FloatSchema) Validate added in v0.3.0

func (fs *FloatSchema) Validate(message []byte) error

type HashingScheme

type HashingScheme int
const (
	// JavaStringHash is the Java String.hashCode() equivalent
	JavaStringHash HashingScheme = iota
	// Murmur3_32Hash uses the Murmur3 hashing function
	Murmur3_32Hash
)

type Int16Schema added in v0.3.0

type Int16Schema struct {
	SchemaInfo
}

func NewInt16Schema added in v0.3.0

func NewInt16Schema(properties map[string]string) *Int16Schema

func (*Int16Schema) Decode added in v0.3.0

func (is16 *Int16Schema) Decode(data []byte, v interface{}) error

func (*Int16Schema) Encode added in v0.3.0

func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error)

func (*Int16Schema) GetSchemaInfo added in v0.3.0

func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo

func (*Int16Schema) Validate added in v0.3.0

func (is16 *Int16Schema) Validate(message []byte) error

type Int32Schema added in v0.3.0

type Int32Schema struct {
	SchemaInfo
}

func NewInt32Schema added in v0.3.0

func NewInt32Schema(properties map[string]string) *Int32Schema

func (*Int32Schema) Decode added in v0.3.0

func (is32 *Int32Schema) Decode(data []byte, v interface{}) error

func (*Int32Schema) Encode added in v0.3.0

func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error)

func (*Int32Schema) GetSchemaInfo added in v0.3.0

func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo

func (*Int32Schema) Validate added in v0.3.0

func (is32 *Int32Schema) Validate(message []byte) error

type Int64Schema added in v0.3.0

type Int64Schema struct {
	SchemaInfo
}

func NewInt64Schema added in v0.3.0

func NewInt64Schema(properties map[string]string) *Int64Schema

func (*Int64Schema) Decode added in v0.3.0

func (is64 *Int64Schema) Decode(data []byte, v interface{}) error

func (*Int64Schema) Encode added in v0.3.0

func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error)

func (*Int64Schema) GetSchemaInfo added in v0.3.0

func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo

func (*Int64Schema) Validate added in v0.3.0

func (is64 *Int64Schema) Validate(message []byte) error

type Int8Schema added in v0.3.0

type Int8Schema struct {
	SchemaInfo
}

func NewInt8Schema added in v0.3.0

func NewInt8Schema(properties map[string]string) *Int8Schema

func (*Int8Schema) Decode added in v0.3.0

func (is8 *Int8Schema) Decode(data []byte, v interface{}) error

func (*Int8Schema) Encode added in v0.3.0

func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error)

func (*Int8Schema) GetSchemaInfo added in v0.3.0

func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo

func (*Int8Schema) Validate added in v0.3.0

func (is8 *Int8Schema) Validate(message []byte) error

type JSONSchema added in v0.3.0

type JSONSchema struct {
	SchemaInfo
}

func NewJSONSchema added in v0.3.0

func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema

NewJSONSchema creates a new JSONSchema. Note: the function will panic if creation of the codec fails

func NewJSONSchemaWithValidation added in v0.9.0

func NewJSONSchemaWithValidation(jsonAvroSchemaDef string, properties map[string]string) (*JSONSchema, error)

NewJSONSchemaWithValidation creates a new JSONSchema, returning an error instead of panicking if codec creation fails

func (*JSONSchema) Decode added in v0.3.0

func (js *JSONSchema) Decode(data []byte, v interface{}) error

func (*JSONSchema) Encode added in v0.3.0

func (js *JSONSchema) Encode(data interface{}) ([]byte, error)

func (*JSONSchema) GetSchemaInfo added in v0.3.0

func (js *JSONSchema) GetSchemaInfo() *SchemaInfo

func (*JSONSchema) Validate added in v0.3.0

func (js *JSONSchema) Validate(message []byte) error

type KeySharedPolicy added in v0.3.0

type KeySharedPolicy struct {
	// Mode is the KeySharedPolicyMode
	Mode KeySharedPolicyMode
	// HashRanges is a value pair list
	HashRanges []int
	// If enabled, it will relax the ordering requirement, allowing the broker to send out-of-order messages in case of
	// failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer.
	AllowOutOfOrderDelivery bool
}

KeySharedPolicy for KeyShared subscription

func NewKeySharedPolicySticky added in v0.3.0

func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error)

NewKeySharedPolicySticky constructs a KeySharedPolicy in Sticky mode with hashRanges formed as a value pair list: [x1, x2, y1, y2, z1, z2]. The ranges must not overlap with each other.
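
A sticky-mode sketch; pulsar.KeyShared is the key-shared SubscriptionType constant, and the ranges below assume Pulsar's 0-65535 key-hash space (names are placeholders):

import "github.com/apache/pulsar-client-go/pulsar"

func subscribeSticky(client pulsar.Client) (pulsar.Consumer, error) {
	// Two non-overlapping ranges: [0, 19999] and [20000, 65535].
	policy, err := pulsar.NewKeySharedPolicySticky([]int{0, 19999, 20000, 65535})
	if err != nil {
		return nil, err // e.g. overlapping or malformed ranges
	}
	return client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.KeyShared,
		KeySharedPolicy:  policy,
	})
}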

type KeySharedPolicyMode added in v0.3.0

type KeySharedPolicyMode int
const (
	// KeySharedPolicyModeAutoSplit is the auto-split hash range key shared policy.
	KeySharedPolicyModeAutoSplit KeySharedPolicyMode = iota
	// KeySharedPolicyModeSticky attaches the consumer to fixed hash ranges.
	KeySharedPolicyModeSticky
)

type Message

type Message interface {
	// Topic returns the topic from which this message originated.
	Topic() string

	// ProducerName returns the name of the producer that has published the message.
	ProducerName() string

	// Properties are application defined key/value pairs that will be attached to the message.
	// Returns the properties attached to the message.
	Properties() map[string]string

	// Payload returns the payload of the message
	Payload() []byte

	// ID returns the unique message ID associated with this message.
	// The message id can be used to univocally refer to a message without having to keep the entire payload in memory.
	ID() MessageID

	// PublishTime returns the publish time of this message. The publish time is the timestamp at which the
	// client published the message.
	PublishTime() time.Time

	// EventTime returns the event time associated with this message. It is typically set by the applications via
	// `ProducerMessage.EventTime`.
	// If EventTime is 0, it means there isn't any event time associated with this message.
	EventTime() time.Time

	// Key returns the key of the message, if any
	Key() string

	// OrderingKey returns the ordering key of the message, if any
	OrderingKey() string

	// RedeliveryCount returns the message redelivery count, which is maintained by the Pulsar broker.
	// When a client nacks a message, the broker will dispatch the message again with the redelivery
	// count set in CommandMessage.
	//
	// The redelivery count increases monotonically within a broker; when topic ownership switches to
	// another broker, the redelivery count will be recalculated.
	RedeliveryCount() uint32

	// IsReplicated determines whether the message is replicated from another cluster.
	IsReplicated() bool

	// GetReplicatedFrom returns the name of the cluster, from which the message is replicated.
	GetReplicatedFrom() string

	// GetSchemaValue returns the de-serialized value of the message, according to the configuration.
	GetSchemaValue(v interface{}) error

	// SchemaVersion gets the schema version of the message, if any
	SchemaVersion() []byte

	// GetEncryptionContext returns the encryption context of the message.
	// It will be used by the application to parse the undecrypted message.
	GetEncryptionContext() *EncryptionContext

	// Index returns index from broker entry metadata,
	// or empty if the feature is not enabled in the broker.
	Index() *uint64

	// BrokerPublishTime returns broker publish time from broker entry metadata,
	// or empty if the feature is not enabled in the broker.
	BrokerPublishTime() *time.Time
}

Message abstraction used in Pulsar

type MessageDecryptionInfo added in v0.7.0

type MessageDecryptionInfo struct {
	// KeyReader read RSA public/private key pairs
	KeyReader crypto.KeyReader

	// MessageCrypto used to encrypt and decrypt the data and session keys
	MessageCrypto crypto.MessageCrypto

	// ConsumerCryptoFailureAction action to be taken on failure of message decryption
	ConsumerCryptoFailureAction int
}

MessageDecryptionInfo encryption related fields required by the consumer to decrypt the message

type MessageID

type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte

	// LedgerID returns the message ledgerID
	LedgerID() int64

	// EntryID returns the message entryID
	EntryID() int64

	// BatchIdx returns the message batchIdx
	BatchIdx() int32

	// PartitionIdx returns the message partitionIdx
	PartitionIdx() int32

	// BatchSize returns 0 or the batch size, which must be greater than BatchIdx()
	BatchSize() int32

	// String returns message id in string format
	String() string
}

MessageID identifier for a particular message

func DeserializeMessageID

func DeserializeMessageID(data []byte) (MessageID, error)

DeserializeMessageID reconstructs a MessageID object from its serialized representation
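
For instance, a position can be checkpointed and restored across process restarts (a sketch; how the bytes are stored is up to the application):

import "github.com/apache/pulsar-client-go/pulsar"

// checkpoint persists the position of a processed message.
func checkpoint(msg pulsar.Message) []byte {
	return msg.ID().Serialize()
}

// restore repositions the subscription at a previously stored position.
func restore(consumer pulsar.Consumer, data []byte) error {
	msgID, err := pulsar.DeserializeMessageID(data)
	if err != nil {
		return err
	}
	return consumer.Seek(msgID)
}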

func EarliestMessageID

func EarliestMessageID() MessageID

EarliestMessageID returns a messageID that points to the earliest message available in a topic

func LatestMessageID

func LatestMessageID() MessageID

LatestMessageID returns a messageID that points to the latest message

func NewMessageID added in v0.10.0

func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID

NewMessageID creates a custom MessageID

type MetricsCardinality added in v0.7.0

type MetricsCardinality int

MetricsCardinality represents the specificity of labels on a per-metric basis

const (
	MetricsCardinalityNone      MetricsCardinality = iota // Do not add additional labels to metrics
	MetricsCardinalityTenant                              // Label metrics by tenant
	MetricsCardinalityNamespace                           // Label metrics by tenant and namespace
	MetricsCardinalityTopic                               // Label metrics by topic
)

type NackBackoffPolicy added in v0.8.0

type NackBackoffPolicy interface {
	// Next param redeliveryCount indicates the number of times the message was redelivered.
	// We can get the redeliveryCount from the CommandMessage.
	Next(redeliveryCount uint32) time.Duration
}

NackBackoffPolicy is an interface for a custom negative-ack backoff policy; users can specify a NackBackoffPolicy for a consumer.

> Notice: a consumer crash will trigger redelivery of the unacked messages, and this case will not respect the NackBackoffPolicy, which means the messages might get redelivered earlier than the delay time from the backoff.
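
A sketch of a custom exponential policy (the base and cap are arbitrary choices; names are placeholders):

import (
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// exponentialNackBackoff doubles the delay per redelivery, capped at 5 minutes.
type exponentialNackBackoff struct{}

func (exponentialNackBackoff) Next(redeliveryCount uint32) time.Duration {
	if redeliveryCount > 8 { // 2^8 seconds is already ~4.3 minutes
		return 5 * time.Minute
	}
	return time.Second << redeliveryCount
}

func subscribeWithBackoff(client pulsar.Client) (pulsar.Consumer, error) {
	return client.Subscribe(pulsar.ConsumerOptions{
		Topic:             "my-topic",
		SubscriptionName:  "my-sub",
		NackBackoffPolicy: exponentialNackBackoff{},
	})
}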

type Producer

type Producer interface {
	// Topic returns the topic to which the producer is publishing
	Topic() string

	// Name returns the producer name, which could have been assigned by the system or specified by the client
	Name() string

	// Send a message
	// This call will block until the message is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
	Send(context.Context, *ProducerMessage) (MessageID, error)

	// SendAsync a message in asynchronous mode
	// This call blocks when the `maxPendingMessages` queue is full (default: 1000)
	// The callback will report back the message being published and
	// the eventual error in publishing
	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))

	// LastSequenceID gets the last sequence id that was published by this producer.
	// This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the sequence id of the last
	// message published in the previous producer session, or -1 if no message was ever published.
	// Returns the last sequence id published by this producer.
	LastSequenceID() int64

	// Deprecated: Use `FlushWithCtx()` instead.
	Flush() error

	// Flush all the messages buffered in the client and wait until all messages have been successfully
	// persisted.
	FlushWithCtx(ctx context.Context) error

	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
	// of errors, pending writes will not be retried.
	Close()
}

Producer is used to publish messages on a topic
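
A publish sketch covering both the blocking and asynchronous paths (names are placeholders):

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func produce(client pulsar.Client) error {
	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		return err
	}
	defer producer.Close()

	ctx := context.Background()

	// Synchronous publish: blocks until the broker acknowledges.
	msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: []byte("hello"),
		Key:     "user-42", // consulted by the routing policy
	})
	if err != nil {
		return err
	}
	log.Printf("published %s", msgID)

	// Asynchronous publish: the callback reports the eventual result.
	producer.SendAsync(ctx, &pulsar.ProducerMessage{Payload: []byte("world")},
		func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Printf("async publish failed: %v", err)
			}
		})

	// Wait for anything still buffered to be persisted.
	return producer.FlushWithCtx(ctx)
}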

type ProducerAccessMode added in v0.10.0

type ProducerAccessMode int
const (
	// ProducerAccessModeShared is the default: multiple producers can publish on a topic.
	ProducerAccessModeShared ProducerAccessMode = iota

	// ProducerAccessModeExclusive requires exclusive access for the producer.
	// Fail immediately if there's already a producer connected.
	ProducerAccessModeExclusive

	// ProducerAccessModeWaitForExclusive waits until the producer can acquire exclusive access.
	ProducerAccessModeWaitForExclusive
)

type ProducerEncryptionInfo added in v0.7.0

type ProducerEncryptionInfo struct {
	// KeyReader read RSA public/private key pairs
	KeyReader crypto.KeyReader

	// MessageCrypto used to encrypt and decrypt the data and session keys
	MessageCrypto crypto.MessageCrypto

	// Keys list of encryption key names to encrypt session key
	Keys []string

	// ProducerCryptoFailureAction action to be taken on failure of message encryption
	// default is ProducerCryptoFailureActionFail
	ProducerCryptoFailureAction int
}

ProducerEncryptionInfo encryption related fields required by the producer

type ProducerInterceptor added in v0.2.0

type ProducerInterceptor interface {
	// BeforeSend This is called before the message is sent to the brokers. This method is allowed to modify
	// the message.
	BeforeSend(producer Producer, message *ProducerMessage)

	// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
	// or when sending the message fails.
	OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
}

type ProducerInterceptors added in v0.2.0

type ProducerInterceptors []ProducerInterceptor

func (ProducerInterceptors) BeforeSend added in v0.2.0

func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage)

func (ProducerInterceptors) OnSendAcknowledgement added in v0.2.0

func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)

type ProducerMessage

type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Value and Payload are mutually exclusive; use `Value interface{}` for schema messages.
	Value interface{}

	// Key sets the key of the message for routing policy
	Key string

	// OrderingKey sets the ordering key of the message
	OrderingKey string

	// Properties attach application defined properties on the message
	Properties map[string]string

	// EventTime set the event time for a given message
	// By default, messages don't have an event time associated, while the publish
	// time will always be present.
	// Set the event time to a non-zero timestamp to explicitly declare the time
	// that the event "happened", as opposed to when the message is being published.
	EventTime time.Time

	// ReplicationClusters override the replication clusters for this message.
	ReplicationClusters []string

	// DisableReplication disables the replication for this message
	DisableReplication bool

	// SequenceID sets the sequence id to assign to the current message
	SequenceID *int64

	// DeliverAfter requests to deliver the message only after the specified relative delay.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAfter time.Duration

	// DeliverAt delivers the message only at or after the specified absolute timestamp.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAt time.Time

	// Schema assigned to the current message.
	// Note: messages may have a different schema from the producer schema; when assigned, it is used
	// instead of the producer schema.
	Schema Schema

	// Transaction assigned to the current message.
	// Note: the message is not visible before the transaction is committed.
	Transaction Transaction
}

ProducerMessage abstraction used in Pulsar producer
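
As a sketch of the delayed-delivery fields, which only take effect for consumers on Shared subscriptions:

import (
	"context"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func sendDelayed(producer pulsar.Producer) error {
	ctx := context.Background()

	// Relative delay: delivered roughly 5 minutes after publishing.
	if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload:      []byte("reminder"),
		DeliverAfter: 5 * time.Minute,
	}); err != nil {
		return err
	}

	// Absolute timestamp: delivered at or after the given time.
	_, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload:   []byte("scheduled"),
		DeliverAt: time.Now().Add(time.Hour),
	})
	return err
}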

type ProducerOptions

type ProducerOptions struct {
	// Topic specifies the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Name specifies a name for the producer.
	// If not assigned, the system will generate a globally unique name which can be accessed with
	// Producer.ProducerName().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
	// publishing on a topic.
	Name string

	// Properties specifies a set of application defined properties for the producer.
	// These properties will be visible in the topic stats
	Properties map[string]string

	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since it
	// was sent. Send and SendAsync return an error after the timeout.
	// Default is 30 seconds; a negative value such as -1 disables it.
	SendTimeout time.Duration

	// DisableBlockIfQueueFull controls whether Send and SendAsync block if producer's message queue is full.
	// Default is false; if set to true, Send and SendAsync return an error when the queue is full.
	DisableBlockIfQueueFull bool

	// MaxPendingMessages specifies the max size of the queue holding the messages pending to receive an
	// acknowledgment from the broker.
	MaxPendingMessages int

	// HashingScheme is used to define the partition on where to publish a particular message.
	// Standard hashing functions available are:
	//
	//  - `JavaStringHash` : Java String.hashCode() equivalent
	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
	//    https://en.wikipedia.org/wiki/MurmurHash
	//
	// Default is `JavaStringHash`.
	HashingScheme

	// CompressionType specifies the compression type for the producer.
	// By default, message payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLIB
	//  - ZSTD
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	CompressionType

	// CompressionLevel defines the desired compression level. Options:
	// - Default
	// - Faster
	// - Better
	CompressionLevel

	// MessageRouter represents a custom message routing policy, set by passing an implementation of MessageRouter.
	// The router is a function that, given a particular message and the topic metadata, returns the
	// partition index to which the message should be routed
	MessageRouter func(*ProducerMessage, TopicMetadata) int

	// DisableBatching controls whether automatic batching of messages is enabled for the producer. By default batching
	// is enabled.
	// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
	// contents.
	// When enabled, the default batch delay is 10ms and the default batch size is 1000 messages.
	// Setting `DisableBatching: true` will make the producer send messages individually
	DisableBatching bool

	// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
	// if batch messages are enabled. If set to a non-zero value, messages will be queued until this time
	// interval elapses or until BatchingMaxMessages or BatchingMaxSize is reached.
	BatchingMaxPublishDelay time.Duration

	// BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000)
	// If set to a value greater than 1, messages will be queued until this threshold is reached or
	// BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
	BatchingMaxMessages uint

	// BatchingMaxSize specifies the maximum number of bytes permitted in a batch. (default 128 KB)
	// If set to a value greater than 1, messages will be queued until this threshold is reached or
	// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
	BatchingMaxSize uint

	// Interceptors is a chain of interceptors. These interceptors will be called at some points defined
	// in the ProducerInterceptor interface
	Interceptors ProducerInterceptors

	// Schema represents the schema implementation.
	Schema Schema

	// MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: unlimited)
	MaxReconnectToBroker *uint

	// BackOffPolicyFunc parameterizes the reconnection logic, allowing users to customize it
	// (minBackoff, maxBackoff and jitterPercentage)
	BackOffPolicyFunc func() backoff.Policy

	// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
	// This will be used to create batch container when batching is enabled.
	// Options:
	// - DefaultBatchBuilder
	// - KeyBasedBatchBuilder
	BatcherBuilderType

	// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions
	// Default is 1 minute
	PartitionsAutoDiscoveryInterval time.Duration

	// Disable multiple schema versions.
	// Default false
	DisableMultiSchema bool

	// Encryption specifies the fields required to encrypt a message
	Encryption *ProducerEncryptionInfo

	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
	// is disabled.
	// Chunking can not be enabled when batching is enabled.
	EnableChunking bool

	// ChunkMaxMessageSize is the max size of single chunk payload.
	// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
	ChunkMaxMessageSize uint

	// The type of access to the topic that the producer requires. (default ProducerAccessModeShared)
	// Options:
	// - ProducerAccessModeShared
	// - ProducerAccessModeExclusive
	ProducerAccessMode
	// contains filtered or unexported fields
}

type ProtoNativeSchema added in v0.11.0

type ProtoNativeSchema struct {
	SchemaInfo
}

func NewProtoNativeSchemaWithMessage added in v0.11.0

func NewProtoNativeSchemaWithMessage(message proto.Message, properties map[string]string) *ProtoNativeSchema

func (*ProtoNativeSchema) Decode added in v0.11.0

func (ps *ProtoNativeSchema) Decode(data []byte, v interface{}) error

func (*ProtoNativeSchema) Encode added in v0.11.0

func (ps *ProtoNativeSchema) Encode(data interface{}) ([]byte, error)

func (*ProtoNativeSchema) GetSchemaInfo added in v0.11.0

func (ps *ProtoNativeSchema) GetSchemaInfo() *SchemaInfo

func (*ProtoNativeSchema) Validate added in v0.11.0

func (ps *ProtoNativeSchema) Validate(message []byte) error

type ProtoNativeSchemaData added in v0.11.0

type ProtoNativeSchemaData struct {
	FileDescriptorSet      []byte `json:"fileDescriptorSet"`
	RootMessageTypeName    string `json:"rootMessageTypeName"`
	RootFileDescriptorName string `json:"rootFileDescriptorName"`
}

type ProtoSchema added in v0.3.0

type ProtoSchema struct {
	SchemaInfo
}

func NewProtoSchema added in v0.3.0

func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema

NewProtoSchema creates a new ProtoSchema. Note: the function will panic if creation of the codec fails

func NewProtoSchemaWithValidation added in v0.9.0

func NewProtoSchemaWithValidation(protoAvroSchemaDef string, properties map[string]string) (*ProtoSchema, error)

NewProtoSchemaWithValidation creates a new ProtoSchema, returning an error instead of panicking if codec creation fails

func (*ProtoSchema) Decode added in v0.3.0

func (ps *ProtoSchema) Decode(data []byte, v interface{}) error

func (*ProtoSchema) Encode added in v0.3.0

func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error)

func (*ProtoSchema) GetSchemaInfo added in v0.3.0

func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo

func (*ProtoSchema) Validate added in v0.3.0

func (ps *ProtoSchema) Validate(message []byte) error

type Reader

type Reader interface {
	// Topic returns the topic from which this reader is reading
	Topic() string

	// Next reads the next message in the topic, blocking until a message is available
	Next(context.Context) (Message, error)

	// HasNext checks if there is any message available to read from the current position
	// If there are any errors, it will return false
	HasNext() bool

	// Close the reader and stop the broker from pushing more messages
	Close()

	// Seek resets the subscription associated with this reader to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
	//       instead perform the seek() on the individual partitions.
	Seek(MessageID) error

	// SeekByTime resets the subscription associated with this reader to a specific message publish time.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
	// instead perform the seek() on the individual partitions.
	//
	// The timestamp argument is the message publish time at which to reposition the subscription.
	SeekByTime(time time.Time) error

	// GetLastMessageID gets the last message ID available for consumption.
	// It only works for a single-topic reader and returns an error for a multi-topic reader.
	GetLastMessageID() (MessageID, error)
}

Reader can be used to scan through all the messages currently available in a topic.

type ReaderMessage

type ReaderMessage struct {
	Reader
	Message
}

ReaderMessage bundles a Reader and a Message into a single struct, for use with the reader's MessageChannel.

type ReaderOptions

type ReaderOptions struct {
	// Topic specifies the topic this reader will read from.
	// This argument is required when constructing the reader.
	Topic string

	// Name sets the reader name.
	Name string

	// Properties represents a set of application defined properties for the reader.
	// Those properties will be visible in the topic stats.
	Properties map[string]string

	// StartMessageID sets the initial reader position by specifying a message ID. The options are:
	//  * `pulsar.EarliestMessageID()` : Start reading from the earliest message available in the topic
	//  * `pulsar.LatestMessageID()` : Start reading from the end of the topic, only getting messages published
	//                           after the reader was created
	//  * `MessageID` : Start reading from a particular message ID; the reader will position itself on that
	//                  specific position. The first message to be read will be the message next to the specified
	//                  messageID
	StartMessageID MessageID

	// StartMessageIDInclusive, if true, makes the reader start at the `StartMessageID` itself.
	// Default is `false`, and the reader will start from the "next" message
	StartMessageIDInclusive bool

	// MessageChannel sets a `MessageChannel` for the reader
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ReaderMessage

	// ReceiverQueueSize sets the size of the reader's receive queue.
	// The receive queue controls how many messages can be accumulated by the Reader before the
	// application calls Reader.Next(). Using a higher value could potentially increase the
	// throughput at the expense of bigger memory utilization.
	// Default value is 1000 messages and should be good for most use cases.
	ReceiverQueueSize int

	// SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
	SubscriptionRolePrefix string

	// SubscriptionName sets the subscription name.
	// If SubscriptionRolePrefix is set at the same time, this configuration will prevail
	SubscriptionName string

	// ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the
	// full message backlog of the topic. This means that, if the topic has been compacted, the reader will only
	// see the latest value for each key in the topic, up until the point in the topic message backlog that has
	// been compacted. Beyond that point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on
	// non-persistent topics will cause the reader creation call to return an error.
	ReadCompacted bool

	// Decryption represents the encryption related fields required by the reader to decrypt a message.
	Decryption *MessageDecryptionInfo

	// Schema represents the schema implementation.
	Schema Schema

	// BackoffPolicyFunc parameterizes the reconnection logic so that users can customize it
	// (minBackoff, maxBackoff and jitterPercentage)
	BackoffPolicyFunc func() backoff.Policy

	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
	MaxPendingChunkedMessage int

	// ExpireTimeOfIncompleteChunk sets the expiry time for discarding incomplete chunked messages. (default: 60 seconds)
	ExpireTimeOfIncompleteChunk time.Duration

	// AutoAckIncompleteChunk sets whether the reader automatically acknowledges an incomplete chunked message
	// when it should be removed (e.g. the chunked-message pending queue is full). (default: false)
	AutoAckIncompleteChunk bool
}

ReaderOptions represents the options to use when creating a Reader.
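
Example: a minimal sketch of scanning a topic with a Reader, assuming an existing client. EarliestMessageID() is used here as the starting position.

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic: "my-topic",
	// Start from the beginning of the topic.
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("received: %q\n", msg.Payload())
}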

type Result

type Result int

Result represents the outcome of Pulsar operations and is defined as an int.

const (
	// Ok means no errors
	Ok Result = iota
	// UnknownError means unknown error happened on broker
	UnknownError
	// InvalidConfiguration means invalid configuration
	InvalidConfiguration
	// TimeoutError means operation timed out
	TimeoutError
	// LookupError means broker lookup failed
	LookupError
	// ConnectError means failed to connect to broker
	ConnectError
	// ReadError means failed to read from socket
	ReadError
	// AuthenticationError means authentication failed on broker
	AuthenticationError
	// AuthorizationError client is not authorized to create producer/consumer
	AuthorizationError
	// ErrorGettingAuthenticationData client cannot find authorization data
	ErrorGettingAuthenticationData
	// BrokerMetadataError broker failed in updating metadata
	BrokerMetadataError
	// BrokerPersistenceError broker failed to persist entry
	BrokerPersistenceError
	// ChecksumError corrupt message checksum failure
	ChecksumError
	// ConsumerBusy means Exclusive consumer is already connected
	ConsumerBusy
	// NotConnectedError producer/consumer is not currently connected to broker
	NotConnectedError
	// AlreadyClosedError producer/consumer is already closed and not accepting any operation
	AlreadyClosedError
	// InvalidMessage error in publishing an already used message
	InvalidMessage
	// ConsumerNotInitialized consumer is not initialized
	ConsumerNotInitialized
	// ProducerNotInitialized producer is not initialized
	ProducerNotInitialized
	// TooManyLookupRequestException too many concurrent LookupRequest
	TooManyLookupRequestException
	// InvalidTopicName means invalid topic name
	InvalidTopicName
	// InvalidURL means the client was initialized with an invalid broker URL (VIP URL passed to the client constructor)
	InvalidURL
	// ServiceUnitNotReady means the topic was unloaded between the client lookup and the producer/consumer creation
	ServiceUnitNotReady
	// OperationNotSupported operation not supported
	OperationNotSupported
	// ProducerBlockedQuotaExceededError producer is blocked
	ProducerBlockedQuotaExceededError
	// ProducerBlockedQuotaExceededException producer is getting exception
	ProducerBlockedQuotaExceededException
	// ProducerQueueIsFull producer queue is full
	ProducerQueueIsFull
	// MessageTooBig trying to send a message exceeding the max size
	MessageTooBig
	// TopicNotFound topic not found
	TopicNotFound
	// SubscriptionNotFound subscription not found
	SubscriptionNotFound
	// ConsumerNotFound consumer not found
	ConsumerNotFound
	// UnsupportedVersionError when an older client/version doesn't support a required feature
	UnsupportedVersionError
	// TopicTerminated topic was already terminated
	TopicTerminated
	// CryptoError error when crypto operation fails
	CryptoError
	// ConsumerClosed means consumer already been closed
	ConsumerClosed
	// InvalidBatchBuilderType invalid batch builder type
	InvalidBatchBuilderType
	// AddToBatchFailed failed to add sendRequest to batchBuilder
	AddToBatchFailed
	// SeekFailed seek failed
	SeekFailed
	// ProducerClosed means producer already been closed
	ProducerClosed
	// SchemaFailure means the payload could not be encoded using the Schema
	SchemaFailure
	// InvalidStatus means the component status is not as expected.
	InvalidStatus
	// TransactionNoFoundError means the transaction does not exist in the transaction coordinator; it may be an
	// invalid txn or one that has already ended.
	TransactionNoFoundError
	// ClientMemoryBufferIsFull client limit buffer is full
	ClientMemoryBufferIsFull
	// ProducerFenced When a producer asks for and fails to get exclusive producer access,
	// or loses the exclusive status after a reconnection, the broker will
	// use this error to indicate that this producer is now permanently
	// fenced. Applications are then supposed to close it and create a
	// new producer
	ProducerFenced
	// MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations
	// has been reached. This means that no additional operations can be started until some
	// of the current operations complete.
	MaxConcurrentOperationsReached
	// TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled.
	// This error is returned when an operation that requires the transaction coordinator is attempted
	// but the transaction coordinator feature is not enabled in the system or the transaction coordinator
	// has not been initialized
	TransactionCoordinatorNotEnabled
)
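
Errors returned by the client carry one of these Result codes. A sketch of recovering the code, assuming the returned error is (or wraps) a *pulsar.Error:

if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("data")}); err != nil {
	var perr *pulsar.Error
	if errors.As(err, &perr) {
		switch perr.Result() {
		case pulsar.TimeoutError, pulsar.ProducerQueueIsFull:
			// Transient conditions: retry with backoff.
		case pulsar.ProducerFenced:
			// Permanent: close this producer and create a new one.
		}
	}
}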

type RetryMessage added in v0.3.0

type RetryMessage struct {
	// contains filtered or unexported fields
}

type Schema added in v0.3.0

type Schema interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, v interface{}) error
	Validate(message []byte) error
	GetSchemaInfo() *SchemaInfo
}

func NewSchema added in v0.9.0

func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]string) (schema Schema, err error)
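
Example: a minimal sketch of building a Schema generically from a SchemaType and definition bytes; the Avro-style JSON definition is illustrative.

def := []byte(`{"type":"record","name":"Example","fields":[{"name":"id","type":"int"}]}`)
schema, err := pulsar.NewSchema(pulsar.JSON, def, nil)
if err != nil {
	log.Fatal(err)
}
fmt.Println(schema.GetSchemaInfo().Name)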

type SchemaInfo added in v0.3.0

type SchemaInfo struct {
	Name       string
	Schema     string
	Type       SchemaType
	Properties map[string]string
	// contains filtered or unexported fields
}

SchemaInfo encapsulates data around the schema definition.

type SchemaType added in v0.3.0

type SchemaType int
const (
	NONE     SchemaType = iota //No schema defined
	STRING                     //Simple String encoding with UTF-8
	JSON                       //JSON object encoding and validation
	PROTOBUF                   //Protobuf message encoding and decoding
	AVRO                       //Serialize and deserialize via Avro
	BOOLEAN                    //A boolean value.
	INT8                       //An 8-bit integer.
	INT16                      //A 16-bit integer.
	INT32                      //A 32-bit integer.
	INT64                      //A 64-bit integer.
	FLOAT                      //A float number.
	DOUBLE                     //A double number.

	KeyValue         //A Schema that contains Key Schema and Value Schema.
	BYTES       = 0  //A byte array.
	AUTO        = -2 //
	AutoConsume = -3 //Auto Consume Type.
	AutoPublish = -4 // Auto Publish Type.
	ProtoNative = 20 //Protobuf native message encoding and decoding
)

type StringSchema added in v0.3.0

type StringSchema struct {
	SchemaInfo
}

func NewStringSchema added in v0.3.0

func NewStringSchema(properties map[string]string) *StringSchema

func (*StringSchema) Decode added in v0.3.0

func (ss *StringSchema) Decode(data []byte, v interface{}) error

Decode converts a byte slice to a string without allocating a new string.

func (*StringSchema) Encode added in v0.3.0

func (ss *StringSchema) Encode(v interface{}) ([]byte, error)

func (*StringSchema) GetSchemaInfo added in v0.3.0

func (ss *StringSchema) GetSchemaInfo() *SchemaInfo

func (*StringSchema) Validate added in v0.3.0

func (ss *StringSchema) Validate(message []byte) error

type SubscriptionInitialPosition

type SubscriptionInitialPosition int
const (
	// SubscriptionPositionLatest is the latest position which means the start consuming position
	// will be the last message
	SubscriptionPositionLatest SubscriptionInitialPosition = iota

	// SubscriptionPositionEarliest is the earliest position which means the start consuming position
	// will be the first message
	SubscriptionPositionEarliest
)

type SubscriptionMode added in v0.11.0

type SubscriptionMode int
const (
	// Durable makes the subscription backed by a durable cursor that will retain messages and persist the current
	// position
	Durable SubscriptionMode = iota

	// NonDurable is a lightweight subscription mode that doesn't have a durable cursor associated
	NonDurable
)

type SubscriptionType

type SubscriptionType int

SubscriptionType of subscription supported by Pulsar

const (
	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode, multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode, multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode, multiple consumers will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)
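
Example: a minimal sketch of choosing a subscription type when subscribing, using the ConsumerOptions documented earlier in this package; topic and subscription names are placeholders.

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "my-topic",
	SubscriptionName: "my-sub",
	// Round-robin dispatch across all connected consumers.
	Type: pulsar.Shared,
	// Start from the first available message if the subscription is new.
	SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()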

type TableView added in v0.9.0

type TableView interface {
	// Size returns the number of key-value mappings in the TableView.
	Size() int

	// IsEmpty returns true if this TableView contains no key-value mappings.
	IsEmpty() bool

	// ContainsKey returns true if this TableView contains a mapping for the specified key.
	ContainsKey(key string) bool

	// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
	Get(key string) interface{}

	// Entries returns a map view of the mappings contained in this TableView.
	Entries() map[string]interface{}

	// Keys returns a slice of the keys contained in this TableView.
	Keys() []string

	// ForEach performs the given action for each entry in this map until all entries have been processed or the action
	// returns an error.
	ForEach(func(string, interface{}) error) error

	// ForEachAndListen performs the given action for each entry in this map until all entries have been processed or
	// the action returns an error.  The given action will then be performed on each new entry in this map.
	ForEachAndListen(func(string, interface{}) error) error

	// Close closes the table view and releases resources allocated.
	Close()
}

TableView provides a key-value map view of a compacted topic. Messages without keys will be ignored.

type TableViewImpl added in v0.9.0

type TableViewImpl struct {
	// contains filtered or unexported fields
}

func (*TableViewImpl) Close added in v0.9.0

func (tv *TableViewImpl) Close()

func (*TableViewImpl) ContainsKey added in v0.9.0

func (tv *TableViewImpl) ContainsKey(key string) bool

func (*TableViewImpl) Entries added in v0.9.0

func (tv *TableViewImpl) Entries() map[string]interface{}

func (*TableViewImpl) ForEach added in v0.9.0

func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error

func (*TableViewImpl) ForEachAndListen added in v0.9.0

func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error

func (*TableViewImpl) Get added in v0.9.0

func (tv *TableViewImpl) Get(key string) interface{}

func (*TableViewImpl) IsEmpty added in v0.9.0

func (tv *TableViewImpl) IsEmpty() bool

func (*TableViewImpl) Keys added in v0.9.0

func (tv *TableViewImpl) Keys() []string

func (*TableViewImpl) Size added in v0.9.0

func (tv *TableViewImpl) Size() int

type TableViewOptions added in v0.9.0

type TableViewOptions struct {
	// Topic specifies the topic this table view will subscribe on.
	// This argument is required when constructing the table view.
	Topic string

	// AutoUpdatePartitionsInterval sets the interval for updating partitions. Defaults to 1 minute.
	AutoUpdatePartitionsInterval time.Duration

	// Schema represents the schema implementation.
	Schema Schema

	// SchemaValueType represents the type of values for the given schema.
	SchemaValueType reflect.Type

	// Logger configures the logger used by the TableView.
	// By default, a wrapped logrus.StandardLogger will be used, namely,
	// log.NewLoggerWithLogrus(logrus.StandardLogger())
	Logger log.Logger
}

TableViewOptions contains the options for creating a TableView
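
Example: a minimal sketch of materializing a compacted topic as a TableView with string values, assuming an existing client.

var strVal string
tv, err := client.CreateTableView(pulsar.TableViewOptions{
	Topic:           "compacted-topic",
	Schema:          pulsar.NewStringSchema(nil),
	SchemaValueType: reflect.TypeOf(strVal), // decode values as string
})
if err != nil {
	log.Fatal(err)
}
defer tv.Close()

if err := tv.ForEach(func(key string, value interface{}) error {
	fmt.Printf("%s => %v\n", key, value)
	return nil
}); err != nil {
	log.Fatal(err)
}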

type TopicMessageID added in v0.13.0

type TopicMessageID interface {
	MessageID
	Topic() string
}

TopicMessageID defines the correspondence between a topic and a MessageID.

type TopicMetadata

type TopicMetadata interface {
	// NumPartitions returns the number of partitions for a particular topic.
	NumPartitions() uint32
}

TopicMetadata represents the metadata of a topic.
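
TopicMetadata is what a custom MessageRouter in ProducerOptions receives. A sketch that routes on a hypothetical "region" message property:

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "partitioned-topic",
	MessageRouter: func(msg *pulsar.ProducerMessage, meta pulsar.TopicMetadata) int {
		// Pin one region to the last partition, everything else to the first.
		if msg.Properties["region"] == "eu" {
			return int(meta.NumPartitions()) - 1
		}
		return 0
	},
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()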

type Transaction added in v0.10.0

type Transaction interface {
	// Commit commits the transaction once all the sending/acknowledging operations made with it have succeeded.
	Commit(context.Context) error
	// Abort aborts the transaction, discarding all the sending/acknowledging operations made with it.
	Abort(context.Context) error
	// GetState gets the state of the transaction.
	GetState() TxnState
	// GetTxnID gets the ID of the transaction.
	GetTxnID() TxnID
}

Transaction is used to guarantee exactly-once semantics.
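
Example: a minimal sketch of an exactly-once produce. It assumes transactions are enabled on the broker, that EnableTransaction was set in the ClientOptions, and that attaching the transaction through the ProducerMessage field matches your client version; client and producer are created as shown earlier.

txn, err := client.NewTransaction(time.Minute) // transaction stays open for up to one minute
if err != nil {
	log.Fatal(err)
}

if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload:     []byte("in-txn"),
	Transaction: txn,
}); err != nil {
	_ = txn.Abort(context.Background()) // roll back everything done with the txn
	log.Fatal(err)
}

if err := txn.Commit(context.Background()); err != nil {
	log.Fatal(err)
}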

type TxnID added in v0.10.0

type TxnID struct {
	// MostSigBits The most significant 64 bits of this TxnID.
	MostSigBits uint64
	// LeastSigBits The least significant 64 bits of this TxnID.
	LeastSigBits uint64
}

TxnID An identifier for representing a transaction.

type TxnState added in v0.10.0

type TxnState int32

TxnState is the state of the transaction. It is necessary to check the state of the transaction before executing an operation with it.

const (

	// TxnOpen The transaction in TxnOpen state can be used to send/ack messages.
	TxnOpen TxnState
	// TxnCommitting The state of the transaction will be TxnCommitting after the commit method is called.
	// A transaction in the TxnCommitting state can be committed again.
	TxnCommitting
	// TxnAborting The state of the transaction will be TxnAborting after the abort method is called.
	// A transaction in the TxnAborting state can be aborted again.
	TxnAborting
	// TxnCommitted The state of the transaction will be TxnCommitted after the commit method executes successfully.
	// This means that all the operations made with the transaction succeeded.
	TxnCommitted
	// TxnAborted The state of the transaction will be TxnAborted after the abort method executes successfully.
	// This means that all the operations made with the transaction have been aborted.
	TxnAborted
	// TxnError The state of the transaction will be TxnError after a transaction operation gets a non-retryable error.
	TxnError
	// TxnTimeout The state of the transaction will be TxnTimeout after the transaction times out.
	TxnTimeout
)

Directories

Path	Synopsis
log	Package log defines the logger interfaces used by pulsar client.
