Versions in this module Expand all Collapse all v0 v0.0.4 May 19, 2021 v0.0.3 May 17, 2021 v0.0.2 May 17, 2021 Changes in this version + const DlqTopicSuffix + const IoMaxSize + const MaxReconsumeTimes + const RetryTopicSuffix + const SysPropertyDelayTime + const SysPropertyOriginMessageID + const SysPropertyRealTopic + const SysPropertyReconsumeTimes + const SysPropertyRetryTopic + func GetBatcherBuilderProvider(typ BatcherBuilderType) (internal.BatcherBuilderProvider, error) + func NewDefaultRouter(hashFunc func(string) uint32, maxBatchingMessages uint, maxBatchingSize uint, ...) func(*ProducerMessage, uint32) int + func ReadElements(r io.Reader, elements ...interface{}) error + func WriteElements(w io.Writer, elements ...interface{}) error + type Authentication interface + func NewAuthentication(name string, params string) (Authentication, error) + func NewAuthenticationAthenz(authParams map[string]string) Authentication + func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication + func NewAuthenticationOAuth2(authParams map[string]string) Authentication + func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication + func NewAuthenticationToken(token string) Authentication + func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication + func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication + type AvroCodec struct + Codec *goavro.Codec + func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec + type AvroSchema struct + func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema + func (as *AvroSchema) Decode(data []byte, v interface{}) error + func (as *AvroSchema) Encode(data interface{}) ([]byte, error) + func (as *AvroSchema) GetSchemaInfo() *SchemaInfo + func (as *AvroSchema) Validate(message []byte) error + type BatcherBuilderType int + const DefaultBatchBuilder + const KeyBasedBatchBuilder + type BinaryFreeList chan []byte + var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize) + func (b BinaryFreeList) Borrow() (buf []byte) + func (b BinaryFreeList) Float32(buf []byte) (float32, error) + func (b BinaryFreeList) Float64(buf []byte) (float64, error) + func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) + func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) + func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error + func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error + func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error + func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error + func (b BinaryFreeList) Return(buf []byte) + func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error) + func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error) + func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error) + func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) + type BytesSchema struct + func NewBytesSchema(properties map[string]string) *BytesSchema + func (bs *BytesSchema) Decode(data []byte, v interface{}) error + func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) + func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo + func (bs *BytesSchema) Validate(message []byte) error + type Client interface + Close func() + CreateProducer func(ProducerOptions) (Producer, error) + CreateReader func(ReaderOptions) (Reader, error) + Subscribe func(ConsumerOptions) (Consumer, error) + TopicPartitions func(topic string) ([]string, error) + func NewClient(options ClientOptions) (Client, error) + type ClientOptions struct + ConnectionTimeout time.Duration + CustomMetricsLabels map[string]string + ListenerName string + Logger log.Logger + MaxConnectionsPerBroker int + OperationTimeout time.Duration + TLSAllowInsecureConnection bool + TLSTrustCertsFilePath string + TLSValidateHostname bool + URL string + type CompressionLevel int + const Better + const Default + const Faster + type CompressionType int + const LZ4 + const NoCompression + const ZLib + const ZSTD + type Consumer interface + Ack func(Message) + AckID func(MessageID) + Chan func() <-chan ConsumerMessage + Close func() + Nack func(Message) + NackID func(MessageID) + Name func() string + Receive func(context.Context) (Message, error) + ReconsumeLater func(msg Message, delay time.Duration) + Seek func(MessageID) error + SeekByTime func(time time.Time) error + Subscription func() string + Unsubscribe func() error + type ConsumerInterceptor interface + BeforeConsume func(message ConsumerMessage) + OnAcknowledge func(consumer Consumer, msgID MessageID) + OnNegativeAcksSend func(consumer Consumer, msgIDs []MessageID) + type ConsumerInterceptors []ConsumerInterceptor + func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) + func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) + func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) + type ConsumerMessage struct + type ConsumerOptions struct + AutoDiscoveryPeriod time.Duration + DLQ *DLQPolicy + Interceptors ConsumerInterceptors + KeySharedPolicy *KeySharedPolicy + MaxReconnectToBroker *uint + MessageChannel chan ConsumerMessage + NackRedeliveryDelay time.Duration + Name string + Properties map[string]string + ReadCompacted bool + ReceiverQueueSize int + ReplicateSubscriptionState bool + RetryEnable bool + Schema Schema + SubscriptionName string + Topic string + Topics []string + TopicsPattern string + Type SubscriptionType + type DLQPolicy struct + DeadLetterTopic string + MaxDeliveries uint32 + RetryLetterTopic string + type DoubleSchema struct + func NewDoubleSchema(properties map[string]string) *DoubleSchema + func (ds *DoubleSchema) Decode(data []byte, v interface{}) error + func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) + func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo + func (ds *DoubleSchema) Validate(message []byte) error + type Error struct + func (e *Error) Error() string + func (e *Error) Result() Result + type FloatSchema struct + func NewFloatSchema(properties map[string]string) *FloatSchema + func (fs *FloatSchema) Decode(data []byte, v interface{}) error + func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) + func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo + func (fs *FloatSchema) Validate(message []byte) error + type HashingScheme int + const JavaStringHash + const Murmur3_32Hash + type Int16Schema struct + func NewInt16Schema(properties map[string]string) *Int16Schema + func (is16 *Int16Schema) Decode(data []byte, v interface{}) error + func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) + func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo + func (is16 *Int16Schema) Validate(message []byte) error + type Int32Schema struct + func NewInt32Schema(properties map[string]string) *Int32Schema + func (is32 *Int32Schema) Decode(data []byte, v interface{}) error + func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) + func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo + func (is32 *Int32Schema) Validate(message []byte) error + type Int64Schema struct + func NewInt64Schema(properties map[string]string) *Int64Schema + func (is64 *Int64Schema) Decode(data []byte, v interface{}) error + func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) + func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo + func (is64 *Int64Schema) Validate(message []byte) error + type Int8Schema struct + func NewInt8Schema(properties map[string]string) *Int8Schema + func (is8 *Int8Schema) Decode(data []byte, v interface{}) error + func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) + func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo + func (is8 *Int8Schema) Validate(message []byte) error + type JSONSchema struct + func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema + func (js *JSONSchema) Decode(data []byte, v interface{}) error + func (js *JSONSchema) Encode(data interface{}) ([]byte, error) + func (js *JSONSchema) GetSchemaInfo() *SchemaInfo + func (js *JSONSchema) Validate(message []byte) error + type KeySharedPolicy struct + AllowOutOfOrderDelivery bool + HashRanges []int + Mode KeySharedPolicyMode + func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error) + type KeySharedPolicyMode int + const KeySharedPolicyModeAutoSplit + const KeySharedPolicyModeSticky + type Message interface + EventTime func() time.Time + GetReplicatedFrom func() string + GetSchemaValue func(v interface{}) error + ID func() MessageID + IsReplicated func() bool + Key func() string + OrderingKey func() string + Payload func() []byte + ProducerName func() string + Properties func() map[string]string + PublishTime func() time.Time + RedeliveryCount func() uint32 + Topic func() string + type MessageID interface + Serialize func() []byte + func DeserializeMessageID(data []byte) (MessageID, error) + func EarliestMessageID() MessageID + func LatestMessageID() MessageID + type Producer interface + Close func() + Flush func() error + LastSequenceID func() int64 + Name func() string + Send func(context.Context, *ProducerMessage) (MessageID, error) + SendAsync func(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) + Topic func() string + type ProducerInterceptor interface + BeforeSend func(producer Producer, message *ProducerMessage) + OnSendAcknowledgement func(producer Producer, message *ProducerMessage, msgID MessageID) + type ProducerInterceptors []ProducerInterceptor + func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) + func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + type ProducerMessage struct + DeliverAfter time.Duration + DeliverAt time.Time + EventTime time.Time + Key string + OrderingKey string + Payload []byte + Properties map[string]string + ReplicationClusters []string + SequenceID *int64 + Value interface{} + type ProducerOptions struct + BatchingMaxMessages uint + BatchingMaxPublishDelay time.Duration + BatchingMaxSize uint + DisableBatching bool + DisableBlockIfQueueFull bool + Interceptors ProducerInterceptors + MaxPendingMessages int + MaxReconnectToBroker *uint + MessageRouter func(*ProducerMessage, TopicMetadata) int + Name string + Properties map[string]string + Schema Schema + SendTimeout time.Duration + Topic string + type ProtoSchema struct + func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema + func (ps *ProtoSchema) Decode(data []byte, v interface{}) error + func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) + func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo + func (ps *ProtoSchema) Validate(message []byte) error + type Reader interface + Close func() + HasNext func() bool + Next func(context.Context) (Message, error) + Seek func(MessageID) error + SeekByTime func(time time.Time) error + Topic func() string + type ReaderMessage struct + type ReaderOptions struct + MessageChannel chan ReaderMessage + Name string + Properties map[string]string + ReadCompacted bool + ReceiverQueueSize int + StartMessageID MessageID + StartMessageIDInclusive bool + SubscriptionRolePrefix string + Topic string + type Result int + const AddToBatchFailed + const AlreadyClosedError + const AuthenticationError + const AuthorizationError + const BrokerMetadataError + const BrokerPersistenceError + const ChecksumError + const ConnectError + const ConsumerBusy + const ConsumerClosed + const ConsumerNotFound + const ConsumerNotInitialized + const CryptoError + const ErrorGettingAuthenticationData + const InvalidBatchBuilderType + const InvalidConfiguration + const InvalidMessage + const InvalidTopicName + const InvalidURL + const LookupError + const MessageTooBig + const NotConnectedError + const Ok + const OperationNotSupported + const ProducerBlockedQuotaExceededError + const ProducerBlockedQuotaExceededException + const ProducerNotInitialized + const ProducerQueueIsFull + const ReadError + const SeekFailed + const ServiceUnitNotReady + const SubscriptionNotFound + const TimeoutError + const TooManyLookupRequestException + const TopicNotFound + const TopicTerminated + const UnknownError + const UnsupportedVersionError + type RetryMessage struct + type Schema interface + Decode func(data []byte, v interface{}) error + Encode func(v interface{}) ([]byte, error) + GetSchemaInfo func() *SchemaInfo + Validate func(message []byte) error + type SchemaInfo struct + Name string + Properties map[string]string + Schema string + Type SchemaType + type SchemaType int + const AUTO + const AVRO + const AutoConsume + const AutoPublish + const BOOLEAN + const BYTES + const DOUBLE + const FLOAT + const INT16 + const INT32 + const INT64 + const INT8 + const JSON + const KeyValue + const NONE + const PROTOBUF + const STRING + type StringSchema struct + func NewStringSchema(properties map[string]string) *StringSchema + func (ss *StringSchema) Decode(data []byte, v interface{}) error + func (ss *StringSchema) Encode(v interface{}) ([]byte, error) + func (ss *StringSchema) GetSchemaInfo() *SchemaInfo + func (ss *StringSchema) Validate(message []byte) error + type SubscriptionInitialPosition int + const SubscriptionPositionEarliest + const SubscriptionPositionLatest + type SubscriptionType int + const Exclusive + const Failover + const KeyShared + const Shared + type TopicMetadata interface + NumPartitions func() uint32