pulsar

package
v0.0.0-...-2054841 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2019 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 1 more Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	IoMaxSize = 1024
)

Variables

This section is empty.

Functions

func ReadElements

func ReadElements(r io.Reader, elements ...interface{}) error

func WriteElements

func WriteElements(w io.Writer, elements ...interface{}) error

Types

type Authentication

type Authentication interface{}

Opaque interface that represents the authentication credentials

func NewAuthenticationAthenz

func NewAuthenticationAthenz(authParams string) Authentication

Create new Athenz Authentication provider with configuration in JSON form

func NewAuthenticationTLS

func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication

Create new Authentication provider with specified TLS certificate and private key

func NewAuthenticationToken

func NewAuthenticationToken(token string) Authentication

Create new Authentication provider with specified auth token

func NewAuthenticationTokenSupplier

func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication

Create new Authentication provider with specified auth token supplier

type AvroCodec

type AvroCodec struct {
	Codec *goavro.Codec
}

func NewSchemaDefinition

func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec

type AvroSchema

type AvroSchema struct {
	AvroCodec
	SchemaInfo
}

func NewAvroSchema

func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema

func (*AvroSchema) Decode

func (as *AvroSchema) Decode(data []byte, v interface{}) error

func (*AvroSchema) Encode

func (as *AvroSchema) Encode(data interface{}) ([]byte, error)

func (*AvroSchema) GetSchemaInfo

func (as *AvroSchema) GetSchemaInfo() *SchemaInfo

func (*AvroSchema) Validate

func (as *AvroSchema) Validate(message []byte) error

type BinaryFreeList

type BinaryFreeList chan []byte
var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)

func (BinaryFreeList) Borrow

func (b BinaryFreeList) Borrow() (buf []byte)

func (BinaryFreeList) Float32

func (b BinaryFreeList) Float32(buf []byte) (float32, error)

func (BinaryFreeList) Float64

func (b BinaryFreeList) Float64(buf []byte) (float64, error)

func (BinaryFreeList) PutDouble

func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)

func (BinaryFreeList) PutFloat

func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)

func (BinaryFreeList) PutUint16

func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error

func (BinaryFreeList) PutUint32

func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error

func (BinaryFreeList) PutUint64

func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error

func (BinaryFreeList) PutUint8

func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error

func (BinaryFreeList) Return

func (b BinaryFreeList) Return(buf []byte)

func (BinaryFreeList) Uint16

func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error)

func (BinaryFreeList) Uint32

func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error)

func (BinaryFreeList) Uint64

func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error)

func (BinaryFreeList) Uint8

func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error)

type BytesSchema

type BytesSchema struct {
	SchemaInfo
}

func NewBytesSchema

func NewBytesSchema(properties map[string]string) *BytesSchema

func (*BytesSchema) Decode

func (bs *BytesSchema) Decode(data []byte, v interface{}) error

func (*BytesSchema) Encode

func (bs *BytesSchema) Encode(data interface{}) ([]byte, error)

func (*BytesSchema) GetSchemaInfo

func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo

func (*BytesSchema) Validate

func (bs *BytesSchema) Validate(message []byte) error

type Client

type Client interface {
	// Create the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)

	// Create a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)

	// Create a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)

	// Fetch the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create {@link Reader},
	// {@link Consumer} or {@link Producer} instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// Close the Client and free associated resources
	Close() error
}

func NewClient

func NewClient(options ClientOptions) (Client, error)

type ClientOptions

type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required
	URL string

	// Number of threads to be used for handling connections to brokers (default: 1 thread)
	IOThreads int

	// Set the operation timeout (default: 30 seconds)
	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
	// operation will be marked as failed
	OperationTimeoutSeconds time.Duration

	// Set the number of threads to be used for message listeners (default: 1 thread)
	MessageListenerThreads int

	// Number of concurrent lookup-requests allowed to send on each broker-connection to prevent overload on broker.
	// (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe
	// on thousands of topic using created Pulsar Client
	ConcurrentLookupRequests int

	// Provide a custom logger implementation where all Pulsar library info/warn/error messages will be routed
	// By default, log messages will be printed on standard output. By passing a logger function, application
	// can determine how to print logs. This function will be called each time the Pulsar client library wants
	// to write any logs.
	Logger func(level log.LoggerLevel, file string, line int, message string)

	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string

	// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
	TLSAllowInsecureConnection bool

	// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
	TLSValidateHostname bool

	// Configure the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	Authentication

	// Set the interval between each stat info (default: 60 seconds). Stats will be activated with positive
	// statsIntervalSeconds It should be set to at least 1 second
	StatsIntervalInSeconds int
}

