Documentation ¶
Index ¶
- Constants
- func ReadElements(r io.Reader, elements ...interface{}) error
- func WriteElements(w io.Writer, elements ...interface{}) error
- type Authentication
- type AvroCodec
- type AvroSchema
- type BinaryFreeList
- func (b BinaryFreeList) Borrow() (buf []byte)
- func (b BinaryFreeList) Float32(buf []byte) (float32, error)
- func (b BinaryFreeList) Float64(buf []byte) (float64, error)
- func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error
- func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error
- func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error
- func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error
- func (b BinaryFreeList) Return(buf []byte)
- func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error)
- func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error)
- func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error)
- func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error)
- type BytesSchema
- type Client
- type ClientOptions
- type CompressionType
- type Consumer
- type ConsumerMessage
- type ConsumerOptions
- type DoubleSchema
- type Error
- type FloatSchema
- type HashingScheme
- type InitialPosition
- type Int16Schema
- type Int32Schema
- type Int64Schema
- type Int8Schema
- type JsonSchema
- type Message
- type MessageID
- type MessageRoutingMode
- type Producer
- type ProducerMessage
- type ProducerOptions
- type ProtoSchema
- type Reader
- type ReaderMessage
- type ReaderOptions
- type Result
- type Schema
- type SchemaInfo
- type SchemaType
- type StringSchema
- type SubscriptionType
- type TopicMetadata
Constants ¶
const (
IoMaxSize = 1024
)
Variables ¶
This section is empty.
Functions ¶
func ReadElements ¶
func ReadElements(r io.Reader, elements ...interface{}) error
func WriteElements ¶
func WriteElements(w io.Writer, elements ...interface{}) error
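A minimal round-trip sketch, assuming the package is imported as `pulsar` (import path below is an assumption) and that fixed-width unsigned integers are among the element types these helpers accept; the exact set of supported element kinds is not listed on this page, so treat this as illustrative only:

	package main

	import (
		"bytes"
		"fmt"

		"github.com/apache/pulsar/pulsar-client-go/pulsar" // assumed import path
	)

	func main() {
		var buf bytes.Buffer

		// Write two fixed-width values in order (assumption: these types are supported).
		if err := pulsar.WriteElements(&buf, uint32(42), uint64(7)); err != nil {
			panic(err)
		}

		// Read them back into pointers, in the same order (also an assumption).
		var a uint32
		var b uint64
		if err := pulsar.ReadElements(&buf, &a, &b); err != nil {
			panic(err)
		}
		fmt.Println(a, b)
	}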
Types ¶
type Authentication ¶
type Authentication interface{}
Opaque interface that represents the authentication credentials
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(authParams string) Authentication
Create new Athenz Authentication provider with configuration in JSON form
func NewAuthenticationTLS ¶
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
Create new Authentication provider with specified TLS certificate and private key
func NewAuthenticationToken ¶
func NewAuthenticationToken(token string) Authentication
Create new Authentication provider with specified auth token
func NewAuthenticationTokenSupplier ¶
func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication
Create new Authentication provider with specified auth token supplier
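A short sketch of wiring one of these providers into `ClientOptions` (the import path, file paths and service URL are placeholders; see `ClientOptions` and `NewClient` below):

	package main

	import (
		"log"

		"github.com/apache/pulsar/pulsar-client-go/pulsar" // assumed import path
	)

	func main() {
		// TLS authentication from a certificate/key pair on disk (paths are placeholders).
		auth := pulsar.NewAuthenticationTLS("/path/to/my-cert.pem", "/path/to/my-key.pem")

		client, err := pulsar.NewClient(pulsar.ClientOptions{
			URL:                   "pulsar+ssl://broker.example.com:6651",
			Authentication:        auth,
			TLSTrustCertsFilePath: "/path/to/ca.pem",
		})
		if err != nil {
			log.Fatalf("could not create client: %v", err)
		}
		defer client.Close()

		// Token-based alternatives:
		//   pulsar.NewAuthenticationToken("<jwt-token>")
		//   pulsar.NewAuthenticationTokenSupplier(func() string { /* fetch a fresh token */ return "<jwt-token>" })
	}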
type AvroCodec ¶
func NewSchemaDefinition ¶
type AvroSchema ¶
type AvroSchema struct {
	AvroCodec
	SchemaInfo
}
func NewAvroSchema ¶
func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema
func (*AvroSchema) Decode ¶
func (as *AvroSchema) Decode(data []byte, v interface{}) error
func (*AvroSchema) Encode ¶
func (as *AvroSchema) Encode(data interface{}) ([]byte, error)
func (*AvroSchema) GetSchemaInfo ¶
func (as *AvroSchema) GetSchemaInfo() *SchemaInfo
func (*AvroSchema) Validate ¶
func (as *AvroSchema) Validate(message []byte) error
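A sketch of encoding and decoding with an Avro record definition. It assumes the package is imported as `pulsar`, that `nil` properties are acceptable, and that `Encode`/`Decode` round-trip Go values whose JSON form matches the Avro record; the record and struct below are only an example:

	// Avro record definition shared by Encode and Decode (illustrative).
	const exampleAvroDef = `{"type":"record","name":"Example","namespace":"test",
		"fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}`

	type exampleRecord struct {
		ID   int
		Name string
	}

	func avroRoundTrip() error {
		// nil properties assumed acceptable here.
		schema := pulsar.NewAvroSchema(exampleAvroDef, nil)

		encoded, err := schema.Encode(exampleRecord{ID: 1, Name: "pulsar"})
		if err != nil {
			return err
		}

		var decoded exampleRecord
		return schema.Decode(encoded, &decoded)
	}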
type BinaryFreeList ¶
type BinaryFreeList chan []byte
var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
func (BinaryFreeList) Borrow ¶
func (b BinaryFreeList) Borrow() (buf []byte)
func (BinaryFreeList) PutDouble ¶
func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
func (BinaryFreeList) PutFloat ¶
func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
func (BinaryFreeList) Return ¶
func (b BinaryFreeList) Return(buf []byte)
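The free list doubles as a small binary serialization helper. A sketch using the package-level `BinarySerializer`, assuming the package is imported as `pulsar` (other imports: `bytes`, `encoding/binary`):

	func binaryRoundTrip() (uint32, error) {
		var buf bytes.Buffer

		// Write a big-endian uint32 through the shared free list...
		if err := pulsar.BinarySerializer.PutUint32(&buf, binary.BigEndian, 1024); err != nil {
			return 0, err
		}

		// ...and read it back from the same buffer.
		v, err := pulsar.BinarySerializer.Uint32(&buf, binary.BigEndian)
		if err != nil {
			return 0, err
		}

		// Borrow/Return manage reusable scratch buffers.
		scratch := pulsar.BinarySerializer.Borrow()
		defer pulsar.BinarySerializer.Return(scratch)

		return v, nil
	}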
type BytesSchema ¶
type BytesSchema struct {
SchemaInfo
}
func NewBytesSchema ¶
func NewBytesSchema(properties map[string]string) *BytesSchema
func (*BytesSchema) Decode ¶
func (bs *BytesSchema) Decode(data []byte, v interface{}) error
func (*BytesSchema) Encode ¶
func (bs *BytesSchema) Encode(data interface{}) ([]byte, error)
func (*BytesSchema) GetSchemaInfo ¶
func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo
func (*BytesSchema) Validate ¶
func (bs *BytesSchema) Validate(message []byte) error
type Client ¶
type Client interface {
	// Create the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)

	// Create a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)

	// Create a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)

	// Fetch the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create Reader, Consumer or
	// Producer instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// Close the Client and free associated resources
	Close() error
}
func NewClient ¶
func NewClient(options ClientOptions) (Client, error)
type ClientOptions ¶
type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required
	URL string

	// Number of threads to be used for handling connections to brokers (default: 1 thread)
	IOThreads int

	// Set the operation timeout (default: 30 seconds)
	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
	// operation will be marked as failed
	OperationTimeoutSeconds time.Duration

	// Set the number of threads to be used for message listeners (default: 1 thread)
	MessageListenerThreads int

	// Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker.
	// (default: 5000) It should be configured with a higher value only when the client needs to produce or subscribe
	// on thousands of topics
	ConcurrentLookupRequests int

	// Provide a custom logger implementation where all Pulsar library info/warn/error messages will be routed
	// By default, log messages are printed to standard output. By passing a logger function, the application
	// can determine how to print logs. This function will be called each time the Pulsar client library wants
	// to write a log line.
	Logger func(level log.LoggerLevel, file string, line int, message string)

	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string

	// Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
	TLSAllowInsecureConnection bool

	// Configure whether the Pulsar client verifies the validity of the host name from the broker (default: false)
	TLSValidateHostname bool

	// Configure the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	Authentication

	// Set the interval between each stats report (default: 60 seconds). Stats are activated with a positive
	// StatsIntervalInSeconds; it should be set to at least 1 second.
	StatsIntervalInSeconds int
}
ClientOptions is used to configure and construct a Pulsar Client instance.
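A minimal sketch of constructing a client and listing topic partitions; the service URL is a placeholder and the import path is an assumption to be adapted to your module:

	package main

	import (
		"log"
		"time"

		"github.com/apache/pulsar/pulsar-client-go/pulsar" // assumed import path
	)

	func main() {
		client, err := pulsar.NewClient(pulsar.ClientOptions{
			URL:                     "pulsar://localhost:6650",
			OperationTimeoutSeconds: 30 * time.Second,
		})
		if err != nil {
			log.Fatalf("could not instantiate Pulsar client: %v", err)
		}
		defer client.Close()

		// For a partitioned topic this returns one name per partition;
		// otherwise it returns the topic name itself.
		partitions, err := client.TopicPartitions("my-topic")
		if err != nil {
			log.Fatal(err)
		}
		log.Println("partitions:", partitions)
	}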
type CompressionType ¶
type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
	SNAPPY
)
type Consumer ¶
type Consumer interface {
	// Get the topic for the consumer
	Topic() string

	// Get the subscription for the consumer
	Subscription() string

	// Unsubscribe the consumer
	Unsubscribe() error

	// Receive a single message.
	// This call blocks until a message is available.
	Receive(context.Context) (Message, error)

	// Ack the consumption of a single message
	Ack(Message) error

	// Ack the consumption of a single message, identified by its MessageID
	AckID(MessageID) error

	// Ack the reception of all the messages in the stream up to (and including) the provided message.
	// This method blocks until the acknowledgment has been sent to the broker. After that, the messages will not be
	// re-delivered to this consumer.
	//
	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
	//
	// It is equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
	AckCumulative(Message) error

	// Ack the reception of all the messages in the stream up to (and including) the message with the provided MessageID.
	// This method blocks until the acknowledgment has been sent to the broker. After that, the messages will not be
	// re-delivered to this consumer.
	//
	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
	//
	// It is equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
	AckCumulativeID(MessageID) error

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay.
	//
	// This call is not blocking.
	Nack(Message) error

	// Acknowledge the failure to process a single message, identified by its MessageID.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay.
	//
	// This call is not blocking.
	NackID(MessageID) error

	// Close the consumer and stop the broker from pushing more messages
	Close() error

	// Reset the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
	// perform the seek() on the individual partitions.
	Seek(msgID MessageID) error

	// Redeliver all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
	// active for the given topic. In Shared mode, the messages to be redelivered are distributed across all
	// the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection
	// breaks, the messages are redelivered after reconnect.
	RedeliverUnackedMessages()

	Schema() Schema
}
An interface that abstracts the behavior of Pulsar's consumer
type ConsumerMessage ¶
Pair of a Consumer and Message
type ConsumerOptions ¶
type ConsumerOptions struct {
	// Specify the topic this consumer will subscribe to.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topic string

	// Specify a list of topics this consumer will subscribe to.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topics []string

	// Specify a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	TopicsPattern string

	// Specify the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Attach a set of application-defined properties to the consumer
	// These properties will be visible in the topic stats
	Properties map[string]string

	// Set the timeout for unacked messages
	// Messages not acknowledged within the given time will be replayed by the broker to the same or a different consumer
	// Default is 0, which means messages are not replayed based on ack time
	AckTimeout time.Duration

	// The delay after which to redeliver the messages that failed to be
	// processed. Default is 1 minute. (See `Consumer.Nack()`)
	NackRedeliveryDelay *time.Duration

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// InitialPosition at which the cursor will be set when subscribing
	// Default is `Latest`
	SubscriptionInitPos InitialPosition

	// Sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.Receive()`. Using a higher value could potentially increase consumer
	// throughput at the expense of higher memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	// Set to -1 to disable prefetching in the consumer
	ReceiverQueueSize int

	// Set the max total receiver queue size across partitions.
	// This setting will be used to reduce the receiver queue size for individual partitions
	// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
	MaxTotalReceiverQueueSizeAcrossPartitions int

	// Set the consumer name.
	Name string

	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (i.e.
	// failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on
	// shared subscriptions will cause the subscription call to fail.
	ReadCompacted bool

	Schema
}
ConsumerOptions is used to configure and create instances of Consumer.
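A sketch of subscribing and consuming with a shared subscription, assuming the package is imported as `pulsar` and a `client` created as shown under `NewClient` (other imports: `context`, `log`; the `process` helper is a placeholder):

	func consume(client pulsar.Client) error {
		consumer, err := client.Subscribe(pulsar.ConsumerOptions{
			Topic:            "my-topic",
			SubscriptionName: "my-subscription",
			Type:             pulsar.Shared,
		})
		if err != nil {
			return err
		}
		defer consumer.Close()

		for {
			msg, err := consumer.Receive(context.Background())
			if err != nil {
				return err
			}

			if err := process(msg); err != nil {
				// Negative ack: the message will be redelivered after NackRedeliveryDelay.
				consumer.Nack(msg)
				continue
			}

			// Acknowledge successful processing so the message is not redelivered.
			if err := consumer.Ack(msg); err != nil {
				return err
			}
		}
	}

	// process is a placeholder for application logic.
	func process(msg pulsar.Message) error {
		log.Printf("received key=%q payload=%q", msg.Key(), string(msg.Payload()))
		return nil
	}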
type DoubleSchema ¶
type DoubleSchema struct {
SchemaInfo
}
func NewDoubleSchema ¶
func NewDoubleSchema(properties map[string]string) *DoubleSchema
func (*DoubleSchema) Decode ¶
func (ds *DoubleSchema) Decode(data []byte, v interface{}) error
func (*DoubleSchema) Encode ¶
func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error)
func (*DoubleSchema) GetSchemaInfo ¶
func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo
func (*DoubleSchema) Validate ¶
func (ds *DoubleSchema) Validate(message []byte) error
type FloatSchema ¶
type FloatSchema struct {
SchemaInfo
}
func NewFloatSchema ¶
func NewFloatSchema(properties map[string]string) *FloatSchema
func (*FloatSchema) Decode ¶
func (fs *FloatSchema) Decode(data []byte, v interface{}) error
func (*FloatSchema) Encode ¶
func (fs *FloatSchema) Encode(value interface{}) ([]byte, error)
func (*FloatSchema) GetSchemaInfo ¶
func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo
func (*FloatSchema) Validate ¶
func (fs *FloatSchema) Validate(message []byte) error
type HashingScheme ¶
type HashingScheme int
const (
	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
	Murmur3_32Hash                      // Use Murmur3 hashing function
	BoostHash                           // C++ based boost::hash
)
type InitialPosition ¶
type InitialPosition int
const (
	// Latest position which means the start consuming position will be the last message
	Latest InitialPosition = iota

	// Earliest position which means the start consuming position will be the first message
	Earliest
)
type Int16Schema ¶
type Int16Schema struct {
SchemaInfo
}
func NewInt16Schema ¶
func NewInt16Schema(properties map[string]string) *Int16Schema
func (*Int16Schema) Decode ¶
func (is16 *Int16Schema) Decode(data []byte, v interface{}) error
func (*Int16Schema) Encode ¶
func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error)
func (*Int16Schema) GetSchemaInfo ¶
func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo
func (*Int16Schema) Validate ¶
func (is16 *Int16Schema) Validate(message []byte) error
type Int32Schema ¶
type Int32Schema struct {
SchemaInfo
}
func NewInt32Schema ¶
func NewInt32Schema(properties map[string]string) *Int32Schema
func (*Int32Schema) Decode ¶
func (is32 *Int32Schema) Decode(data []byte, v interface{}) error
func (*Int32Schema) Encode ¶
func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error)
func (*Int32Schema) GetSchemaInfo ¶
func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo
func (*Int32Schema) Validate ¶
func (is32 *Int32Schema) Validate(message []byte) error
type Int64Schema ¶
type Int64Schema struct {
SchemaInfo
}
func NewInt64Schema ¶
func NewInt64Schema(properties map[string]string) *Int64Schema
func (*Int64Schema) Decode ¶
func (is64 *Int64Schema) Decode(data []byte, v interface{}) error
func (*Int64Schema) Encode ¶
func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error)
func (*Int64Schema) GetSchemaInfo ¶
func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo
func (*Int64Schema) Validate ¶
func (is64 *Int64Schema) Validate(message []byte) error
type Int8Schema ¶
type Int8Schema struct {
SchemaInfo
}
func NewInt8Schema ¶
func NewInt8Schema(properties map[string]string) *Int8Schema
func (*Int8Schema) Decode ¶
func (is8 *Int8Schema) Decode(data []byte, v interface{}) error
func (*Int8Schema) Encode ¶
func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error)
func (*Int8Schema) GetSchemaInfo ¶
func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo
func (*Int8Schema) Validate ¶
func (is8 *Int8Schema) Validate(message []byte) error
type JsonSchema ¶
type JsonSchema struct {
	AvroCodec
	SchemaInfo
}
func NewJsonSchema ¶
func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) *JsonSchema
func (*JsonSchema) Decode ¶
func (js *JsonSchema) Decode(data []byte, v interface{}) error
func (*JsonSchema) Encode ¶
func (js *JsonSchema) Encode(data interface{}) ([]byte, error)
func (*JsonSchema) GetSchemaInfo ¶
func (js *JsonSchema) GetSchemaInfo() *SchemaInfo
func (*JsonSchema) Validate ¶
func (js *JsonSchema) Validate(message []byte) error
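A sketch of producing and consuming typed messages with a JSON schema. The Avro-style definition and record type are illustrative; it assumes the package is imported as `pulsar`, a `client` created as under `NewClient`, and that `nil` properties are acceptable (other import: `context`):

	const exampleJSONSchemaDef = `{"type":"record","name":"Example","namespace":"test",
		"fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}`

	type examplePayload struct {
		ID   int
		Name string
	}

	func jsonSchemaRoundTrip(client pulsar.Client) error {
		jsonSchema := pulsar.NewJsonSchema(exampleJSONSchemaDef, nil)

		producer, err := client.CreateProducerWithSchema(pulsar.ProducerOptions{Topic: "topic-json"}, jsonSchema)
		if err != nil {
			return err
		}
		defer producer.Close()

		// Schema-based publishing uses Value instead of Payload.
		if err := producer.Send(context.Background(), pulsar.ProducerMessage{
			Value: examplePayload{ID: 1, Name: "pulsar"},
		}); err != nil {
			return err
		}

		consumer, err := client.SubscribeWithSchema(pulsar.ConsumerOptions{
			Topic:            "topic-json",
			SubscriptionName: "sub-json",
		}, jsonSchema)
		if err != nil {
			return err
		}
		defer consumer.Close()

		msg, err := consumer.Receive(context.Background())
		if err != nil {
			return err
		}

		// Deserialize the value according to the configured schema.
		var decoded examplePayload
		if err := msg.GetValue(&decoded); err != nil {
			return err
		}
		return consumer.Ack(msg)
	}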
type Message ¶
type Message interface {
	// Get the topic from which this message originated
	Topic() string

	// Return the properties attached to the message.
	// Properties are application-defined key/value pairs that will be attached to the message
	Properties() map[string]string

	// Get the payload of the message
	Payload() []byte

	// Get the unique message ID associated with this message.
	// The message id can be used to univocally refer to a message without having to keep the entire payload in memory.
	ID() MessageID

	// Get the publish time of this message. The publish time is the timestamp at which the client published the message.
	PublishTime() time.Time

	// Get the event time associated with this message. It is typically set by the application via
	// `ProducerMessage.EventTime`.
	// If there isn't any event time associated with this message, it will be nil.
	EventTime() *time.Time

	// Get the key of the message, if any
	Key() string

	// Get the de-serialized value of the message, according to the configured schema
	GetValue(v interface{}) error
}
type MessageID ¶
type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte
}
Identifier for a particular message
func DeserializeMessageID ¶
Reconstruct a MessageID object from its serialized representation
type MessageRoutingMode ¶
type MessageRoutingMode int
const (
	// Publish messages across all partitions in round-robin.
	RoundRobinDistribution MessageRoutingMode = iota

	// The producer will choose one single partition and publish all its messages into that partition
	UseSinglePartition

	// Use a custom message router implementation that will be called to determine the partition for a particular message.
	CustomPartition
)
type Producer ¶
type Producer interface {
	// Return the topic to which this producer is publishing
	Topic() string

	// Return the producer name, which could have been assigned by the system or specified by the client
	Name() string

	// Send a message
	// This call will block until the message is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
	Send(context.Context, ProducerMessage) error

	// Send a message in asynchronous mode
	// The callback will report back the message being published and
	// the eventual error in publishing
	SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))

	// Get the last sequence id that was published by this producer.
	// This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the last message that was
	// published in the previous producer session, or -1 if no message was ever published.
	LastSequenceID() int64

	// Flush all the messages buffered in the client and wait until all messages have been successfully
	// persisted.
	Flush() error

	// Close the producer and release the resources allocated to it.
	// No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
	// of errors, pending writes will not be retried.
	Close() error

	Schema() Schema
}
The producer is used to publish messages on a topic
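A sketch of synchronous and asynchronous publishing, assuming the package is imported as `pulsar` and a `client` created as shown under `NewClient` (other imports: `context`, `log`):

	func publish(client pulsar.Client) error {
		producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
		if err != nil {
			return err
		}
		defer producer.Close()

		ctx := context.Background()

		// Blocking send: returns once the broker has acknowledged the message.
		if err := producer.Send(ctx, pulsar.ProducerMessage{Payload: []byte("hello")}); err != nil {
			return err
		}

		// Asynchronous send: the callback reports the published message and any error.
		producer.SendAsync(ctx, pulsar.ProducerMessage{Payload: []byte("hello-async")},
			func(msg pulsar.ProducerMessage, err error) {
				if err != nil {
					log.Printf("publish failed: %v", err)
				}
			})

		// Wait for buffered messages to be persisted before returning.
		return producer.Flush()
	}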
type ProducerMessage ¶
type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Value of the message for schema-based publishing; Value and Payload are mutually exclusive
	Value interface{}

	// Sets the key of the message for routing policy
	Key string

	// Attach application-defined properties to the message
	Properties map[string]string

	// Set the event time for a given message
	EventTime time.Time

	// Override the replication clusters for this message.
	ReplicationClusters []string

	// Set the sequence id to assign to the current message
	SequenceID int64
}
type ProducerOptions ¶
type ProducerOptions struct {
	// Specify the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Specify a name for the producer
	// If not assigned, the system will generate a globally unique name which can be accessed with
	// Producer.Name().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
	// publishing on a topic.
	Name string

	// Attach a set of application-defined properties to the producer
	// These properties will be visible in the topic stats
	Properties map[string]string

	// Set the send timeout (default: 30 seconds)
	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
	// Setting the timeout to -1 will set the timeout to infinity, which can be useful when using Pulsar's message
	// deduplication feature.
	SendTimeout time.Duration

	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
	// When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
	// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull to change the blocking behavior.
	MaxPendingMessages int

	// Set the number of max pending messages across all the partitions
	// This setting will be used to lower the max pending messages for each partition
	// `MaxPendingMessages(int)`, if the total exceeds the configured value.
	MaxPendingMessagesAcrossPartitions int

	// Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
	// `ProducerQueueIsFullError` when there is no space left in the pending queue.
	BlockIfQueueFull bool

	// Set the message routing mode for the partitioned producer.
	// Default routing mode is round-robin routing.
	//
	// This logic is applied when the application is not setting a key (ProducerMessage.Key) on a
	// particular message.
	MessageRoutingMode

	// Change the `HashingScheme` used to choose the partition on which to publish a particular message.
	// Standard hashing functions available are:
	//
	//  - `JavaStringHash` : Java String.hashCode() equivalent
	//  - `Murmur3_32Hash` : Use Murmur3 hashing function. https://en.wikipedia.org/wiki/MurmurHash
	//  - `BoostHash`      : C++ based boost::hash
	//
	// Default is `JavaStringHash`.
	HashingScheme

	// Set the compression type for the producer.
	// By default, message payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLIB
	//  - ZSTD
	//  - SNAPPY
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	//
	// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with SNAPPY.
	CompressionType

	// Set a custom message routing policy by passing an implementation of MessageRouter
	// The router is a function that, given a particular message and the topic metadata, returns the
	// partition index to which the message should be routed
	MessageRouter func(Message, TopicMetadata) int

	// Control whether automatic batching of messages is enabled for the producer.
	// Default: false [No batching]
	//
	// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
	// contents.
	//
	// When enabled, the default batch delay is 10 ms and the default batch size is 1000 messages
	Batching bool

	// Set the time period within which the messages sent will be batched (default: 10ms) if batching is
	// enabled. If set to a non-zero value, messages will be queued until this time interval has elapsed or until
	// the batch fills up (see BatchingMaxMessages)
	BatchingMaxPublishDelay time.Duration

	// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
	// messages will be queued until this threshold is reached or the batch interval has elapsed
	BatchingMaxMessages uint
}
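A tuning sketch combining batching, compression and hashing options; the values mirror the defaults mentioned in the comments above and are illustrative only. It assumes the package is imported as `pulsar` and a `client` created as under `NewClient` (other import: `time`):

	func newTunedProducer(client pulsar.Client) (pulsar.Producer, error) {
		return client.CreateProducer(pulsar.ProducerOptions{
			Topic:                   "my-topic",
			SendTimeout:             30 * time.Second,
			Batching:                true,
			BatchingMaxPublishDelay: 10 * time.Millisecond,
			BatchingMaxMessages:     1000,
			BlockIfQueueFull:        true,
			// CompressionType and HashingScheme are embedded fields, so they are set by type name.
			CompressionType: pulsar.LZ4,
			HashingScheme:   pulsar.Murmur3_32Hash,
		})
	}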
type ProtoSchema ¶
type ProtoSchema struct {
	AvroCodec
	SchemaInfo
}
func NewProtoSchema ¶
func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema
func (*ProtoSchema) Decode ¶
func (ps *ProtoSchema) Decode(data []byte, v interface{}) error
func (*ProtoSchema) Encode ¶
func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error)
func (*ProtoSchema) GetSchemaInfo ¶
func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo
func (*ProtoSchema) Validate ¶
func (ps *ProtoSchema) Validate(message []byte) error
type Reader ¶
type Reader interface {
	// The topic from which this reader is reading
	Topic() string

	// Read the next message in the topic, blocking until a message is available
	Next(context.Context) (Message, error)

	// Check if there is any message available to read from the current position
	HasNext() (bool, error)

	// Close the reader and stop the broker from pushing more messages
	Close() error

	Schema() Schema
}
A Reader can be used to scan through all the messages currently available in a topic.
type ReaderMessage ¶
type ReaderOptions ¶
type ReaderOptions struct {
	// Specify the topic this reader will read from.
	// This argument is required when constructing the reader.
	Topic string

	// Set the reader name.
	Name string

	// The initial reader positioning is done by specifying a message id. The options are:
	//  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
	//  * `pulsar.LatestMessage`   : Start reading from the end of the topic, only getting messages published after the
	//                               reader was created
	//  * `MessageID`              : Start reading from a particular message id, the reader will position itself at that
	//                               specific position. The first message to be read will be the message next to the
	//                               specified messageID
	StartMessageID MessageID

	// Sets a `MessageChannel` for the reader
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ReaderMessage

	// Sets the size of the reader receive queue.
	// The receive queue controls how many messages can be accumulated by the Reader before the
	// application calls Reader.Next(). Using a higher value could potentially increase reader
	// throughput at the expense of higher memory utilization.
	//
	// Default value is 1000 messages and should be good for most use cases.
	ReceiverQueueSize int

	// Set the subscription role prefix. The default prefix is "reader".
	SubscriptionRolePrefix string

	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
	// topics will cause the reader creation call to fail.
	ReadCompacted bool
}
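A sketch of scanning a topic from the beginning with a `Reader`, assuming the package is imported as `pulsar` and a `client` created as under `NewClient`; `pulsar.EarliestMessage` is the sentinel referenced in the `StartMessageID` comment above (other imports: `context`, `log`):

	func readAll(client pulsar.Client) error {
		reader, err := client.CreateReader(pulsar.ReaderOptions{
			Topic:          "my-topic",
			StartMessageID: pulsar.EarliestMessage,
		})
		if err != nil {
			return err
		}
		defer reader.Close()

		for {
			hasNext, err := reader.HasNext()
			if err != nil {
				return err
			}
			if !hasNext {
				return nil
			}

			msg, err := reader.Next(context.Background())
			if err != nil {
				return err
			}
			log.Printf("read message published at %v: %s", msg.PublishTime(), msg.Payload())
		}
	}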
type Result ¶
type Result int
const (
	UnknownError                          Result = 1  // Unknown error happened on broker
	InvalidConfiguration                  Result = 2  // Invalid configuration
	TimeoutError                          Result = 3  // Operation timed out
	LookupError                           Result = 4  // Broker lookup failed
	ConnectError                          Result = 5  // Failed to connect to broker
	ReadError                             Result = 6  // Failed to read from socket
	AuthenticationError                   Result = 7  // Authentication failed on broker
	AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
	ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
	BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
	BrokerPersistenceError                Result = 11 // Broker failed to persist entry
	ChecksumError                         Result = 12 // Corrupt message checksum failure
	ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
	NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
	AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
	InvalidMessage                        Result = 16 // Error in publishing an already used message
	ConsumerNotInitialized                Result = 17 // Consumer is not initialized
	ProducerNotInitialized                Result = 18 // Producer is not initialized
	TooManyLookupRequestException         Result = 19 // Too many concurrent lookup requests
	InvalidTopicName                      Result = 20 // Invalid topic name
	InvalidUrl                            Result = 21 // Client initialized with invalid broker URL (VIP URL passed to client constructor)
	ServiceUnitNotReady                   Result = 22 // Service unit was unloaded between the client lookup and the producer/consumer creation
	OperationNotSupported                 Result = 23
	ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
	ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
	ProducerQueueIsFull                   Result = 26 // Producer queue is full
	MessageTooBig                         Result = 27 // Trying to send a message exceeding the max size
	TopicNotFound                         Result = 28 // Topic not found
	SubscriptionNotFound                  Result = 29 // Subscription not found
	ConsumerNotFound                      Result = 30 // Consumer not found
	UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
	TopicTerminated                       Result = 32 // Topic was already terminated
	CryptoError                           Result = 33 // Error when crypto operation fails
)
type SchemaInfo ¶
type SchemaInfo struct {
	Name       string
	Schema     string
	Type       SchemaType
	Properties map[string]string
}
Encapsulates data around the schema definition
type SchemaType ¶
type SchemaType int
const (
	NONE         SchemaType = iota // No schema defined
	STRING                         // Simple String encoding with UTF-8
	JSON                           // JSON object encoding and validation
	PROTOBUF                       // Protobuf message encoding and decoding
	AVRO                           // Serialize and deserialize via Avro
	BOOLEAN
	INT8                           // An 8-bit integer
	INT16                          // A 16-bit integer
	INT32                          // A 32-bit integer
	INT64                          // A 64-bit integer
	FLOAT                          // A float number
	DOUBLE                         // A double number
	KEY_VALUE                      // A Schema that contains Key Schema and Value Schema
	BYTES        = -1              // A bytes array
	AUTO         = -2
	AUTO_CONSUME = -3              // Auto Consume Type
	AUTO_PUBLISH = -4              // Auto Publish Type
)
type StringSchema ¶
type StringSchema struct {
SchemaInfo
}
func NewStringSchema ¶
func NewStringSchema(properties map[string]string) *StringSchema
func (*StringSchema) Decode ¶
func (ss *StringSchema) Decode(data []byte, v interface{}) error
func (*StringSchema) Encode ¶
func (ss *StringSchema) Encode(v interface{}) ([]byte, error)
func (*StringSchema) GetSchemaInfo ¶
func (ss *StringSchema) GetSchemaInfo() *SchemaInfo
func (*StringSchema) Validate ¶
func (ss *StringSchema) Validate(message []byte) error
type SubscriptionType ¶
type SubscriptionType int
Types of subscription supported by Pulsar
const (
	// There can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Multiple consumers will be able to use the same subscription name, and messages will be dispatched in
	// a round-robin rotation between the connected consumers
	Shared

	// Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// Multiple consumers will be able to use the same subscription name, and messages with the same key
	// will be dispatched to only one consumer
	KeyShared
)
type TopicMetadata ¶
type TopicMetadata interface {
	// Get the number of partitions for the specific topic
	NumPartitions() int
}
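TopicMetadata is what a custom `MessageRouter` (see `ProducerOptions`) receives to decide the target partition. A sketch that routes by a hash of the message key, assuming the package is imported as `pulsar` and a `client` created as under `NewClient` (other import: `hash/fnv`):

	func newKeyHashProducer(client pulsar.Client) (pulsar.Producer, error) {
		return client.CreateProducer(pulsar.ProducerOptions{
			Topic:              "my-partitioned-topic",
			MessageRoutingMode: pulsar.CustomPartition,
			MessageRouter: func(msg pulsar.Message, meta pulsar.TopicMetadata) int {
				key := msg.Key()
				if key == "" {
					return 0 // no key: fall back to the first partition
				}
				// FNV-1a hash of the key, reduced modulo the partition count.
				h := fnv.New32a()
				h.Write([]byte(key))
				return int(h.Sum32() % uint32(meta.NumPartitions()))
			},
		})
	}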