Documentation ¶
Overview ¶
Package kafka provides interfaces extracted from the core API struct types in the Confluent Kafka Golang client; see https://github.com/confluentinc/confluent-kafka-go
Index ¶
- type AdminClient
- type Consumer
- type ConsumerImpl
- func (c ConsumerImpl) Assign(partitions []kafka.TopicPartition) (err error)
- func (c ConsumerImpl) Assignment() (partitions []kafka.TopicPartition, err error)
- func (c ConsumerImpl) Close() (err error)
- func (c ConsumerImpl) Commit() ([]kafka.TopicPartition, error)
- func (c ConsumerImpl) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
- func (c ConsumerImpl) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
- func (c ConsumerImpl) Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
- func (c ConsumerImpl) Events() chan kafka.Event
- func (c ConsumerImpl) GetConsumerGroupMetadata() (*kafka.ConsumerGroupMetadata, error)
- func (c ConsumerImpl) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
- func (c ConsumerImpl) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
- func (c ConsumerImpl) Logs() chan kafka.LogEvent
- func (c ConsumerImpl) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
- func (c ConsumerImpl) Pause(partitions []kafka.TopicPartition) (err error)
- func (c ConsumerImpl) Poll(timeoutMs int) (event kafka.Event)
- func (c ConsumerImpl) Position(partitions []kafka.TopicPartition) (offsets []kafka.TopicPartition, err error)
- func (c ConsumerImpl) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (c ConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error)
- func (c ConsumerImpl) Resume(partitions []kafka.TopicPartition) (err error)
- func (c ConsumerImpl) Seek(partition kafka.TopicPartition, timeoutMs int) error
- func (c ConsumerImpl) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
- func (c ConsumerImpl) SetOAuthBearerTokenFailure(errstr string) error
- func (c ConsumerImpl) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
- func (c ConsumerImpl) String() string
- func (c ConsumerImpl) Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
- func (c ConsumerImpl) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
- func (c ConsumerImpl) Subscription() (topics []string, err error)
- func (c ConsumerImpl) Unassign() (err error)
- func (c ConsumerImpl) Unsubscribe() (err error)
- type Producer
- type ProducerImpl
- func (p ProducerImpl) AbortTransaction(ctx context.Context) error
- func (p ProducerImpl) BeginTransaction() error
- func (p ProducerImpl) Close()
- func (p ProducerImpl) CommitTransaction(ctx context.Context) error
- func (p ProducerImpl) Events() chan kafka.Event
- func (p ProducerImpl) Flush(timeoutMs int) int
- func (p ProducerImpl) GetFatalError() error
- func (p ProducerImpl) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
- func (p ProducerImpl) GetTarget() *kafka.Producer
- func (p ProducerImpl) InitTransactions(ctx context.Context) error
- func (p ProducerImpl) Len() int
- func (p ProducerImpl) Logs() chan kafka.LogEvent
- func (p ProducerImpl) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
- func (p ProducerImpl) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
- func (p ProducerImpl) ProduceChannel() chan *kafka.Message
- func (p ProducerImpl) Purge(flags int) error
- func (p ProducerImpl) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (p ProducerImpl) SendOffsetsToTransaction(ctx context.Context, offsets []kafka.TopicPartition, ...) error
- func (p ProducerImpl) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
- func (p ProducerImpl) SetOAuthBearerTokenFailure(errstr string) error
- func (p ProducerImpl) String() string
- func (p ProducerImpl) TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminClient ¶
type AdminClient interface { ClusterID(ctx context.Context) (clusterID string, err error) ControllerID(ctx context.Context) (controllerID int32, err error) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) (result []kafka.TopicResult, err error) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) (result []kafka.TopicResult, err error) CreatePartitions(ctx context.Context, partitions []kafka.PartitionsSpecification, options ...kafka.CreatePartitionsAdminOption) (result []kafka.TopicResult, err error) AlterConfigs(ctx context.Context, resources []kafka.ConfigResource, options ...kafka.AlterConfigsAdminOption) (result []kafka.ConfigResourceResult, err error) DescribeConfigs(ctx context.Context, resources []kafka.ConfigResource, options ...kafka.DescribeConfigsAdminOption) (result []kafka.ConfigResourceResult, err error) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) String() string SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error Close() }
Extracted Kafka admin client interface for purposes of composition and mocking
type Consumer ¶
type Consumer interface { String() string Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) Unsubscribe() (err error) Assign(partitions []kafka.TopicPartition) (err error) Unassign() (err error) Commit() ([]kafka.TopicPartition, error) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error) Seek(partition kafka.TopicPartition, timeoutMs int) error Poll(timeoutMs int) (event kafka.Event) Events() chan kafka.Event Logs() chan kafka.LogEvent ReadMessage(timeout time.Duration) (*kafka.Message, error) Close() (err error) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) Subscription() (topics []string, err error) Assignment() (partitions []kafka.TopicPartition, err error) Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) Position(partitions []kafka.TopicPartition) (offsets []kafka.TopicPartition, err error) Pause(partitions []kafka.TopicPartition) (err error) Resume(partitions []kafka.TopicPartition) (err error) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error GetConsumerGroupMetadata() (*kafka.ConsumerGroupMetadata, error) }
Extracted Kafka consumer interface for purposes of composition and mocking
type ConsumerImpl ¶
Simple decorator implementation of Consumer interface
func (ConsumerImpl) Assign ¶
func (c ConsumerImpl) Assign(partitions []kafka.TopicPartition) (err error)
func (ConsumerImpl) Assignment ¶
func (c ConsumerImpl) Assignment() (partitions []kafka.TopicPartition, err error)
func (ConsumerImpl) Close ¶
func (c ConsumerImpl) Close() (err error)
func (ConsumerImpl) Commit ¶
func (c ConsumerImpl) Commit() ([]kafka.TopicPartition, error)
func (ConsumerImpl) CommitMessage ¶
func (c ConsumerImpl) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
func (ConsumerImpl) CommitOffsets ¶
func (c ConsumerImpl) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
func (ConsumerImpl) Committed ¶
func (c ConsumerImpl) Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
func (ConsumerImpl) Events ¶
func (c ConsumerImpl) Events() chan kafka.Event
func (ConsumerImpl) GetConsumerGroupMetadata ¶
func (c ConsumerImpl) GetConsumerGroupMetadata() (*kafka.ConsumerGroupMetadata, error)
func (ConsumerImpl) GetMetadata ¶
func (ConsumerImpl) GetWatermarkOffsets ¶
func (c ConsumerImpl) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
func (ConsumerImpl) Logs ¶
func (c ConsumerImpl) Logs() chan kafka.LogEvent
func (ConsumerImpl) OffsetsForTimes ¶
func (c ConsumerImpl) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
func (ConsumerImpl) Pause ¶
func (c ConsumerImpl) Pause(partitions []kafka.TopicPartition) (err error)
func (ConsumerImpl) Position ¶
func (c ConsumerImpl) Position(partitions []kafka.TopicPartition) (offsets []kafka.TopicPartition, err error)
func (ConsumerImpl) QueryWatermarkOffsets ¶
func (ConsumerImpl) ReadMessage ¶
func (ConsumerImpl) Resume ¶
func (c ConsumerImpl) Resume(partitions []kafka.TopicPartition) (err error)
func (ConsumerImpl) Seek ¶
func (c ConsumerImpl) Seek(partition kafka.TopicPartition, timeoutMs int) error
func (ConsumerImpl) SetOAuthBearerToken ¶
func (c ConsumerImpl) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
func (ConsumerImpl) SetOAuthBearerTokenFailure ¶
func (c ConsumerImpl) SetOAuthBearerTokenFailure(errstr string) error
func (ConsumerImpl) StoreOffsets ¶
func (c ConsumerImpl) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
func (ConsumerImpl) String ¶
func (c ConsumerImpl) String() string
func (ConsumerImpl) Subscribe ¶
func (c ConsumerImpl) Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
func (ConsumerImpl) SubscribeTopics ¶
func (c ConsumerImpl) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
func (ConsumerImpl) Subscription ¶
func (c ConsumerImpl) Subscription() (topics []string, err error)
func (ConsumerImpl) Unassign ¶
func (c ConsumerImpl) Unassign() (err error)
func (ConsumerImpl) Unsubscribe ¶
func (c ConsumerImpl) Unsubscribe() (err error)
type Producer ¶
type Producer interface { String() string Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error Events() chan kafka.Event Logs() chan kafka.LogEvent ProduceChannel() chan *kafka.Message Len() int Flush(timeoutMs int) int Close() Purge(flags int) error GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) GetFatalError() error TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error InitTransactions(ctx context.Context) error BeginTransaction() error SendOffsetsToTransaction(ctx context.Context, offsets []kafka.TopicPartition, consumerMetadata *kafka.ConsumerGroupMetadata) error CommitTransaction(ctx context.Context) error AbortTransaction(ctx context.Context) error GetTarget() *kafka.Producer }
Extracted Kafka producer interface for purposes of composition and mocking
type ProducerImpl ¶
Simple decorator implementation of Producer interface
func (ProducerImpl) AbortTransaction ¶
func (p ProducerImpl) AbortTransaction(ctx context.Context) error
func (ProducerImpl) BeginTransaction ¶
func (p ProducerImpl) BeginTransaction() error
func (ProducerImpl) Close ¶
func (p ProducerImpl) Close()
func (ProducerImpl) CommitTransaction ¶
func (p ProducerImpl) CommitTransaction(ctx context.Context) error
func (ProducerImpl) Events ¶
func (p ProducerImpl) Events() chan kafka.Event
func (ProducerImpl) Flush ¶
func (p ProducerImpl) Flush(timeoutMs int) int
func (ProducerImpl) GetFatalError ¶
func (p ProducerImpl) GetFatalError() error
func (ProducerImpl) GetMetadata ¶
func (ProducerImpl) GetTarget ¶
func (p ProducerImpl) GetTarget() *kafka.Producer
func (ProducerImpl) InitTransactions ¶
func (p ProducerImpl) InitTransactions(ctx context.Context) error
func (ProducerImpl) Len ¶
func (p ProducerImpl) Len() int
func (ProducerImpl) Logs ¶
func (p ProducerImpl) Logs() chan kafka.LogEvent
func (ProducerImpl) OffsetsForTimes ¶
func (p ProducerImpl) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
func (ProducerImpl) ProduceChannel ¶
func (p ProducerImpl) ProduceChannel() chan *kafka.Message
func (ProducerImpl) Purge ¶
func (p ProducerImpl) Purge(flags int) error
func (ProducerImpl) QueryWatermarkOffsets ¶
func (ProducerImpl) SendOffsetsToTransaction ¶
func (p ProducerImpl) SendOffsetsToTransaction(ctx context.Context, offsets []kafka.TopicPartition, consumerMetadata *kafka.ConsumerGroupMetadata) error
func (ProducerImpl) SetOAuthBearerToken ¶
func (p ProducerImpl) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
func (ProducerImpl) SetOAuthBearerTokenFailure ¶
func (p ProducerImpl) SetOAuthBearerTokenFailure(errstr string) error
func (ProducerImpl) String ¶
func (p ProducerImpl) String() string
func (ProducerImpl) TestFatalError ¶
Click to show internal directories.
Click to hide internal directories.