Builder interface that is used to construct a Pulsar Client instance.

type CompressionType

type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
	SNAPPY
)

type Consumer

type Consumer interface {
	// Get the topic for the consumer
	Topic() string

	// Get a subscription for the consumer
	Subscription() string

	// Unsubscribe the consumer
	Unsubscribe() error

	// Receives a single message.
	// This calls blocks until a message is available.
	Receive(context.Context) (Message, error)

	//Ack the consumption of a single message
	Ack(Message) error

	// Ack the consumption of a single message, identified by its MessageID
	AckID(MessageID) error

	// Ack the reception of all the messages in the stream up to (and including) the provided message.
	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
	// re-delivered to this consumer.
	//
	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
	//
	// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
	AckCumulative(Message) error

	// Ack the reception of all the messages in the stream up to (and including) the provided message.
	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
	// re-delivered to this consumer.
	//
	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
	//
	// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
	AckCumulativeID(MessageID) error

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NAckRedeliveryDelay .
	//
	// This call is not blocking.
	Nack(Message) error

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	NackID(MessageID) error

	// Close the consumer and stop the broker to push more messages
	Close() error

	// Reset the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
	//       seek() on the individual partitions.
	Seek(msgID MessageID) error

	// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
	// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
	// breaks, the messages are redelivered after reconnect.
	RedeliverUnackedMessages()

	Schema() Schema
}

An interface that abstracts behavior of Pulsar's consumer

type ConsumerMessage

type ConsumerMessage struct {
	Consumer
	Message
}

Pair of a Consumer and Message

type ConsumerOptions

type ConsumerOptions struct {
	// Specify the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topic string

	// Specify a list of topics this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topics []string

	// Specify a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	TopicsPattern string

	// Specify the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Attach a set of application defined properties to the consumer
	// This properties will be visible in the topic stats
	Properties map[string]string

	// Set the timeout for unacked messages
	// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
	// Default is 0, which means message are not being replayed based on ack time
	AckTimeout time.Duration

	// The delay after which to redeliver the messages that failed to be
	// processed. Default is 1min. (See `Consumer.Nack()`)
	NackRedeliveryDelay *time.Duration

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// InitialPosition at which the cursor will be set when subscribe
	// Default is `Latest`
	SubscriptionInitPos InitialPosition

	// Sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	// Set to -1 to disable prefetching in consumer
	ReceiverQueueSize int

	// Set the max total receiver queue size across partitions.
	// This setting will be used to reduce the receiver queue size for individual partitions
	// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
	MaxTotalReceiverQueueSizeAcrossPartitions int

	// Set the consumer name.
	Name string

	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
	//  failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
	//  shared subscription, will lead to the subscription call throwing a PulsarClientException.
	ReadCompacted bool

	Schema
}

ConsumerBuilder is used to configure and create instances of Consumer

type DoubleSchema

type DoubleSchema struct {
	SchemaInfo
}

func NewDoubleSchema

func NewDoubleSchema(properties map[string]string) *DoubleSchema

func (*DoubleSchema) Decode

func (ds *DoubleSchema) Decode(data []byte, v interface{}) error

func (*DoubleSchema) Encode

func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error)

func (*DoubleSchema) GetSchemaInfo

func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo

func (*DoubleSchema) Validate

func (ds *DoubleSchema) Validate(message []byte) error

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (e *Error) Error() string

func (*Error) Result

func (e *Error) Result() Result

type FloatSchema

type FloatSchema struct {
	SchemaInfo
}

func NewFloatSchema

func NewFloatSchema(properties map[string]string) *FloatSchema

func (*FloatSchema) Decode

func (fs *FloatSchema) Decode(data []byte, v interface{}) error

func (*FloatSchema) Encode

func (fs *FloatSchema) Encode(value interface{}) ([]byte, error)

func (*FloatSchema) GetSchemaInfo

func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo

func (*FloatSchema) Validate

func (fs *FloatSchema) Validate(message []byte) error

type HashingScheme

type HashingScheme int
const (
	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
	Murmur3_32Hash                      // Use Murmur3 hashing function
	BoostHash                           // C++ based boost::hash
)

type InitialPosition

