Versions in this module Expand all Collapse all v0 v0.13.2 Jun 26, 2024 v0.13.1 Jun 26, 2024 v0.13.0 Jun 1, 2024 Changes in this version + const DlqTopicSuffix + const IoMaxSize + const MaxReconsumeTimes + const PropertyOriginMessageID + const RetryTopicSuffix + const SysPropertyDelayTime + const SysPropertyOriginMessageID + const SysPropertyRealTopic + const SysPropertyReconsumeTimes + const SysPropertyRetryTopic + const TransactionCoordinatorAssign + var ErrContextExpired = newError(TimeoutError, "message send context expired") + var ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed") + var ErrInvalidAck = errors.New("invalid ack") + var ErrInvalidMessage = newError(InvalidMessage, "invalid message") + var ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full") + var ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + var ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize") + var ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked") + var ErrProducerClosed = newError(ProducerClosed, "producer already been closed") + var ErrProducerFenced = newError(ProducerFenced, "producer fenced") + var ErrSchema = newError(SchemaFailure, "schema error") + var ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") + var ErrSendTimeout = newError(TimeoutError, "message send timeout") + var ErrTopicNotfound = newError(TopicNotFound, "topic not found") + var ErrTopicTerminated = newError(TopicTerminated, "topic terminated") + var ErrTransaction = errors.New("transaction error") + func GetBatcherBuilderProvider(typ BatcherBuilderType) (internal.BatcherBuilderProvider, error) + func NewDefaultRouter(hashFunc func(string) uint32, maxBatchingMessages uint, maxBatchingSize uint, ...) func(*ProducerMessage, uint32) int + func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int + func ReadElements(r io.Reader, elements ...interface{}) error + func WriteElements(w io.Writer, elements ...interface{}) error + type AckGroupingOptions struct + MaxSize uint32 + MaxTime time.Duration + type Authentication interface + func NewAuthentication(name string, params string) (Authentication, error) + func NewAuthenticationAthenz(authParams map[string]string) Authentication + func NewAuthenticationBasic(username, password string) (Authentication, error) + func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication + func NewAuthenticationOAuth2(authParams map[string]string) Authentication + func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication + func NewAuthenticationToken(token string) Authentication + func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication + func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication + type AvroCodec struct + Codec *goavro.Codec + func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec + type AvroSchema struct + func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema + func NewAvroSchemaWithValidation(avroSchemaDef string, properties map[string]string) (*AvroSchema, error) + func (as *AvroSchema) Decode(data []byte, v interface{}) error + func (as *AvroSchema) Encode(data interface{}) ([]byte, error) + func (as *AvroSchema) GetSchemaInfo() *SchemaInfo + func (as *AvroSchema) Validate(message []byte) error + type BatcherBuilderType int + const DefaultBatchBuilder + const KeyBasedBatchBuilder + type BinaryFreeList chan []byte + var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize) + func (b BinaryFreeList) Borrow() (buf []byte) + func (b BinaryFreeList) Float32(buf []byte) (float32, error) + func (b BinaryFreeList) Float64(buf []byte) (float64, error) + func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) + func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) + func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error + func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error + func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error + func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error + func (b BinaryFreeList) Return(buf []byte) + func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error) + func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error) + func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error) + func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) + type BytesSchema struct + func NewBytesSchema(properties map[string]string) *BytesSchema + func (bs *BytesSchema) Decode(data []byte, v interface{}) error + func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) + func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo + func (bs *BytesSchema) Validate(message []byte) error + type Client interface + Close func() + CreateProducer func(ProducerOptions) (Producer, error) + CreateReader func(ReaderOptions) (Reader, error) + CreateTableView func(TableViewOptions) (TableView, error) + NewTransaction func(duration time.Duration) (Transaction, error) + Subscribe func(ConsumerOptions) (Consumer, error) + TopicPartitions func(topic string) ([]string, error) + func NewClient(options ClientOptions) (Client, error) + type ClientOptions struct + ConnectionMaxIdleTime time.Duration + ConnectionTimeout time.Duration + CustomMetricsLabels map[string]string + EnableTransaction bool + KeepAliveInterval time.Duration + ListenerName string + Logger log.Logger + MaxConnectionsPerBroker int + MemoryLimitBytes int64 + MetricsCardinality MetricsCardinality + MetricsRegisterer prometheus.Registerer + OperationTimeout time.Duration + TLSAllowInsecureConnection bool + TLSCertificateFile string + TLSCipherSuites []uint16 + TLSKeyFilePath string + TLSMaxVersion uint16 + TLSMinVersion uint16 + TLSTrustCertsFilePath string + TLSValidateHostname bool + URL string + type CompressionLevel int + const Better + const Default + const Faster + type CompressionType int + const LZ4 + const NoCompression + const ZLib + const ZSTD + type Consumer interface + Ack func(Message) error + AckCumulative func(msg Message) error + AckID func(MessageID) error + AckIDCumulative func(msgID MessageID) error + AckWithTxn func(Message, Transaction) error + Chan func() <-chan ConsumerMessage + Close func() + GetLastMessageIDs func() ([]TopicMessageID, error) + Nack func(Message) + NackID func(MessageID) + Name func() string + Receive func(context.Context) (Message, error) + ReconsumeLater func(msg Message, delay time.Duration) + ReconsumeLaterWithCustomProperties func(msg Message, customProperties map[string]string, delay time.Duration) + Seek func(MessageID) error + SeekByTime func(time time.Time) error + Subscription func() string + Unsubscribe func() error + UnsubscribeForce func() error + type ConsumerEventListener interface + BecameActive func(consumer Consumer, topicName string, partition int32) + BecameInactive func(consumer Consumer, topicName string, partition int32) + type ConsumerInterceptor interface + BeforeConsume func(message ConsumerMessage) + OnAcknowledge func(consumer Consumer, msgID MessageID) + OnNegativeAcksSend func(consumer Consumer, msgIDs []MessageID) + type ConsumerInterceptors []ConsumerInterceptor + func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) + func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) + func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) + type ConsumerMessage struct + type ConsumerOptions struct + AckGroupingOptions *AckGroupingOptions + AckWithResponse bool + AutoAckIncompleteChunk bool + AutoDiscoveryPeriod time.Duration + BackoffPolicy internal.BackoffPolicy + DLQ *DLQPolicy + Decryption *MessageDecryptionInfo + EnableAutoScaledReceiverQueueSize bool + EnableBatchIndexAcknowledgment bool + EnableDefaultNackBackoffPolicy bool + EventListener ConsumerEventListener + ExpireTimeOfIncompleteChunk time.Duration + Interceptors ConsumerInterceptors + KeySharedPolicy *KeySharedPolicy + MaxPendingChunkedMessage int + MaxReconnectToBroker *uint + MessageChannel chan ConsumerMessage + NackBackoffPolicy NackBackoffPolicy + NackRedeliveryDelay time.Duration + Name string + Properties map[string]string + ReadCompacted bool + ReceiverQueueSize int + ReplicateSubscriptionState bool + RetryEnable bool + Schema Schema + StartMessageIDInclusive bool + SubscriptionMode SubscriptionMode + SubscriptionName string + SubscriptionProperties map[string]string + Topic string + Topics []string + TopicsPattern string + Type SubscriptionType + type DLQPolicy struct + DeadLetterTopic string + MaxDeliveries uint32 + ProducerOptions ProducerOptions + RetryLetterTopic string + type DoubleSchema struct + func NewDoubleSchema(properties map[string]string) *DoubleSchema + func (ds *DoubleSchema) Decode(data []byte, v interface{}) error + func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) + func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo + func (ds *DoubleSchema) Validate(message []byte) error + type EncryptionContext struct + Algorithm string + BatchSize int + CompressionType CompressionType + Keys map[string]EncryptionKey + Param []byte + UncompressedSize int + type EncryptionKey struct + KeyValue []byte + Metadata map[string]string + type Error struct + func (e *Error) Error() string + func (e *Error) Result() Result + type FloatSchema struct + func NewFloatSchema(properties map[string]string) *FloatSchema + func (fs *FloatSchema) Decode(data []byte, v interface{}) error + func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) + func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo + func (fs *FloatSchema) Validate(message []byte) error + type HashingScheme int + const JavaStringHash + const Murmur3_32Hash + type Int16Schema struct + func NewInt16Schema(properties map[string]string) *Int16Schema + func (is16 *Int16Schema) Decode(data []byte, v interface{}) error + func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) + func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo + func (is16 *Int16Schema) Validate(message []byte) error + type Int32Schema struct + func NewInt32Schema(properties map[string]string) *Int32Schema + func (is32 *Int32Schema) Decode(data []byte, v interface{}) error + func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) + func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo + func (is32 *Int32Schema) Validate(message []byte) error + type Int64Schema struct + func NewInt64Schema(properties map[string]string) *Int64Schema + func (is64 *Int64Schema) Decode(data []byte, v interface{}) error + func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) + func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo + func (is64 *Int64Schema) Validate(message []byte) error + type Int8Schema struct + func NewInt8Schema(properties map[string]string) *Int8Schema + func (is8 *Int8Schema) Decode(data []byte, v interface{}) error + func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) + func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo + func (is8 *Int8Schema) Validate(message []byte) error + type JSONSchema struct + func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema + func NewJSONSchemaWithValidation(jsonAvroSchemaDef string, properties map[string]string) (*JSONSchema, error) + func (js *JSONSchema) Decode(data []byte, v interface{}) error + func (js *JSONSchema) Encode(data interface{}) ([]byte, error) + func (js *JSONSchema) GetSchemaInfo() *SchemaInfo + func (js *JSONSchema) Validate(message []byte) error + type KeySharedPolicy struct + AllowOutOfOrderDelivery bool + HashRanges []int + Mode KeySharedPolicyMode + func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error) + type KeySharedPolicyMode int + const KeySharedPolicyModeAutoSplit + const KeySharedPolicyModeSticky + type Message interface + BrokerPublishTime func() *time.Time + EventTime func() time.Time + GetEncryptionContext func() *EncryptionContext + GetReplicatedFrom func() string + GetSchemaValue func(v interface{}) error + ID func() MessageID + Index func() *uint64 + IsReplicated func() bool + Key func() string + OrderingKey func() string + Payload func() []byte + ProducerName func() string + Properties func() map[string]string + PublishTime func() time.Time + RedeliveryCount func() uint32 + SchemaVersion func() []byte + Topic func() string + type MessageDecryptionInfo struct + ConsumerCryptoFailureAction int + KeyReader crypto.KeyReader + MessageCrypto crypto.MessageCrypto + type MessageID interface + BatchIdx func() int32 + BatchSize func() int32 + EntryID func() int64 + LedgerID func() int64 + PartitionIdx func() int32 + Serialize func() []byte + String func() string + func DeserializeMessageID(data []byte) (MessageID, error) + func EarliestMessageID() MessageID + func LatestMessageID() MessageID + func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID + type MetricsCardinality int + const MetricsCardinalityNamespace + const MetricsCardinalityNone + const MetricsCardinalityTenant + const MetricsCardinalityTopic + type NackBackoffPolicy interface + Next func(redeliveryCount uint32) time.Duration + type Producer interface + Close func() + Flush func() error + FlushWithCtx func(ctx context.Context) error + LastSequenceID func() int64 + Name func() string + Send func(context.Context, *ProducerMessage) (MessageID, error) + SendAsync func(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) + Topic func() string + type ProducerAccessMode int + const ProducerAccessModeExclusive + const ProducerAccessModeShared + const ProducerAccessModeWaitForExclusive + type ProducerEncryptionInfo struct + KeyReader crypto.KeyReader + Keys []string + MessageCrypto crypto.MessageCrypto + ProducerCryptoFailureAction int + type ProducerInterceptor interface + BeforeSend func(producer Producer, message *ProducerMessage) + OnSendAcknowledgement func(producer Producer, message *ProducerMessage, msgID MessageID) + type ProducerInterceptors []ProducerInterceptor + func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) + func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + type ProducerMessage struct + DeliverAfter time.Duration + DeliverAt time.Time + DisableReplication bool + EventTime time.Time + Key string + OrderingKey string + Payload []byte + Properties map[string]string + ReplicationClusters []string + Schema Schema + SequenceID *int64 + Transaction Transaction + Value interface{} + type ProducerOptions struct + BackoffPolicy internal.BackoffPolicy + BatchingMaxMessages uint + BatchingMaxPublishDelay time.Duration + BatchingMaxSize uint + ChunkMaxMessageSize uint + DisableBatching bool + DisableBlockIfQueueFull bool + DisableMultiSchema bool + EnableChunking bool + Encryption *ProducerEncryptionInfo + Interceptors ProducerInterceptors + MaxPendingMessages int + MaxReconnectToBroker *uint + MessageRouter func(*ProducerMessage, TopicMetadata) int + Name string + PartitionsAutoDiscoveryInterval time.Duration + Properties map[string]string + Schema Schema + SendTimeout time.Duration + Topic string + type ProtoNativeSchema struct + func NewProtoNativeSchemaWithMessage(message proto.Message, properties map[string]string) *ProtoNativeSchema + func (ps *ProtoNativeSchema) Decode(data []byte, v interface{}) error + func (ps *ProtoNativeSchema) Encode(data interface{}) ([]byte, error) + func (ps *ProtoNativeSchema) GetSchemaInfo() *SchemaInfo + func (ps *ProtoNativeSchema) Validate(message []byte) error + type ProtoNativeSchemaData struct + FileDescriptorSet []byte + RootFileDescriptorName string + RootMessageTypeName string + type ProtoSchema struct + func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema + func NewProtoSchemaWithValidation(protoAvroSchemaDef string, properties map[string]string) (*ProtoSchema, error) + func (ps *ProtoSchema) Decode(data []byte, v interface{}) error + func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) + func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo + func (ps *ProtoSchema) Validate(message []byte) error + type Reader interface + Close func() + GetLastMessageID func() (MessageID, error) + HasNext func() bool + Next func(context.Context) (Message, error) + Seek func(MessageID) error + SeekByTime func(time time.Time) error + Topic func() string + type ReaderMessage struct + type ReaderOptions struct + AutoAckIncompleteChunk bool + BackoffPolicy internal.BackoffPolicy + Decryption *MessageDecryptionInfo + ExpireTimeOfIncompleteChunk time.Duration + MaxPendingChunkedMessage int + MessageChannel chan ReaderMessage + Name string + Properties map[string]string + ReadCompacted bool + ReceiverQueueSize int + Schema Schema + StartMessageID MessageID + StartMessageIDInclusive bool + SubscriptionName string + SubscriptionRolePrefix string + Topic string + type Result int + const AddToBatchFailed + const AlreadyClosedError + const AuthenticationError + const AuthorizationError + const BrokerMetadataError + const BrokerPersistenceError + const ChecksumError + const ClientMemoryBufferIsFull + const ConnectError + const ConsumerBusy + const ConsumerClosed + const ConsumerNotFound + const ConsumerNotInitialized + const CryptoError + const ErrorGettingAuthenticationData + const InvalidBatchBuilderType + const InvalidConfiguration + const InvalidMessage + const InvalidStatus + const InvalidTopicName + const InvalidURL + const LookupError + const MessageTooBig + const NotConnectedError + const Ok + const OperationNotSupported + const ProducerBlockedQuotaExceededError + const ProducerBlockedQuotaExceededException + const ProducerClosed + const ProducerFenced + const ProducerNotInitialized + const ProducerQueueIsFull + const ReadError + const SchemaFailure + const SeekFailed + const ServiceUnitNotReady + const SubscriptionNotFound + const TimeoutError + const TooManyLookupRequestException + const TopicNotFound + const TopicTerminated + const TransactionNoFoundError + const UnknownError + const UnsupportedVersionError + type RetryMessage struct + type Schema interface + Decode func(data []byte, v interface{}) error + Encode func(v interface{}) ([]byte, error) + GetSchemaInfo func() *SchemaInfo + Validate func(message []byte) error + func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]string) (schema Schema, err error) + type SchemaInfo struct + Name string + Properties map[string]string + Schema string + Type SchemaType + type SchemaType int + const AUTO + const AVRO + const AutoConsume + const AutoPublish + const BOOLEAN + const BYTES + const DOUBLE + const FLOAT + const INT16 + const INT32 + const INT64 + const INT8 + const JSON + const KeyValue + const NONE + const PROTOBUF + const ProtoNative + const STRING + type StringSchema struct + func NewStringSchema(properties map[string]string) *StringSchema + func (ss *StringSchema) Decode(data []byte, v interface{}) error + func (ss *StringSchema) Encode(v interface{}) ([]byte, error) + func (ss *StringSchema) GetSchemaInfo() *SchemaInfo + func (ss *StringSchema) Validate(message []byte) error + type SubscriptionInitialPosition int + const SubscriptionPositionEarliest + const SubscriptionPositionLatest + type SubscriptionMode int + const Durable + const NonDurable + type SubscriptionType int + const Exclusive + const Failover + const KeyShared + const Shared + type TableView interface + Close func() + ContainsKey func(key string) bool + Entries func() map[string]interface{} + ForEach func(func(string, interface{}) error) error + ForEachAndListen func(func(string, interface{}) error) error + Get func(key string) interface{} + IsEmpty func() bool + Keys func() []string + Size func() int + type TableViewImpl struct + func (tv *TableViewImpl) Close() + func (tv *TableViewImpl) ContainsKey(key string) bool + func (tv *TableViewImpl) Entries() map[string]interface{} + func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error + func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error + func (tv *TableViewImpl) Get(key string) interface{} + func (tv *TableViewImpl) IsEmpty() bool + func (tv *TableViewImpl) Keys() []string + func (tv *TableViewImpl) Size() int + type TableViewOptions struct + AutoUpdatePartitionsInterval time.Duration + Logger log.Logger + Schema Schema + SchemaValueType reflect.Type + Topic string + type TopicMessageID interface + Topic func() string + type TopicMetadata interface + NumPartitions func() uint32 + type Transaction interface + Abort func(context.Context) error + Commit func(context.Context) error + GetState func() TxnState + GetTxnID func() TxnID + type TxnID struct + LeastSigBits uint64 + MostSigBits uint64 + type TxnState int32 + const TxnAborted + const TxnAborting + const TxnCommitted + const TxnCommitting + const TxnError + const TxnOpen + const TxnTimeout