Documentation ¶
Index ¶
- func NewAdminClientFromConsumer(c Consumer) (a *lib.AdminClient, err error)
- func NewAdminClientFromProducer(p Producer) (a *lib.AdminClient, err error)
- type Consumer
- type MockConsumer
- func NewMockConsumer() *MockConsumer
- func (c *MockConsumer) Assign(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Assignment() ([]lib.TopicPartition, error)
- func (c *MockConsumer) Close() (err error)
- func (c *MockConsumer) Commit() ([]lib.TopicPartition, error)
- func (c *MockConsumer) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
- func (c *MockConsumer) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
- func (c *MockConsumer) Committed(partitions []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
- func (c *MockConsumer) Events() chan lib.Event
- func (c *MockConsumer) GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
- func (c *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
- func (c *MockConsumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
- func (c *MockConsumer) Logs() chan lib.LogEvent
- func (c *MockConsumer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
- func (c *MockConsumer) Pause(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Poll(timeoutMs int) (event lib.Event)
- func (c *MockConsumer) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
- func (c *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (c *MockConsumer) ReadMessage(timeout time.Duration) (*lib.Message, error)
- func (c *MockConsumer) Resume(partitions []lib.TopicPartition) (err error)
- func (c *MockConsumer) Seek(partition lib.TopicPartition, timeoutMs int) error
- func (c *MockConsumer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
- func (c *MockConsumer) SetOAuthBearerTokenFailure(errstr string) error
- func (c *MockConsumer) StoreOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
- func (c *MockConsumer) String() string
- func (c *MockConsumer) Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
- func (c *MockConsumer) SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
- func (c *MockConsumer) Subscription() (topics []string, err error)
- func (c *MockConsumer) Unassign() (err error)
- func (c *MockConsumer) Unsubscribe() (err error)
- type MockProducer
- func NewMockProducer() *MockProducer
- func (p *MockProducer) AbortTransaction(ctx context.Context) error
- func (p *MockProducer) BeginTransaction() error
- func (p *MockProducer) Close()
- func (p *MockProducer) CommitTransaction(ctx context.Context) error
- func (p *MockProducer) Events() chan lib.Event
- func (p *MockProducer) Flush(timeoutMs int) int
- func (p *MockProducer) GetFatalError() error
- func (p *MockProducer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
- func (p *MockProducer) InitTransactions(ctx context.Context) error
- func (p *MockProducer) Len() int
- func (p *MockProducer) Logs() chan lib.LogEvent
- func (p *MockProducer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
- func (p *MockProducer) Produce(msg *lib.Message, deliveryChan chan lib.Event) error
- func (p *MockProducer) ProduceChannel() chan *lib.Message
- func (p *MockProducer) Purge(flags int) error
- func (p *MockProducer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (p *MockProducer) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error
- func (p *MockProducer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
- func (p *MockProducer) SetOAuthBearerTokenFailure(errstr string) error
- func (p *MockProducer) String() string
- func (p *MockProducer) TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewAdminClientFromConsumer ¶
func NewAdminClientFromConsumer(c Consumer) (a *lib.AdminClient, err error)
NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.
func NewAdminClientFromProducer ¶
func NewAdminClientFromProducer(p Producer) (a *lib.AdminClient, err error)
NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.
Types ¶
type Consumer ¶
type Consumer interface { Assign(partitions []lib.TopicPartition) (err error) Assignment() (partitions []lib.TopicPartition, err error) Close() (err error) Commit() ([]lib.TopicPartition, error) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error) Committed(partitions []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error) Events() chan lib.Event GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) Logs() chan lib.LogEvent OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error) Pause(partitions []lib.TopicPartition) (err error) Poll(timeoutMs int) (event lib.Event) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) ReadMessage(timeout time.Duration) (*lib.Message, error) Resume(partitions []lib.TopicPartition) (err error) Seek(partition lib.TopicPartition, timeoutMs int) error SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error StoreOffsets(offsets []lib.TopicPartition) (storedOffsets []lib.TopicPartition, err error) String() string Subscribe(topic string, rebalanceCb lib.RebalanceCb) error SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error) Subscription() (topics []string, err error) Unassign() (err error) Unsubscribe() (err error) }
Consumer is an interface covering the methods of the confluent-kafka-go Consumer struct; MockConsumer is a mock of this interface.
type MockConsumer ¶
MockConsumer is a mock of the Consumer interface using github.com/stretchr/testify/mock
func NewMockConsumer ¶
func NewMockConsumer() *MockConsumer
NewMockConsumer returns a new MockConsumer struct
func (*MockConsumer) Assign ¶
func (c *MockConsumer) Assign(partitions []lib.TopicPartition) (err error)
Assign method
func (*MockConsumer) Assignment ¶
func (c *MockConsumer) Assignment() ([]lib.TopicPartition, error)
Assignment method
func (*MockConsumer) Commit ¶
func (c *MockConsumer) Commit() ([]lib.TopicPartition, error)
Commit method
func (*MockConsumer) CommitMessage ¶
func (c *MockConsumer) CommitMessage(m *lib.Message) ([]lib.TopicPartition, error)
CommitMessage method
func (*MockConsumer) CommitOffsets ¶
func (c *MockConsumer) CommitOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
CommitOffsets method
func (*MockConsumer) Committed ¶
func (c *MockConsumer) Committed(partitions []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
Committed method
func (*MockConsumer) GetConsumerGroupMetadata ¶
func (c *MockConsumer) GetConsumerGroupMetadata() (*lib.ConsumerGroupMetadata, error)
GetConsumerGroupMetadata method
func (*MockConsumer) GetMetadata ¶
func (c *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
GetMetadata method
func (*MockConsumer) GetWatermarkOffsets ¶
func (c *MockConsumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
GetWatermarkOffsets method
func (*MockConsumer) OffsetsForTimes ¶
func (c *MockConsumer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error)
OffsetsForTimes method
func (*MockConsumer) Pause ¶
func (c *MockConsumer) Pause(partitions []lib.TopicPartition) (err error)
Pause method
func (*MockConsumer) Poll ¶
func (c *MockConsumer) Poll(timeoutMs int) (event lib.Event)
Poll method
func (*MockConsumer) Position ¶
func (c *MockConsumer) Position(partitions []lib.TopicPartition) (offsets []lib.TopicPartition, err error)
Position method
func (*MockConsumer) QueryWatermarkOffsets ¶
func (c *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets method
func (*MockConsumer) ReadMessage ¶
func (c *MockConsumer) ReadMessage(timeout time.Duration) (*lib.Message, error)
ReadMessage method
func (*MockConsumer) Resume ¶
func (c *MockConsumer) Resume(partitions []lib.TopicPartition) (err error)
Resume method
func (*MockConsumer) Seek ¶
func (c *MockConsumer) Seek(partition lib.TopicPartition, timeoutMs int) error
Seek method
func (*MockConsumer) SetOAuthBearerToken ¶
func (c *MockConsumer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerToken method
func (*MockConsumer) SetOAuthBearerTokenFailure ¶
func (c *MockConsumer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure method
func (*MockConsumer) StoreOffsets ¶
func (c *MockConsumer) StoreOffsets(offsets []lib.TopicPartition) ([]lib.TopicPartition, error)
StoreOffsets method
func (*MockConsumer) Subscribe ¶
func (c *MockConsumer) Subscribe(topic string, rebalanceCb lib.RebalanceCb) error
Subscribe method
func (*MockConsumer) SubscribeTopics ¶
func (c *MockConsumer) SubscribeTopics(topics []string, rebalanceCb lib.RebalanceCb) (err error)
SubscribeTopics method
func (*MockConsumer) Subscription ¶
func (c *MockConsumer) Subscription() (topics []string, err error)
Subscription method
func (*MockConsumer) Unsubscribe ¶
func (c *MockConsumer) Unsubscribe() (err error)
Unsubscribe method
type MockProducer ¶
MockProducer is a mock of the Producer interface using github.com/stretchr/testify/mock
func NewMockProducer ¶
func NewMockProducer() *MockProducer
NewMockProducer returns a new MockProducer struct
func (*MockProducer) AbortTransaction ¶
func (p *MockProducer) AbortTransaction(ctx context.Context) error
AbortTransaction method
func (*MockProducer) BeginTransaction ¶
func (p *MockProducer) BeginTransaction() error
BeginTransaction method
func (*MockProducer) CommitTransaction ¶
func (p *MockProducer) CommitTransaction(ctx context.Context) error
CommitTransaction method
func (*MockProducer) GetFatalError ¶
func (p *MockProducer) GetFatalError() error
GetFatalError method
func (*MockProducer) GetMetadata ¶
func (p *MockProducer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error)
GetMetadata method
func (*MockProducer) InitTransactions ¶
func (p *MockProducer) InitTransactions(ctx context.Context) error
InitTransactions method
func (*MockProducer) OffsetsForTimes ¶
func (p *MockProducer) OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) ([]lib.TopicPartition, error)
OffsetsForTimes method
func (*MockProducer) ProduceChannel ¶
func (p *MockProducer) ProduceChannel() chan *lib.Message
ProduceChannel method
func (*MockProducer) QueryWatermarkOffsets ¶
func (p *MockProducer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets method
func (*MockProducer) SendOffsetsToTransaction ¶
func (p *MockProducer) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error
SendOffsetsToTransaction method
func (*MockProducer) SetOAuthBearerToken ¶
func (p *MockProducer) SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error
SetOAuthBearerToken method
func (*MockProducer) SetOAuthBearerTokenFailure ¶
func (p *MockProducer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure method
func (*MockProducer) TestFatalError ¶
func (p *MockProducer) TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode
TestFatalError method
type Producer ¶
type Producer interface { AbortTransaction(ctx context.Context) error BeginTransaction() error Close() CommitTransaction(ctx context.Context) error Events() chan lib.Event Flush(timeoutMs int) int GetFatalError() error GetMetadata(topic *string, allTopics bool, timeoutMs int) (*lib.Metadata, error) InitTransactions(ctx context.Context) error Len() int Logs() chan lib.LogEvent OffsetsForTimes(times []lib.TopicPartition, timeoutMs int) (offsets []lib.TopicPartition, err error) Produce(msg *lib.Message, deliveryChan chan lib.Event) error ProduceChannel() chan *lib.Message Purge(flags int) error QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) SendOffsetsToTransaction(ctx context.Context, offsets []lib.TopicPartition, consumerMetadata *lib.ConsumerGroupMetadata) error SetOAuthBearerToken(oauthBearerToken lib.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error String() string TestFatalError(code lib.ErrorCode, str string) lib.ErrorCode }
Producer is an interface covering the methods of the confluent-kafka-go Producer struct; MockProducer is a mock of this interface.