Documentation ¶
Index ¶
- Variables
- type Authentication
- func NewAuthentication(name string, params string) (Authentication, error)
- func NewAuthenticationAthenz(authParams map[string]string) Authentication
- func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
- func NewAuthenticationToken(token string) Authentication
- func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication
- func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication
- type Client
- type ClientOptions
- type CompressionType
- type Consumer
- type ConsumerMessage
- type ConsumerOptions
- type DLQPolicy
- type Error
- type HashingScheme
- type Message
- type MessageID
- type Producer
- type ProducerMessage
- type ProducerOptions
- type Reader
- type ReaderMessage
- type ReaderOptions
- type Result
- type SubscriptionInitialPosition
- type SubscriptionType
- type TopicMetadata
Constants ¶
This section is empty.
Variables ¶
var ErrConsumerClosed = errors.New("consumer closed")
Functions ¶
This section is empty.
Types ¶
type Authentication ¶
type Authentication interface{}
Authentication is an opaque interface that represents the authentication credentials.
func NewAuthentication ¶
func NewAuthentication(name string, params string) (Authentication, error)
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(authParams map[string]string) Authentication
func NewAuthenticationTLS ¶
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
NewAuthenticationTLS creates a new Authentication provider from the specified TLS certificate and private key.
func NewAuthenticationToken ¶
func NewAuthenticationToken(token string) Authentication
NewAuthenticationToken creates a new Authentication provider from the specified auth token.
func NewAuthenticationTokenFromFile ¶
func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication
NewAuthenticationTokenFromFile creates a new Authentication provider that reads the auth token from the specified file.
func NewAuthenticationTokenFromSupplier ¶
func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication
NewAuthenticationTokenFromSupplier returns a token auth provider that gets the token data from a user-supplied function. The function is invoked each time the client library needs a token when talking to Pulsar brokers.
type Client ¶
type Client interface {
	// Create the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	// Create a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	// Create a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	// Fetch the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create Reader,
	// Consumer or Producer instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// Close the Client and free associated resources
	Close()
}
func NewClient ¶
func NewClient(options ClientOptions) (Client, error)
type ClientOptions ¶
type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required
	URL string

	// Timeout for the establishment of a TCP connection (default: 30 seconds)
	ConnectionTimeout time.Duration

	// Set the operation timeout (default: 30 seconds)
	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
	// operation will be marked as failed
	OperationTimeout time.Duration

	// Configure the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	Authentication

	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string

	// Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
	TLSAllowInsecureConnection bool

	// Configure whether the Pulsar client verifies the validity of the host name from the broker (default: false)
	TLSValidateHostname bool
}
ClientOptions is used to construct a Pulsar Client instance.
type CompressionType ¶
type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
)
type Consumer ¶
type Consumer interface {
	// Subscription get a subscription for the consumer
	Subscription() string

	// Unsubscribe the consumer
	Unsubscribe() error

	// Receive a single message.
	// This call blocks until a message is available.
	Receive(context.Context) (Message, error)

	// Chan returns a channel to consume messages from
	Chan() <-chan ConsumerMessage

	// Ack the consumption of a single message
	Ack(Message)

	// AckID the consumption of a single message, identified by its MessageID
	AckID(MessageID)

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay.
	//
	// This call is not blocking.
	Nack(Message)

	// Acknowledge the failure to process a single message, identified by its MessageID.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay.
	//
	// This call is not blocking.
	NackID(MessageID)

	// Close the consumer and stop the broker from pushing more messages
	Close()

	// Reset the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
	// perform the seek on the individual partitions.
	Seek(MessageID) error

	// Reset the subscription associated with this consumer to a specific message publish time.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
	// perform the seek on the individual partitions.
	//
	// The time argument is the message publish time at which to reposition the subscription.
	SeekByTime(time time.Time) error
}
Consumer is an interface that abstracts behavior of Pulsar's consumer
type ConsumerMessage ¶
ConsumerMessage is a pair of a Consumer and a Message.
type ConsumerOptions ¶
type ConsumerOptions struct {
	// Specify the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topic string

	// Specify a list of topics this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topics []string

	// Specify a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	TopicsPattern string

	// Specify the interval in which to poll for new partitions or new topics if using a TopicsPattern.
	AutoDiscoveryPeriod time.Duration

	// Specify the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Attach a set of application defined properties to the consumer
	// These properties will be visible in the topic stats
	Properties map[string]string

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// InitialPosition at which the cursor will be set when subscribing
	// Default is `Latest`
	SubscriptionInitialPosition

	// Configuration for Dead Letter Queue consumer policy.
	// e.g. route the message to topic X after N failed attempts at processing it
	// By default it is nil and there is no DLQ
	DLQ *DLQPolicy

	// Sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	// Set to -1 to disable prefetching in consumer
	ReceiverQueueSize int

	// The delay after which to redeliver the messages that failed to be
	// processed. Default is 1min. (See `Consumer.Nack()`)
	NackRedeliveryDelay time.Duration

	// Set the consumer name.
	Name string

	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
	// (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
	// or on a shared subscription will cause the subscribe call to fail.
	ReadCompacted bool

	// Mark the subscription as replicated to keep it in sync across clusters
	ReplicateSubscriptionState bool
}
ConsumerOptions is used to configure and create instances of Consumer
type DLQPolicy ¶
type DLQPolicy struct {
	// Maximum number of times that a message will be delivered before being sent to the dead letter queue.
	MaxDeliveries uint32

	// Name of the topic where the failing messages will be sent.
	Topic string
}
DLQPolicy is the configuration for the Dead Letter Queue consumer policy.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error implements the error interface and is composed of two parts: msg and result.
type HashingScheme ¶
type HashingScheme int
const (
	// JavaStringHash is the Java String.hashCode() equivalent
	JavaStringHash HashingScheme = iota
	// Murmur3_32Hash uses the Murmur3 hashing function
	Murmur3_32Hash
)
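For reference, Java's String.hashCode() computes h = 31*h + c over the UTF-16 code units of the string using 32-bit signed arithmetic. The sketch below reproduces that algorithm in Go. The `partitionFor` helper, including clearing the sign bit before the modulo, is a hypothetical illustration of how a hash could be turned into a partition index and is not taken from this package.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// javaStringHash reproduces Java's String.hashCode(): h = 31*h + c over the
// UTF-16 code units of s, with 32-bit signed wraparound.
func javaStringHash(s string) int32 {
	var h int32
	for _, u := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(u)
	}
	return h
}

// partitionFor maps a key to an index in [0, numPartitions).
// Clearing the sign bit before the modulo is an assumption of this sketch.
// numPartitions must be greater than zero.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := uint32(javaStringHash(key)) & 0x7fffffff
	return h % numPartitions
}

func main() {
	fmt.Println(javaStringHash("abc"))  // 96354
	fmt.Println(partitionFor("abc", 4)) // 2
}
```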
type Message ¶
type Message interface {
	// Topic get the topic from which this message originated
	Topic() string

	// Properties are application defined key/value pairs that will be attached to the message.
	// Return the properties attached to the message.
	Properties() map[string]string

	// Payload get the payload of the message
	Payload() []byte

	// ID get the unique message ID associated with this message.
	// The message id can be used to unambiguously refer to a message without having to keep the entire payload
	// in memory.
	ID() MessageID

	// PublishTime get the publish time of this message. The publish time is the timestamp at which a client
	// published the message.
	PublishTime() time.Time

	// EventTime get the event time associated with this message. It is typically set by the applications via
	// `ProducerMessage.EventTime`.
	// If EventTime is 0, it means there isn't any event time associated with this message.
	EventTime() time.Time

	// Key get the key of the message, if any
	Key() string

	// Get the message redelivery count, which is maintained by the Pulsar broker. When a client negatively
	// acknowledges a message, the broker dispatches the message again with the redelivery count set in
	// CommandMessage.
	//
	// The redelivery count increases monotonically within a broker. When the topic switches ownership to
	// another broker, the redelivery count is recalculated.
	RedeliveryCount() uint32

	// Check whether the message is replicated from another cluster.
	IsReplicated() bool

	// Get the name of the cluster from which the message is replicated.
	GetReplicatedFrom() string
}
Message abstraction used in Pulsar
type MessageID ¶
type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte
}
MessageID identifier for a particular message
func DeserializeMessageID ¶
DeserializeMessageID reconstructs a MessageID object from its serialized representation
func EarliestMessageID ¶
func EarliestMessageID() MessageID
EarliestMessageID returns a messageID that points to the earliest message available in a topic
func LatestMessageID ¶
func LatestMessageID() MessageID
LatestMessageID returns a messageID that points to the latest message
type Producer ¶
type Producer interface {
	// Topic return the topic to which the producer is publishing
	Topic() string

	// Name return the producer name which could have been assigned by the system or specified by the client
	Name() string

	// Send a message
	// This call will block until the message is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(ctx, &pulsar.ProducerMessage{ Payload: myPayload })
	Send(context.Context, *ProducerMessage) (MessageID, error)

	// SendAsync a message in asynchronous mode
	// This call is blocked when the `event channel` becomes full (default: 10) or the
	// `maxPendingMessages` becomes full (default: 1000)
	// The callback will report back the message being published and
	// the eventual error in publishing
	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))

	// LastSequenceID get the last sequence id that was published by this producer.
	// This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the last message that was
	// published in the previous producer session, or -1 if no message was ever published.
	LastSequenceID() int64

	// Flush all the messages buffered in the client and wait until all messages have been successfully
	// persisted.
	Flush() error

	// Close the producer and release the resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write requests are persisted.
	// In case of errors, pending writes will not be retried.
	Close()
}
Producer is used to publish messages on a topic
type ProducerMessage ¶
type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Key sets the key of the message for routing policy
	Key string

	// Properties attach application defined properties on the message
	Properties map[string]string

	// EventTime set the event time for a given message
	// By default, messages don't have an event time associated, while the publish
	// time will always be present.
	// Set the event time to a non-zero timestamp to explicitly declare the time
	// that the event "happened", as opposed to when the message is being published.
	EventTime time.Time

	// ReplicationClusters override the replication clusters for this message.
	ReplicationClusters []string

	// SequenceID set the sequence id to assign to the current message
	SequenceID *int64

	// Request to deliver the message only after the specified relative delay.
	// Note: messages are only delivered with delay when a consumer is consuming
	// through a `SubscriptionType=Shared` subscription. With other subscription
	// types, the messages will still be delivered immediately.
	DeliverAfter time.Duration

	// Deliver the message only at or after the specified absolute timestamp.
	// Note: messages are only delivered with delay when a consumer is consuming
	// through a `SubscriptionType=Shared` subscription. With other subscription
	// types, the messages will still be delivered immediately.
	DeliverAt time.Time
}
ProducerMessage abstraction used in Pulsar producer
type ProducerOptions ¶
type ProducerOptions struct {
	// Topic specify the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Name specify a name for the producer
	// If not assigned, the system will generate a globally unique name which can be accessed with
	// Producer.Name().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
	// publishing on a topic.
	Name string

	// Properties attach a set of application defined properties to the producer
	// These properties will be visible in the topic stats
	Properties map[string]string

	// MaxPendingMessages set the max size of the queue holding the messages pending to receive an
	// acknowledgment from the broker.
	MaxPendingMessages int

	// HashingScheme change the `HashingScheme` used to choose the partition on which to publish a particular message.
	// Standard hashing functions available are:
	//
	//  - `JavaStringHash` : Java String.hashCode() equivalent
	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
	//        https://en.wikipedia.org/wiki/MurmurHash
	//
	// Default is `JavaStringHash`.
	HashingScheme

	// CompressionType set the compression type for the producer.
	// By default, message payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLIB
	//  - ZSTD
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	CompressionType

	// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
	// The router is a function that, given a particular message and the topic metadata, returns the
	// partition index where the message should be routed to
	MessageRouter func(*ProducerMessage, TopicMetadata) int

	// DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
	// is enabled.
	// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
	// contents.
	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
	// Setting `DisableBatching: true` will make the producer send messages individually
	DisableBatching bool

	// BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms)
	// if batch messages are enabled. If set to a non zero value, messages will be queued until this time
	// interval has elapsed or until the BatchingMaxMessages threshold is reached.
	BatchingMaxPublishDelay time.Duration

	// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
	// If set to a value greater than 1, messages will be queued until this threshold is reached or
	// batch interval has elapsed.
	BatchingMaxMessages uint
}
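The MessageRouter field accepts any function with the signature `func(*ProducerMessage, TopicMetadata) int`. The sketch below shows one possible router that keeps all messages with the same key on the same partition. It is self-contained, so `ProducerMessage` and `TopicMetadata` are local stand-ins that mirror the documented types, and `keyRouter`, `fixedPartitions` and the choice of FNV-1a hashing are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ProducerMessage and TopicMetadata are local stand-ins that mirror the
// parts of the documented types a router needs.
type ProducerMessage struct {
	Payload []byte
	Key     string
}

type TopicMetadata interface {
	NumPartitions() uint32
}

// keyRouter sends messages with the same key to the same partition by
// hashing the key with FNV-1a. Messages without a key go to partition 0.
// The hashing choice is an assumption of this sketch.
func keyRouter(msg *ProducerMessage, md TopicMetadata) int {
	n := md.NumPartitions()
	if n == 0 || msg.Key == "" {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(msg.Key))
	return int(h.Sum32() % n)
}

// fixedPartitions is a hypothetical TopicMetadata for the example.
type fixedPartitions uint32

func (p fixedPartitions) NumPartitions() uint32 { return uint32(p) }

func main() {
	md := fixedPartitions(8)
	a := keyRouter(&ProducerMessage{Key: "order-42"}, md)
	b := keyRouter(&ProducerMessage{Key: "order-42"}, md)
	fmt.Println(a == b, a >= 0 && a < 8) // true true
}
```

A function of this shape could be assigned to ProducerOptions.MessageRouter when the built-in hashing schemes do not fit the routing requirements.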
type Reader ¶
type Reader interface {
	// Topic from which this reader is reading
	Topic() string

	// Next read the next message in the topic, blocking until a message is available
	Next(context.Context) (Message, error)

	// HasNext check if there is any message available to read from the current position
	HasNext() bool

	// Close the reader and stop the broker from pushing more messages
	Close()
}
Reader can be used to scan through all the messages currently available in a topic.
type ReaderMessage ¶
ReaderMessage packages a Reader and a Message together as a struct.
type ReaderOptions ¶
type ReaderOptions struct {
	// Topic specify the topic this reader will read from.
	// This argument is required when constructing the reader.
	Topic string

	// Name set the reader name.
	Name string

	// Attach a set of application defined properties to the reader
	// These properties will be visible in the topic stats
	Properties map[string]string

	// StartMessageID initial reader positioning is done by specifying a message id. The options are:
	//  * `pulsar.EarliestMessageID()` : Start reading from the earliest message available in the topic
	//  * `pulsar.LatestMessageID()` : Start reading from the end of the topic, only getting messages published
	//                  after the reader was created
	//  * `MessageID` : Start reading from a particular message id, the reader will position itself on that
	//                  specific position. The first message to be read will be the message next to the specified
	//                  messageID
	StartMessageID MessageID

	// If true, the reader will start at the `StartMessageID`, included.
	// Default is `false` and the reader will start from the "next" message
	StartMessageIDInclusive bool

	// MessageChannel sets a `MessageChannel` for the reader
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ReaderMessage

	// ReceiverQueueSize sets the size of the reader receive queue.
	// The receive queue controls how many messages can be accumulated by the Reader before the
	// application calls Reader.Next(). Using a higher value could potentially increase the reader
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int

	// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
	SubscriptionRolePrefix string

	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on
	// non-persistent topics will cause the reader create call to fail.
	ReadCompacted bool
}
ReaderOptions is used to configure and create instances of Reader.
type Result ¶
type Result int
Result represents the outcome of a Pulsar operation. Its underlying type is int.
const (
	// ResultOk means no errors
	ResultOk Result = iota
	// ResultUnknownError means unknown error happened on broker
	ResultUnknownError
	// ResultInvalidConfiguration means invalid configuration
	ResultInvalidConfiguration
	// ResultTimeoutError means operation timed out
	ResultTimeoutError
	// ResultLookupError means broker lookup failed
	ResultLookupError
	// ResultInvalidTopicName means invalid topic name
	ResultInvalidTopicName
	// ResultConnectError means failed to connect to broker
	ResultConnectError

	// ReadError means failed to read from socket
	//ReadError Result = 6
	// AuthenticationError means authentication failed on broker
	//AuthenticationError Result = 7
	//AuthorizationError Result = 8
	//ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data
	//BrokerMetadataError Result = 10 // Broker failed in updating metadata
	//BrokerPersistenceError Result = 11 // Broker failed to persist entry
	//ChecksumError Result = 12 // Corrupt message checksum failure

	// ConsumerBusy means Exclusive consumer is already connected
	ConsumerBusy Result = 13
	//NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
	//AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
	//InvalidMessage Result = 16 // Error in publishing an already used message
	//ConsumerNotInitialized Result = 17 // Consumer is not initialized
	//ProducerNotInitialized Result = 18 // Producer is not initialized
	//TooManyLookupRequestException Result = 19 // Too Many concurrent LookupRequest

	// InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
	//InvalidUrl Result = 21
	// ServiceUnitNotReady unloaded between client did lookup and producer/consumer got created
	//ServiceUnitNotReady Result = 22
	//OperationNotSupported Result = 23
	//ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked
	//ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
	//ProducerQueueIsFull Result = 26 // Producer queue is full
	//MessageTooBig Result = 27 // Trying to send a messages exceeding the max size

	TopicNotFound Result = 28 // Topic not found
	SubscriptionNotFound Result = 29 // Subscription not found
)
type SubscriptionInitialPosition ¶
type SubscriptionInitialPosition int
const (
	// Latest position which means the start consuming position will be the last message
	SubscriptionPositionLatest SubscriptionInitialPosition = iota

	// Earliest position which means the start consuming position will be the first message
	SubscriptionPositionEarliest
)
type SubscriptionType ¶
type SubscriptionType int
SubscriptionType is the type of subscription supported by Pulsar
const (
	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode, multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode, multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode, multiple consumers will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)
type TopicMetadata ¶
type TopicMetadata interface {
	// NumPartitions get the number of partitions for the specific topic
	NumPartitions() uint32
}
TopicMetadata is an interface of topic metadata