type InitialPosition int
const (
	// Latest position which means the start consuming position will be the last message
	Latest InitialPosition = iota

	// Earliest position which means the start consuming position will be the first message
	Earliest
)

type Int16Schema

type Int16Schema struct {
	SchemaInfo
}

func NewInt16Schema

func NewInt16Schema(properties map[string]string) *Int16Schema

func (*Int16Schema) Decode

func (is16 *Int16Schema) Decode(data []byte, v interface{}) error

func (*Int16Schema) Encode

func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error)

func (*Int16Schema) GetSchemaInfo

func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo

func (*Int16Schema) Validate

func (is16 *Int16Schema) Validate(message []byte) error

type Int32Schema

type Int32Schema struct {
	SchemaInfo
}

func NewInt32Schema

func NewInt32Schema(properties map[string]string) *Int32Schema

func (*Int32Schema) Decode

func (is32 *Int32Schema) Decode(data []byte, v interface{}) error

func (*Int32Schema) Encode

func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error)

func (*Int32Schema) GetSchemaInfo

func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo

func (*Int32Schema) Validate

func (is32 *Int32Schema) Validate(message []byte) error

type Int64Schema

type Int64Schema struct {
	SchemaInfo
}

func NewInt64Schema

func NewInt64Schema(properties map[string]string) *Int64Schema

func (*Int64Schema) Decode

func (is64 *Int64Schema) Decode(data []byte, v interface{}) error

func (*Int64Schema) Encode

func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error)

func (*Int64Schema) GetSchemaInfo

func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo

func (*Int64Schema) Validate

func (is64 *Int64Schema) Validate(message []byte) error

type Int8Schema

type Int8Schema struct {
	SchemaInfo
}

func NewInt8Schema

func NewInt8Schema(properties map[string]string) *Int8Schema

func (*Int8Schema) Decode

func (is8 *Int8Schema) Decode(data []byte, v interface{}) error

func (*Int8Schema) Encode

func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error)

func (*Int8Schema) GetSchemaInfo

func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo

func (*Int8Schema) Validate

func (is8 *Int8Schema) Validate(message []byte) error

type JsonSchema

type JsonSchema struct {
	AvroCodec
	SchemaInfo
}

func NewJsonSchema

func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) *JsonSchema

func (*JsonSchema) Decode

func (js *JsonSchema) Decode(data []byte, v interface{}) error

func (*JsonSchema) Encode

func (js *JsonSchema) Encode(data interface{}) ([]byte, error)

func (*JsonSchema) GetSchemaInfo

func (js *JsonSchema) GetSchemaInfo() *SchemaInfo

func (*JsonSchema) Validate

func (js *JsonSchema) Validate(message []byte) error

type Message

type Message interface {
	// Get the topic from which this message originated from
	Topic() string

	// Return the properties attached to the message.
	// Properties are application defined key/value pairs that will be attached to the message
	Properties() map[string]string

	// Get the payload of the message
	Payload() []byte

	// Get the unique message ID associated with this message.
	// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
	ID() MessageID

	// Get the publish time of this message. The publish time is the timestamp that a client publish the message.
	PublishTime() time.Time

	// Get the event time associated with this message. It is typically set by the applications via
	// `ProducerMessage.EventTime`.
	// If there isn't any event time associated with this event, it will be nil.
	EventTime() *time.Time

	// Get the key of the message, if any
	Key() string

	//Get the de-serialized value of the message, according the configured
	GetValue(v interface{}) error
}

type MessageID

type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte
}

Identifier for a particular message

var (
	// MessageID that points to the earliest message avaialable in a topic
	EarliestMessage MessageID = earliestMessageID()

	// MessageID that points to the latest message
	LatestMessage MessageID = latestMessageID()
)

func DeserializeMessageID

func DeserializeMessageID(data []byte) MessageID

Reconstruct a MessageID object from its serialized representation

type MessageRoutingMode

type MessageRoutingMode int
const (
	// Publish messages across all partitions in round-robin.
	RoundRobinDistribution MessageRoutingMode = iota

	// The producer will chose one single partition and publish all the messages into that partition
	UseSinglePartition

	// Use custom message router implementation that will be called to determine the partition for a particular message.
	CustomPartition
)

type Producer

