Documentation ¶
Index ¶
- Constants
- func GetBatcherBuilderProvider(typ BatcherBuilderType) (internal.BatcherBuilderProvider, error)
- func NewDefaultRouter(hashFunc func(string) uint32, maxBatchingMessages uint, maxBatchingSize uint, ...) func(*ProducerMessage, uint32) int
- func ReadElements(r io.Reader, elements ...interface{}) error
- func WriteElements(w io.Writer, elements ...interface{}) error
- type Authentication
- func NewAuthentication(name string, params string) (Authentication, error)
- func NewAuthenticationAthenz(authParams map[string]string) Authentication
- func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication
- func NewAuthenticationOAuth2(authParams map[string]string) (Authentication, error)
- func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
- func NewAuthenticationToken(token string) Authentication
- func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication
- func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication
- type AvroCodec
- type AvroSchema
- type BatcherBuilderType
- type BinaryFreeList
- func (b BinaryFreeList) Borrow() (buf []byte)
- func (b BinaryFreeList) Float32(buf []byte) (float32, error)
- func (b BinaryFreeList) Float64(buf []byte) (float64, error)
- func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error
- func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error
- func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error
- func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error
- func (b BinaryFreeList) Return(buf []byte)
- func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error)
- func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error)
- func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error)
- func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error)
- type BytesSchema
- type Client
- type ClientOptions
- type CompressionLevel
- type CompressionType
- type Consumer
- type ConsumerInterceptor
- type ConsumerInterceptors
- type ConsumerMessage
- type ConsumerOptions
- type DLQPolicy
- type DoubleSchema
- type EncryptionContext
- type EncryptionKey
- type Error
- type FloatSchema
- type HashingScheme
- type Int16Schema
- type Int32Schema
- type Int64Schema
- type Int8Schema
- type JSONSchema
- type KeySharedPolicy
- type KeySharedPolicyMode
- type Message
- type MessageDecryptionInfo
- type MessageID
- type MetricsCardinality
- type NackBackoffPolicy
- type Producer
- type ProducerEncryptionInfo
- type ProducerInterceptor
- type ProducerInterceptors
- type ProducerMessage
- type ProducerOptions
- type ProtoSchema
- type Reader
- type ReaderMessage
- type ReaderOptions
- type Result
- type RetryMessage
- type Schema
- type SchemaInfo
- type SchemaType
- type StringSchema
- type SubscriptionInitialPosition
- type SubscriptionType
- type TopicMetadata
Constants ¶
const ( DlqTopicSuffix = "-DLQ" RetryTopicSuffix = "-RETRY" MaxReconsumeTimes = 16 SysPropertyDelayTime = "DELAY_TIME" SysPropertyRealTopic = "REAL_TOPIC" SysPropertyRetryTopic = "RETRY_TOPIC" SysPropertyReconsumeTimes = "RECONSUMETIMES" SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME" )
const (
IoMaxSize = 1024
)
Variables ¶
This section is empty.
Functions ¶
func GetBatcherBuilderProvider ¶
func GetBatcherBuilderProvider(typ BatcherBuilderType) ( internal.BatcherBuilderProvider, error, )
func NewDefaultRouter ¶
func NewDefaultRouter( hashFunc func(string) uint32, maxBatchingMessages uint, maxBatchingSize uint, maxBatchingDelay time.Duration, disableBatching bool) func(*ProducerMessage, uint32) int
NewDefaultRouter sets the message routing mode for the partitioned producer. The default routing mode is round-robin routing if no partition key is specified. If batching is enabled, it honors the different thresholds for batching, i.e. maximum batch size, maximum number of messages, and maximum delay to publish a batch. When one of the thresholds is reached, the next partition is used.
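func ReadElements ¶
func ReadElements(r io.Reader, elements ...interface{}) error
func WriteElements ¶
func WriteElements(w io.Writer, elements ...interface{}) error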
Types ¶
type Authentication ¶
type Authentication interface{}
Authentication Opaque interface that represents the authentication credentials
func NewAuthentication ¶
func NewAuthentication(name string, params string) (Authentication, error)
NewAuthentication Creates an authentication by name and params
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(authParams map[string]string) Authentication
NewAuthenticationAthenz Creates Athenz Authentication provider
func NewAuthenticationFromTLSCertSupplier ¶
func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication
NewAuthenticationFromTLSCertSupplier Create new Authentication provider with specified TLS certificate supplier
func NewAuthenticationOAuth2 ¶
func NewAuthenticationOAuth2(authParams map[string]string) (Authentication, error)
NewAuthenticationOAuth2 Creates OAuth2 Authentication provider
func NewAuthenticationTLS ¶
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
NewAuthenticationTLS Creates new Authentication provider with specified TLS certificate and private key
func NewAuthenticationToken ¶
func NewAuthenticationToken(token string) Authentication
NewAuthenticationToken Creates new Authentication provider with specified auth token
func NewAuthenticationTokenFromFile ¶
func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication
NewAuthenticationTokenFromFile Creates new Authentication provider with specified auth token from a file
func NewAuthenticationTokenFromSupplier ¶
func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication
NewAuthenticationTokenFromSupplier returns a token auth provider that gets the token data from a user supplied function. The function is invoked each time the client library needs to use a token in talking with Pulsar brokers
type AvroCodec ¶
type AvroCodec struct {
Codec *goavro.Codec
}
func NewSchemaDefinition ¶
func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec
type AvroSchema ¶
type AvroSchema struct { AvroCodec SchemaInfo }
func NewAvroSchema ¶
func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema
func (*AvroSchema) Decode ¶
func (as *AvroSchema) Decode(data []byte, v interface{}) error
func (*AvroSchema) Encode ¶
func (as *AvroSchema) Encode(data interface{}) ([]byte, error)
func (*AvroSchema) GetSchemaInfo ¶
func (as *AvroSchema) GetSchemaInfo() *SchemaInfo
func (*AvroSchema) Validate ¶
func (as *AvroSchema) Validate(message []byte) error
type BatcherBuilderType ¶
type BatcherBuilderType int
const ( DefaultBatchBuilder BatcherBuilderType = iota KeyBasedBatchBuilder )
type BinaryFreeList ¶
type BinaryFreeList chan []byte
var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
func (BinaryFreeList) Borrow ¶
func (b BinaryFreeList) Borrow() (buf []byte)
func (BinaryFreeList) PutDouble ¶
func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
func (BinaryFreeList) PutFloat ¶
func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
func (BinaryFreeList) Return ¶
func (b BinaryFreeList) Return(buf []byte)
type BytesSchema ¶
type BytesSchema struct {
SchemaInfo
}
func NewBytesSchema ¶
func NewBytesSchema(properties map[string]string) *BytesSchema
func (*BytesSchema) Decode ¶
func (bs *BytesSchema) Decode(data []byte, v interface{}) error
func (*BytesSchema) Encode ¶
func (bs *BytesSchema) Encode(data interface{}) ([]byte, error)
func (*BytesSchema) GetSchemaInfo ¶
func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo
func (*BytesSchema) Validate ¶
func (bs *BytesSchema) Validate(message []byte) error
type Client ¶
type Client interface { // CreateProducer Creates the producer instance // This method will block until the producer is created successfully CreateProducer(ProducerOptions) (Producer, error) // Subscribe Creates a `Consumer` by subscribing to a topic. // // If the subscription does not exist, a new subscription will be created and all messages published after the // creation will be retained until acknowledged, even if the consumer is not connected Subscribe(ConsumerOptions) (Consumer, error) // CreateReader Creates a Reader instance. // This method will block until the reader is created successfully. CreateReader(ReaderOptions) (Reader, error) // TopicPartitions Fetches the list of partitions for a given topic // // If the topic is partitioned, this will return a list of partition names. // If the topic is not partitioned, the returned list will contain the topic // name itself. // // This can be used to discover the partitions and create {@link Reader}, // {@link Consumer} or {@link Producer} instances directly on a particular partition. TopicPartitions(topic string) ([]string, error) // Close Closes the Client and free associated resources Close() }
Client represents a pulsar client
func NewClient ¶
func NewClient(options ClientOptions) (Client, error)
NewClient Creates a pulsar client instance
type ClientOptions ¶
type ClientOptions struct { // Configure the service URL for the Pulsar service. // This parameter is required URL string // Timeout for the establishment of a TCP connection (default: 5 seconds) ConnectionTimeout time.Duration // Set the operation timeout (default: 30 seconds) // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the // operation will be marked as failed OperationTimeout time.Duration // Configure the authentication provider. (default: no authentication) // Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")` Authentication // Set the path to the trusted TLS certificate file TLSTrustCertsFilePath string // Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false) TLSAllowInsecureConnection bool // Configure whether the Pulsar client verify the validity of the host name from broker (default: false) TLSValidateHostname bool // Configure the net model for vpc user to connect the pulsar broker ListenerName string // Max number of connections to a single broker that will kept in the pool. (Default: 1 connection) MaxConnectionsPerBroker int // Configure the logger used by the client. // By default, a wrapped logrus.StandardLogger will be used, namely, // log.NewLoggerWithLogrus(logrus.StandardLogger()) // FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic Logger log.Logger // Specify metric cardinality to the tenant, namespace or topic levels, or remove it completely. // Default: MetricsCardinalityNamespace MetricsCardinality MetricsCardinality // Add custom labels to all the metrics reported by this client instance CustomMetricsLabels map[string]string }
ClientOptions is used to construct a Pulsar Client instance.
type CompressionLevel ¶
type CompressionLevel int
const ( // Default compression level Default CompressionLevel = iota // Faster compression, with lower compression ratio Faster // Higher compression rate, but slower Better )
type CompressionType ¶
type CompressionType int
const ( NoCompression CompressionType = iota LZ4 ZLib ZSTD )
type Consumer ¶
type Consumer interface { // Subscription get a subscription for the consumer Subscription() string // Unsubscribe the consumer Unsubscribe() error // Receive a single message. // This call blocks until a message is available. Receive(context.Context) (Message, error) // Chan returns a channel to consume messages from Chan() <-chan ConsumerMessage // Ack the consumption of a single message Ack(Message) // AckID the consumption of a single message, identified by its MessageID AckID(MessageID) // ReconsumeLater marks a message for redelivery after a custom delay ReconsumeLater(msg Message, delay time.Duration) // Nack acknowledges the failure to process a single message. // // When a message is "negatively acked" it will be marked for redelivery after // some fixed delay. The delay is configurable when constructing the consumer // with ConsumerOptions.NackRedeliveryDelay . // // This call is not blocking. Nack(Message) // NackID acknowledges the failure to process a single message. // // When a message is "negatively acked" it will be marked for redelivery after // some fixed delay. The delay is configurable when constructing the consumer // with ConsumerOptions.NackRedeliveryDelay . // // This call is not blocking. NackID(MessageID) // Close the consumer and stop the broker from pushing more messages Close() // Seek resets the subscription associated with this consumer to a specific message id. // The message id can either be a specific message or represent the first or last messages in the topic. // // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the // seek() on the individual partitions. Seek(MessageID) error // SeekByTime resets the subscription associated with this consumer to a specific message publish time. // // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on // the individual partitions.
// // @param timestamp // the message publish time where to reposition the subscription // SeekByTime(time time.Time) error // Name returns the name of consumer. Name() string }
Consumer is an interface that abstracts behavior of Pulsar's consumer
type ConsumerInterceptor ¶
type ConsumerInterceptor interface { // BeforeConsume This is called just before the message is sent to Consumer's ConsumerMessage channel. BeforeConsume(message ConsumerMessage) // OnAcknowledge This is called when the consumer sends the acknowledgment to the broker. OnAcknowledge(consumer Consumer, msgID MessageID) // OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs. OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) }
type ConsumerInterceptors ¶
type ConsumerInterceptors []ConsumerInterceptor
func (ConsumerInterceptors) BeforeConsume ¶
func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage)
func (ConsumerInterceptors) OnAcknowledge ¶
func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID)
func (ConsumerInterceptors) OnNegativeAcksSend ¶
func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
type ConsumerMessage ¶
ConsumerMessage represents a pair of a Consumer and Message.
type ConsumerOptions ¶
type ConsumerOptions struct { // Topic specifies the topic this consumer will subscribe on. // Either a topic, a list of topics or a topics pattern are required when subscribing Topic string // Topics specifies a list of topics this consumer will subscribe on. // Either a topic, a list of topics or a topics pattern are required when subscribing Topics []string // TopicsPattern specifies a regular expression to subscribe to multiple topics under the same namespace. // Either a topic, a list of topics or a topics pattern are required when subscribing TopicsPattern string // AutoDiscoveryPeriod specifies the interval in which to poll for new partitions or new topics // if using a TopicsPattern. AutoDiscoveryPeriod time.Duration // SubscriptionName specifies the subscription name for this consumer // This argument is required when subscribing SubscriptionName string // Properties represents a set of application defined properties for the consumer. // Those properties will be visible in the topic stats Properties map[string]string // SubscriptionProperties specify the subscription properties for this subscription. // // > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a // > subscription if they use different properties. SubscriptionProperties map[string]string // Type specifies the subscription type to be used when subscribing to a topic. // Default is `Exclusive` Type SubscriptionType // SubscriptionInitialPosition is the initial position at which the cursor will be set when subscribe // Default is `Latest` SubscriptionInitialPosition // DLQ represents the configuration for Dead Letter Queue consumer policy. // eg. route the message to topic X after N failed attempts at processing it // By default is nil and there's no DLQ DLQ *DLQPolicy KeySharedPolicy *KeySharedPolicy // RetryEnable determines whether to automatically retry sending messages to default filled DLQPolicy topics. 
// Default is false RetryEnable bool // MessageChannel sets a `MessageChannel` for the consumer // When a message is received, it will be pushed to the channel for consumption MessageChannel chan ConsumerMessage // ReceiverQueueSize sets the size of the consumer receive queue. // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the // application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer // throughput at the expense of bigger memory utilization. // Default value is `1000` messages and should be good for most use cases. ReceiverQueueSize int // NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be // processed. Default is 1 min. (See `Consumer.Nack()`) NackRedeliveryDelay time.Duration // Name specifies the consumer name. Name string // ReadCompacted, if enabled, the consumer will read messages from the compacted topic rather than reading the // full message backlog of the topic. This means that, if the topic has been compacted, the consumer will only // see the latest value for each key in the topic, up until the point in the topic message backlog that has been // compacted. Beyond that point, the messages will be sent as normal. // // ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e. // failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a // shared subscription, will lead to the subscription call throwing a PulsarClientException. ReadCompacted bool // ReplicateSubscriptionState marks the subscription as replicated to keep it in sync across clusters ReplicateSubscriptionState bool // Interceptors is a chain of interceptors. These interceptors will be called at some points defined in // ConsumerInterceptor interface. Interceptors ConsumerInterceptors // Schema represents the schema implementation. 
Schema Schema // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint // Decryption represents the encryption related fields required by the consumer to decrypt a message. Decryption *MessageDecryptionInfo // EnableDefaultNackBackoffPolicy, if enabled, the default implementation of NackBackoffPolicy will be used // to calculate the delay time of // nack backoff, Default: false. EnableDefaultNackBackoffPolicy bool // NackBackoffPolicy is a redelivery backoff mechanism which we can achieve redelivery with different // delays according to the number of times the message is retried. // // > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)` // > because we are not able to get the redeliveryCount from the message ID. NackBackoffPolicy NackBackoffPolicy }
ConsumerOptions is used to configure and create instances of Consumer.
type DLQPolicy ¶
type DLQPolicy struct { // MaxDeliveries specifies the maximum number of times that a message will be delivered before being // sent to the dead letter queue. MaxDeliveries uint32 // DeadLetterTopic specifies the name of the topic where the failing messages will be sent. DeadLetterTopic string // RetryLetterTopic specifies the name of the topic where the retry messages will be sent. RetryLetterTopic string }
DLQPolicy represents the configuration for the Dead Letter Queue consumer policy.
type DoubleSchema ¶
type DoubleSchema struct {
SchemaInfo
}
func NewDoubleSchema ¶
func NewDoubleSchema(properties map[string]string) *DoubleSchema
func (*DoubleSchema) Decode ¶
func (ds *DoubleSchema) Decode(data []byte, v interface{}) error
func (*DoubleSchema) Encode ¶
func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error)
func (*DoubleSchema) GetSchemaInfo ¶
func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo
func (*DoubleSchema) Validate ¶
func (ds *DoubleSchema) Validate(message []byte) error
type EncryptionContext ¶
type EncryptionContext struct { Keys map[string]EncryptionKey Param []byte Algorithm string CompressionType CompressionType UncompressedSize int BatchSize int }
EncryptionContext It will be used to decrypt message outside of this client
type EncryptionKey ¶
EncryptionKey Encryption key used to encrypt the message payload
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error implements the error interface and is composed of two parts: msg and result.
type FloatSchema ¶
type FloatSchema struct {
SchemaInfo
}
func NewFloatSchema ¶
func NewFloatSchema(properties map[string]string) *FloatSchema
func (*FloatSchema) Decode ¶
func (fs *FloatSchema) Decode(data []byte, v interface{}) error
func (*FloatSchema) Encode ¶
func (fs *FloatSchema) Encode(value interface{}) ([]byte, error)
func (*FloatSchema) GetSchemaInfo ¶
func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo
func (*FloatSchema) Validate ¶
func (fs *FloatSchema) Validate(message []byte) error
type HashingScheme ¶
type HashingScheme int
const ( // JavaStringHash and Java String.hashCode() equivalent JavaStringHash HashingScheme = iota // Murmur3_32Hash use Murmur3 hashing function Murmur3_32Hash )
type Int16Schema ¶
type Int16Schema struct {
SchemaInfo
}
func NewInt16Schema ¶
func NewInt16Schema(properties map[string]string) *Int16Schema
func (*Int16Schema) Decode ¶
func (is16 *Int16Schema) Decode(data []byte, v interface{}) error
func (*Int16Schema) Encode ¶
func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error)
func (*Int16Schema) GetSchemaInfo ¶
func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo
func (*Int16Schema) Validate ¶
func (is16 *Int16Schema) Validate(message []byte) error
type Int32Schema ¶
type Int32Schema struct {
SchemaInfo
}
func NewInt32Schema ¶
func NewInt32Schema(properties map[string]string) *Int32Schema
func (*Int32Schema) Decode ¶
func (is32 *Int32Schema) Decode(data []byte, v interface{}) error
func (*Int32Schema) Encode ¶
func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error)
func (*Int32Schema) GetSchemaInfo ¶
func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo
func (*Int32Schema) Validate ¶
func (is32 *Int32Schema) Validate(message []byte) error
type Int64Schema ¶
type Int64Schema struct {
SchemaInfo
}
func NewInt64Schema ¶
func NewInt64Schema(properties map[string]string) *Int64Schema
func (*Int64Schema) Decode ¶
func (is64 *Int64Schema) Decode(data []byte, v interface{}) error
func (*Int64Schema) Encode ¶
func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error)
func (*Int64Schema) GetSchemaInfo ¶
func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo
func (*Int64Schema) Validate ¶
func (is64 *Int64Schema) Validate(message []byte) error
type Int8Schema ¶
type Int8Schema struct {
SchemaInfo
}
func NewInt8Schema ¶
func NewInt8Schema(properties map[string]string) *Int8Schema
func (*Int8Schema) Decode ¶
func (is8 *Int8Schema) Decode(data []byte, v interface{}) error
func (*Int8Schema) Encode ¶
func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error)
func (*Int8Schema) GetSchemaInfo ¶
func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo
func (*Int8Schema) Validate ¶
func (is8 *Int8Schema) Validate(message []byte) error
type JSONSchema ¶
type JSONSchema struct { AvroCodec SchemaInfo }
func NewJSONSchema ¶
func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema
func (*JSONSchema) Decode ¶
func (js *JSONSchema) Decode(data []byte, v interface{}) error
func (*JSONSchema) Encode ¶
func (js *JSONSchema) Encode(data interface{}) ([]byte, error)
func (*JSONSchema) GetSchemaInfo ¶
func (js *JSONSchema) GetSchemaInfo() *SchemaInfo
func (*JSONSchema) Validate ¶
func (js *JSONSchema) Validate(message []byte) error
type KeySharedPolicy ¶
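type KeySharedPolicy struct { Mode KeySharedPolicyMode HashRanges []int // If enabled, the ordering requirement is relaxed, allowing the broker to send out-of-order messages in case of // failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer. AllowOutOfOrderDelivery bool }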
KeySharedPolicy for KeyShared subscription
func NewKeySharedPolicySticky ¶
func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error)
NewKeySharedPolicySticky construct KeySharedPolicy in Sticky mode with hashRanges formed in value pair list: [x1, x2, y1, y2, z1, z2], and must not overlap with each others
type KeySharedPolicyMode ¶
type KeySharedPolicyMode int
const ( KeySharedPolicyModeAutoSplit KeySharedPolicyMode = iota KeySharedPolicyModeSticky )
type Message ¶
type Message interface { // Topic returns the topic from which this message originated from. Topic() string // ProducerName returns the name of the producer that has published the message. ProducerName() string // Properties are application defined key/value pairs that will be attached to the message. // Returns the properties attached to the message. Properties() map[string]string // Payload returns the payload of the message Payload() []byte // ID returns the unique message ID associated with this message. // The message id can be used to univocally refer to a message without having the keep the entire payload in memory. ID() MessageID // PublishTime returns the publish time of this message. The publish time is the timestamp that a client // publish the message. PublishTime() time.Time // EventTime returns the event time associated with this message. It is typically set by the applications via // `ProducerMessage.EventTime`. // If EventTime is 0, it means there isn't any event time associated with this message. EventTime() time.Time // Key returns the key of the message, if any Key() string // OrderingKey returns the ordering key of the message, if any OrderingKey() string // RedeliveryCount returns message redelivery count, redelivery count maintain in pulsar broker. // When client nack acknowledge messages, // broker will dispatch message again with message redelivery count in CommandMessage defined. // // Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker // redelivery count will be recalculated. RedeliveryCount() uint32 // IsReplicated determines whether the message is replicated from another cluster. IsReplicated() bool // GetReplicatedFrom returns the name of the cluster, from which the message is replicated. GetReplicatedFrom() string // GetSchemaValue returns the de-serialized value of the message, according to the configuration. 
GetSchemaValue(v interface{}) error // GetEncryptionContext returns the encryption context of the message. // It will be used by the application to parse the undecrypted message. GetEncryptionContext() *EncryptionContext }
Message abstraction used in Pulsar
type MessageDecryptionInfo ¶
type MessageDecryptionInfo struct { // KeyReader read RSA public/private key pairs KeyReader crypto.KeyReader // MessageCrypto used to encrypt and decrypt the data and session keys MessageCrypto crypto.MessageCrypto // ConsumerCryptoFailureAction action to be taken on failure of message decryption ConsumerCryptoFailureAction int }
MessageDecryptionInfo encryption related fields required by the consumer to decrypt the message
type MessageID ¶
type MessageID interface { // Serialize the message id into a sequence of bytes that can be stored somewhere else Serialize() []byte // LedgerID returns the message ledgerID LedgerID() int64 // EntryID returns the message entryID EntryID() int64 // BatchIdx returns the message batchIdx BatchIdx() int32 // PartitionIdx returns the message partitionIdx PartitionIdx() int32 }
MessageID identifier for a particular message
func DeserializeMessageID ¶
DeserializeMessageID reconstruct a MessageID object from its serialized representation
func EarliestMessageID ¶
func EarliestMessageID() MessageID
EarliestMessageID returns a messageID that points to the earliest message available in a topic
func LatestMessageID ¶
func LatestMessageID() MessageID
LatestMessageID returns a messageID that points to the latest message
type MetricsCardinality ¶
type MetricsCardinality int
MetricsCardinality represents the specificity of labels on a per-metric basis
const ( MetricsCardinalityNone MetricsCardinality // Do not add additional labels to metrics MetricsCardinalityTenant // Label metrics by tenant MetricsCardinalityNamespace // Label metrics by tenant and namespace MetricsCardinalityTopic // Label metrics by topic )
type NackBackoffPolicy ¶
type NackBackoffPolicy interface { // The redeliveryCount indicates the number of times the message was redelivered. // We can get the redeliveryCount from the CommandMessage. Next(redeliveryCount uint32) int64 }
NackBackoffPolicy is an interface for a custom negative-acknowledgement backoff policy; users can specify a NackBackoffPolicy for a consumer.
> Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time > from the backoff.
type Producer ¶
type Producer interface { // Topic returns the topic to which the producer is publishing Topic() string // Name returns the producer name, which could have been assigned by the system or specified by the client Name() string // Send a message // This call blocks until the message is successfully acknowledged by the Pulsar broker. // Example: // producer.Send(ctx, &pulsar.ProducerMessage{ Payload: myPayload }) Send(context.Context, *ProducerMessage) (MessageID, error) // SendAsync a message in asynchronous mode // This call is blocked when the `event channel` becomes full (default: 10) or the // `maxPendingMessages` becomes full (default: 1000) // The callback will report back the message being published and // the eventual error in publishing SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) // LastSequenceID gets the last sequence id that was published by this producer. // This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that // was published and acknowledged by the broker. // After recreating a producer with the same producer name, this will return the last message that was // published in the previous producer session, or -1 if no message was ever published. // return the last sequence id published by this producer. LastSequenceID() int64 // Flush all the messages buffered in the client and wait until all messages have been successfully // persisted. Flush() error // Close the producer and release the allocated resources // No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case // of errors, pending writes will not be retried. Close() }
Producer is used to publish messages on a topic
type ProducerEncryptionInfo ¶
type ProducerEncryptionInfo struct { // KeyReader read RSA public/private key pairs KeyReader crypto.KeyReader // MessageCrypto used to encrypt and decrypt the data and session keys MessageCrypto crypto.MessageCrypto // Keys list of encryption key names to encrypt session key Keys []string // ProducerCryptoFailureAction action to be taken on failure of message encryption // default is ProducerCryptoFailureActionFail ProducerCryptoFailureAction int }
ProducerEncryptionInfo encryption related fields required by the producer
type ProducerInterceptor ¶
type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the BeforeSend(producer Producer, message *ProducerMessage) // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) }
type ProducerInterceptors ¶
// ProducerInterceptors is a slice of ProducerInterceptor values. It declares
// BeforeSend and OnSendAcknowledgement methods with the same parameters as the
// ProducerInterceptor interface.
type ProducerInterceptors []ProducerInterceptor
func (ProducerInterceptors) BeforeSend ¶
func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage)
func (ProducerInterceptors) OnSendAcknowledgement ¶
func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
type ProducerMessage ¶
// ProducerMessage is the message abstraction used by the Pulsar producer.
type ProducerMessage struct {
	// Payload is the payload of the message.
	Payload []byte

	// Value is used for schema messages. Value and Payload are mutually
	// exclusive.
	Value interface{}

	// Key sets the key of the message for the routing policy.
	Key string

	// OrderingKey sets the ordering key of the message.
	OrderingKey string

	// Properties attaches application defined properties to the message.
	Properties map[string]string

	// EventTime sets the event time for a given message.
	// By default, messages don't have an event time associated, while the
	// publish time will always be present.
	// Set the event time to a non-zero timestamp to explicitly declare the
	// time that the event "happened", as opposed to when the message is being
	// published.
	EventTime time.Time

	// ReplicationClusters overrides the replication clusters for this message.
	ReplicationClusters []string

	// DisableReplication disables the replication for this message.
	DisableReplication bool

	// SequenceID sets the sequence id to assign to the current message.
	SequenceID *int64

	// DeliverAfter requests to deliver the message only after the specified
	// relative delay.
	// Note: messages are only delivered with delay when a consumer is
	// consuming through a `SubscriptionType=Shared` subscription. With other
	// subscription types, the messages will still be delivered immediately.
	DeliverAfter time.Duration

	// DeliverAt delivers the message only at or after the specified absolute
	// timestamp.
	// Note: messages are only delivered with delay when a consumer is
	// consuming through a `SubscriptionType=Shared` subscription. With other
	// subscription types, the messages will still be delivered immediately.
	DeliverAt time.Time
}
ProducerMessage abstraction used in Pulsar producer
type ProducerOptions ¶
type ProducerOptions struct { // Topic specifies the topic this producer will be publishing on. // This argument is required when constructing the producer. Topic string // Name specifies a name for the producer. // If not assigned, the system will generate a globally unique name which can be access with // Producer.ProducerName(). // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique // across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on // a topic. Name string // Properties specifies a set of application defined properties for the producer. // This properties will be visible in the topic stats Properties map[string]string // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. // Send and SendAsync returns an error after timeout. // Default is 30 seconds, negative such as -1 to disable. SendTimeout time.Duration // DisableBlockIfQueueFull controls whether Send and SendAsync block if producer's message queue is full. // Default is false, if set to true then Send and SendAsync return error when queue is full. DisableBlockIfQueueFull bool // MaxPendingMessages specifies the max size of the queue holding the messages pending to receive an // acknowledgment from the broker. MaxPendingMessages int // HashingScheme is used to define the partition on where to publish a particular message. // Standard hashing functions available are: // // - `JavaStringHash` : Java String.hashCode() equivalent // - `Murmur3_32Hash` : Use Murmur3 hashing function. // https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash // // Default is `JavaStringHash`. HashingScheme // CompressionType specifies the compression type for the producer. // By default, message payloads are not compressed. Supported compression types are: // - LZ4 // - ZLIB // - ZSTD // // Note: ZSTD is supported since Pulsar 2.3. 
Consumers will need to be at least at that // release in order to be able to receive messages compressed with ZSTD. CompressionType // CompressionLevel defines the desired compression level. Options: // - Default // - Faster // - Better CompressionLevel // MessageRouter represents a custom message routing policy by passing an implementation of MessageRouter // The router is a function that given a particular message and the topic metadata, returns the // partition index where the message should be routed to MessageRouter func(*ProducerMessage, TopicMetadata) int // DisableBatching controls whether automatic batching of messages is enabled for the producer. By default batching // is enabled. // When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the // broker, leading to better throughput, especially when publishing small messages. If compression is enabled, // messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or // contents. // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages // Setting `DisableBatching: true` will make the producer to send messages individually DisableBatching bool // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) // if batch messages are enabled. If set to a non zero value, messages will be queued until this time // interval or until BatchingMaxPublishDelay time.Duration // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxSize (see below) has been reached or the batch interval has elapsed. BatchingMaxMessages uint // BatchingMaxSize specifies the maximum number of bytes permitted in a batch. 
(default 128 KB) // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed. BatchingMaxSize uint // Interceptors is a chain of interceptors, These interceptors will be called at some points defined // in ProducerInterceptor interface Interceptors ProducerInterceptors // Schema represents the schema implementation. Schema Schema // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder) // This will be used to create batch container when batching is enabled. // Options: // - DefaultBatchBuilder // - KeyBasedBatchBuilder BatcherBuilderType // PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions // Default is 1 minute PartitionsAutoDiscoveryInterval time.Duration // Encryption specifies the fields required to encrypt a message Encryption *ProducerEncryptionInfo }
type ProtoSchema ¶
type ProtoSchema struct { AvroCodec SchemaInfo }
func NewProtoSchema ¶
func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema
func (*ProtoSchema) Decode ¶
func (ps *ProtoSchema) Decode(data []byte, v interface{}) error
func (*ProtoSchema) Encode ¶
func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error)
func (*ProtoSchema) GetSchemaInfo ¶
func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo
func (*ProtoSchema) Validate ¶
func (ps *ProtoSchema) Validate(message []byte) error
type Reader ¶
type Reader interface { // Topic from which this reader is reading from Topic() string // Next reads the next message in the topic, blocking until a message is available Next(context.Context) (Message, error) // HasNext checks if there is any message available to read from the current position HasNext() bool // Close the reader and stop the broker to push more messages Close() // Seek resets the subscription associated with this reader to a specific message id. // The message id can either be a specific message or represent the first or last messages in the topic. // // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the // seek() on the individual partitions. Seek(MessageID) error // SeekByTime resets the subscription associated with this reader to a specific message publish time. // // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on // the individual partitions. // // @param timestamp // the message publish time where to reposition the subscription // SeekByTime(time time.Time) error }
Reader can be used to scan through all the messages currently available in a topic.
type ReaderMessage ¶
ReaderMessage packages Reader and Message as a struct to use.
type ReaderOptions ¶
type ReaderOptions struct { // Topic specifies the topic this consumer will subscribe on. // This argument is required when constructing the reader. Topic string // Name set the reader name. Name string // Properties represents a set of application defined properties for the reader. // Those properties will be visible in the topic stats. Properties map[string]string // StartMessageID initial reader positioning is done by specifying a message id. The options are: // * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic // * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the // reader was created // * `MessageID` : Start reading from a particular message id, the reader will position itself on that // specific position. The first message to be read will be the message next to the specified // messageID StartMessageID MessageID // StartMessageIDInclusive, if true, the reader will start at the `StartMessageID`, included. // Default is `false` and the reader will start from the "next" message StartMessageIDInclusive bool // MessageChannel sets a `MessageChannel` for the consumer // When a message is received, it will be pushed to the channel for consumption MessageChannel chan ReaderMessage // ReceiverQueueSize sets the size of the consumer receive queue. // The consumer receive queue controls how many messages can be accumulated by the Reader before the // application calls Reader.readNext(). Using a higher value could potentially increase the consumer // throughput at the expense of bigger memory utilization. // Default value is {@code 1000} messages and should be good for most use cases. ReceiverQueueSize int // SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader". SubscriptionRolePrefix string // ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the // full message backlog of the topic. 
This means that, if the topic has been compacted, the reader will only // see the latest value for each key in the topic, up until the point in the topic message backlog that has // been compacted. Beyond that point, the messages will be sent as normal. // // ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent // topics will lead to the reader create call throwing a PulsarClientException. ReadCompacted bool // Decryption represents the encryption related fields required by the reader to decrypt a message. Decryption *MessageDecryptionInfo }
ReaderOptions represents Reader options to use.
type Result ¶
// Result is a defined integer type (underlying type int, not a type alias)
// used to represent the result of Pulsar processing.
type Result int
Result is used to represent the result of Pulsar processing. It is a defined type whose underlying type is int (not a type alias).
const ( // Ok means no errors Ok Result = iota // UnknownError means unknown error happened on broker UnknownError // InvalidConfiguration means invalid configuration InvalidConfiguration // TimeoutError means operation timed out TimeoutError //LookupError means broker lookup failed LookupError // ConnectError means failed to connect to broker ConnectError // ReadError means failed to read from socket ReadError // AuthenticationError means authentication failed on broker AuthenticationError // AuthorizationError client is not authorized to create producer/consumer AuthorizationError // ErrorGettingAuthenticationData client cannot find authorization data ErrorGettingAuthenticationData // BrokerMetadataError broker failed in updating metadata BrokerMetadataError // BrokerPersistenceError broker failed to persist entry BrokerPersistenceError // ChecksumError corrupt message checksum failure ChecksumError // ConsumerBusy means Exclusive consumer is already connected ConsumerBusy // NotConnectedError producer/consumer is not currently connected to broker NotConnectedError // AlreadyClosedError producer/consumer is already closed and not accepting any operation AlreadyClosedError // InvalidMessage error in publishing an already used message InvalidMessage // ConsumerNotInitialized consumer is not initialized ConsumerNotInitialized // ProducerNotInitialized producer is not initialized ProducerNotInitialized // TooManyLookupRequestException too many concurrent LookupRequest TooManyLookupRequestException // InvalidTopicName means invalid topic name InvalidTopicName // InvalidURL means Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor) InvalidURL // ServiceUnitNotReady unloaded between client did lookup and producer/consumer got created ServiceUnitNotReady // OperationNotSupported operation not supported OperationNotSupported // ProducerBlockedQuotaExceededError producer is blocked ProducerBlockedQuotaExceededError // 
ProducerBlockedQuotaExceededException producer is getting exception ProducerBlockedQuotaExceededException // ProducerQueueIsFull producer queue is full ProducerQueueIsFull // MessageTooBig trying to send a messages exceeding the max size MessageTooBig // TopicNotFound topic not found TopicNotFound // SubscriptionNotFound subscription not found SubscriptionNotFound // ConsumerNotFound consumer not found ConsumerNotFound // UnsupportedVersionError when an older client/version doesn't support a required feature UnsupportedVersionError // TopicTerminated topic was already terminated TopicTerminated // CryptoError error when crypto operation fails CryptoError // ConsumerClosed means consumer already been closed ConsumerClosed // InvalidBatchBuilderType invalid batch builder type InvalidBatchBuilderType // AddToBatchFailed failed to add sendRequest to batchBuilder AddToBatchFailed // SeekFailed seek failed SeekFailed // ProducerClosed means producer already been closed ProducerClosed )
type RetryMessage ¶
// RetryMessage is a struct with no exported fields visible in this listing;
// its contents are filtered or unexported.
type RetryMessage struct {
// contains filtered or unexported fields
}
type SchemaInfo ¶
type SchemaInfo struct { Name string Schema string Type SchemaType Properties map[string]string }
SchemaInfo encapsulates data around the schema definition.
type SchemaType ¶
// SchemaType identifies the kind of schema attached to a message.
type SchemaType int

// Schema type values. NONE through KeyValue are typed SchemaType constants
// numbered sequentially by iota starting at 0. BYTES, AUTO, AutoConsume and
// AutoPublish are given explicit negative values without a type, so they are
// untyped integer constants (still assignable to SchemaType).
const (
	// NONE means no schema is defined.
	NONE SchemaType = iota
	// STRING is simple String encoding with UTF-8.
	STRING
	// JSON is JSON object encoding and validation.
	JSON
	// PROTOBUF is Protobuf message encoding and decoding.
	PROTOBUF
	// AVRO serializes and deserializes via Avro.
	AVRO
	// BOOLEAN has no description in the original listing.
	BOOLEAN
	// INT8 is an 8-bit integer.
	INT8
	// INT16 is a 16-bit integer.
	INT16
	// INT32 is a 32-bit integer.
	INT32
	// INT64 is a 64-bit integer.
	INT64
	// FLOAT is a float number.
	FLOAT
	// DOUBLE is a double number.
	DOUBLE
	// KeyValue is a Schema that contains a Key Schema and a Value Schema.
	KeyValue

	// BYTES is a bytes array.
	BYTES = -1
	// AUTO has no description in the original listing.
	AUTO = -2
	// AutoConsume is the Auto Consume Type.
	AutoConsume = -3
	// AutoPublish is the Auto Publish Type.
	AutoPublish = -4
)
type StringSchema ¶
// StringSchema is a schema type that embeds SchemaInfo.
type StringSchema struct {
SchemaInfo
}
func NewStringSchema ¶
func NewStringSchema(properties map[string]string) *StringSchema
func (*StringSchema) Decode ¶
func (ss *StringSchema) Decode(data []byte, v interface{}) error
func (*StringSchema) Encode ¶
func (ss *StringSchema) Encode(v interface{}) ([]byte, error)
func (*StringSchema) GetSchemaInfo ¶
func (ss *StringSchema) GetSchemaInfo() *SchemaInfo
func (*StringSchema) Validate ¶
func (ss *StringSchema) Validate(message []byte) error
type SubscriptionInitialPosition ¶
// SubscriptionInitialPosition selects the position at which consuming starts.
type SubscriptionInitialPosition int

const (
	// SubscriptionPositionLatest is the latest position, which means the
	// start consuming position will be the last message.
	SubscriptionPositionLatest SubscriptionInitialPosition = iota

	// SubscriptionPositionEarliest is the earliest position, which means the
	// start consuming position will be the first message.
	SubscriptionPositionEarliest
)
type SubscriptionType ¶
// SubscriptionType is the type of subscription supported by Pulsar.
type SubscriptionType int
SubscriptionType of subscription supported by Pulsar
const ( // Exclusive there can be only 1 consumer on the same topic with the same subscription name Exclusive SubscriptionType = iota // and the messages will be dispatched according to // a round-robin rotation between the connected consumers Shared // Failover subscription mode, multiple consumer will be able to use the same subscription name // but only 1 consumer will receive the messages. // If that consumer disconnects, one of the other connected consumers will start receiving messages. Failover // subscription and all messages with the same key will be dispatched to only one consumer KeyShared )
type TopicMetadata ¶
// TopicMetadata represents a topic metadata.
type TopicMetadata interface {
	// NumPartitions returns the number of partitions for a particular topic.
	NumPartitions() uint32
}
TopicMetadata represents a topic metadata.
Source Files ¶
- batcher_builder.go
- client.go
- client_impl.go
- consumer.go
- consumer_impl.go
- consumer_interceptor.go
- consumer_multitopic.go
- consumer_partition.go
- consumer_regex.go
- default_router.go
- dlq_router.go
- encryption.go
- error.go
- helper.go
- impl_message.go
- key_shared_policy.go
- message.go
- negative_acks_tracker.go
- negative_backoff_policy.go
- primitiveSerDe.go
- producer.go
- producer_impl.go
- producer_interceptor.go
- producer_partition.go
- reader.go
- reader_impl.go
- retry_router.go
- schema.go
- test_helper.go
Directories ¶
Path | Synopsis |
---|---|
Package log defines the logger interfaces used by pulsar client.
|
Package log defines the logger interfaces used by pulsar client. |