type Producer interface {
	// return the topic to which producer is publishing to
	Topic() string

	// return the producer name which could have been assigned by the system or specified by the client
	Name() string

	// Send a message
	// This call will be blocking until is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
	Send(context.Context, ProducerMessage) error

	// Send a message in asynchronous mode
	// The callback will report back the message being published and
	// the eventual error in publishing
	SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))

	// Get the last sequence id that was published by this producer.
	// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the last message that was
	// published in the previous producer session, or -1 if there no message was ever published.
	// return the last sequence id published by this producer.
	LastSequenceID() int64

	// Flush all the messages buffered in the client and wait until all messages have been successfully
	// persisted.
	Flush() error

	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
	// of errors, pending writes will not be retried.
	Close() error

	Schema() Schema
}

The producer is used to publish messages on a topic

type ProducerMessage

type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	//Value and payload is mutually exclusive, `Value interface{}` for schema message.
	Value interface{}

	// Sets the key of the message for routing policy
	Key string

	// Attach application defined properties on the message
	Properties map[string]string

	// Set the event time for a given message
	EventTime time.Time

	// Override the replication clusters for this message.
	ReplicationClusters []string

	// Set the sequence id to assign to the current message
	SequenceID int64
}

type ProducerOptions

type ProducerOptions struct {
	// Specify the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Specify a name for the producer
	// If not assigned, the system will generate a globally unique name which can be access with
	// Producer.ProducerName().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
	// a topic.
	Name string

	// Attach a set of application defined properties to the producer
	// This properties will be visible in the topic stats
	Properties map[string]string

	// Set the send timeout (default: 30 seconds)
	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
	// deduplication feature.
	SendTimeout time.Duration

	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
	// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
	// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
	MaxPendingMessages int

	// Set the number of max pending messages across all the partitions
	// This setting will be used to lower the max pending messages for each partition
	// `MaxPendingMessages(int)`, if the total exceeds the configured value.
	MaxPendingMessagesAcrossPartitions int

	// Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
	// `ProducerQueueIsFullError` when there is no space left in pending queue.
	BlockIfQueueFull bool

	// Set the message routing mode for the partitioned producer.
	// Default routing mode is round-robin routing.
	//
	// This logic is applied when the application is not setting a key ProducerMessage#setKey(String) on a
	// particular message.
	MessageRoutingMode

	// Change the `HashingScheme` used to chose the partition on where to publish a particular message.
	// Standard hashing functions available are:
	//
	//  - `JavaStringHash` : Java String.hashCode() equivalent
	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
	// 		https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
	//  - `BoostHash`      : C++ based boost::hash
	//
	// Default is `JavaStringHash`.
	HashingScheme

	// Set the compression type for the producer.
	// By default, message payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLIB
	//  - ZSTD
	//  - SNAPPY
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	//
	// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with SNAPPY.
	CompressionType

	// Set a custom message routing policy by passing an implementation of MessageRouter
	// The router is a function that given a particular message and the topic metadata, returns the
	// partition index where the message should be routed to
	MessageRouter func(Message, TopicMetadata) int

	// Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
	//
	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
	// contents.
	//
	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
	Batching bool

	// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
	// enabled. If set to a non zero value, messages will be queued until this time interval or until
	BatchingMaxPublishDelay time.Duration

	// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
	// messages will be queued until this threshold is reached or batch interval has elapsed
	BatchingMaxMessages uint
}

type ProtoSchema

type ProtoSchema struct {
	AvroCodec
	SchemaInfo
}

func NewProtoSchema

func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema

func (*ProtoSchema) Decode

func (ps *ProtoSchema) Decode(data []byte, v interface{}) error

func (*ProtoSchema) Encode

func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error)

func (*ProtoSchema) GetSchemaInfo

func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo

func (*ProtoSchema) Validate

func (ps *ProtoSchema) Validate(message []byte) error

type Reader

type Reader interface {
	// The topic from which this reader is reading from
	Topic() string

	// Read the next message in the topic, blocking until a message is available
	Next(context.Context) (Message, error)

	// Check if there is any message available to read from the current position
	HasNext() (bool, error)

	// Close the reader and stop the broker to push more messages
	Close() error

	Schema() Schema
}

A Reader can be used to scan through all the messages currently available in a topic.

type ReaderMessage

type ReaderMessage struct {
	Reader
	Message
}

type ReaderOptions

type ReaderOptions struct {
	// Specify the topic this consumer will subscribe on.
	// This argument is required when constructing the reader.
	Topic string

	// Set the reader name.
	Name string

	// The initial reader positioning is done by specifying a message id. The options are:
	//  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
	//  * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the
	//                           reader was created
	//  * `MessageID` : Start reading from a particular message id, the reader will position itself on that
	//                  specific position. The first message to be read will be the message next to the specified
	//                  messageID
	StartMessageID MessageID

	// Sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ReaderMessage

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the Reader before the
	// application calls Reader.readNext(). Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	//
	// Default value is {@code 1000} messages and should be good for most use cases.
	ReceiverQueueSize int

	// Set the subscription role prefix. The default prefix is "reader".
	SubscriptionRolePrefix string

	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
	// topics will lead to the reader create call throwing a PulsarClientException.
	ReadCompacted bool
}

type Result

type Result int
const (
	UnknownError                          Result = 1  // Unknown error happened on broker
	InvalidConfiguration                  Result = 2  // Invalid configuration
	TimeoutError                          Result = 3  // Operation timed out
	LookupError                           Result = 4  // Broker lookup failed
	ConnectError                          Result = 5  // Failed to connect to broker
	ReadError                             Result = 6  // Failed to read from socket
	AuthenticationError                   Result = 7  // Authentication failed on broker
	AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
	ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
	BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
	BrokerPersistenceError                Result = 11 // Broker failed to persist entry
	ChecksumError                         Result = 12 // Corrupt message checksum failure
	ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
	NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
	AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
	InvalidMessage                        Result = 16 // Error in publishing an already used message
	ConsumerNotInitialized                Result = 17 // Consumer is not initialized
	ProducerNotInitialized                Result = 18 // Producer is not initialized
	TooManyLookupRequestException         Result = 19 // Too Many concurrent LookupRequest
	InvalidTopicName                      Result = 20 // Invalid topic name
	InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
	ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
	OperationNotSupported                 Result = 23
	ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
	ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
	ProducerQueueIsFull                   Result = 26 // Producer queue is full
	MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
	TopicNotFound                         Result = 28 // Topic not found
	SubscriptionNotFound                  Result = 29 // Subscription not found
	ConsumerNotFound                      Result = 30 // Consumer not found
	UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
	TopicTerminated                       Result = 32 // Topic was already terminated
	CryptoError                           Result = 33 // Error when crypto operation fails
)

func (Result) String

func (r Result) String() string

type Schema

type Schema interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, v interface{}) error
	Validate(message []byte) error
	GetSchemaInfo() *SchemaInfo
}

type SchemaInfo

type SchemaInfo struct {
	Name       string
	Schema     string
	Type       SchemaType
	Properties map[string]string
}

Encapsulates data around the schema definition

type SchemaType

type SchemaType int
const (
	NONE     SchemaType = iota //No schema defined
	STRING                     //Simple String encoding with UTF-8
	JSON                       //JSON object encoding and validation
	PROTOBUF                   //Protobuf message encoding and decoding
	AVRO                       //Serialize and deserialize via Avro
	BOOLEAN                    //
	INT8                       //A 8-byte integer.
	INT16                      //A 16-byte integer.
	INT32                      //A 32-byte integer.
	INT64                      //A 64-byte integer.
	FLOAT                      //A float number.
	DOUBLE                     //A double number

	KEY_VALUE         //A Schema that contains Key Schema and Value Schema.
	BYTES        = -1 //A bytes array.
	AUTO         = -2 //
	AUTO_CONSUME = -3 //Auto Consume Type.
	AUTO_PUBLISH = -4 // Auto Publish Type.
)

type StringSchema

type StringSchema struct {
	SchemaInfo
}

func NewStringSchema

func NewStringSchema(properties map[string]string) *StringSchema

func (*StringSchema) Decode

func (ss *StringSchema) Decode(data []byte, v interface{}) error

func (*StringSchema) Encode

func (ss *StringSchema) Encode(v interface{}) ([]byte, error)

func (*StringSchema) GetSchemaInfo

func (ss *StringSchema) GetSchemaInfo() *SchemaInfo

func (*StringSchema) Validate

func (ss *StringSchema) Validate(message []byte) error

type SubscriptionType

type SubscriptionType int

Types of subscription supported by Pulsar

const (
	// There can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// Multiple consumer will be able to use the same subscription and all messages with the same key
	// will be dispatched to only one consumer
	KeyShared
)

type TopicMetadata

type TopicMetadata interface {
	// Get the number of partitions for the specific topic
	NumPartitions() int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL