Documentation ¶
Index ¶
- type MockKafkaAdmin
- type MockPartition
- type MockPartitionConsumer
- func (m *MockPartitionConsumer) Close() error
- func (m *MockPartitionConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset kafka.Offset) (kafka.Partition, error)
- func (m *MockPartitionConsumer) ConsumeTopic(ctx context.Context, topic string, offset kafka.Offset) (map[int32]kafka.Partition, error)
- func (m *MockPartitionConsumer) GetOffsetLatest(topic string, partition int32) (offset int64, err error)
- func (m *MockPartitionConsumer) GetOffsetOldest(topic string, partition int32) (offset int64, err error)
- func (m *MockPartitionConsumer) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
- func (m *MockPartitionConsumer) Partitions(ctx context.Context, topic string) ([]int32, error)
- type MockStreamProducer
- type MockTopic
- type Record
- func (r *Record) Ctx() context.Context
- func (r *Record) Headers() kafka.RecordHeaders
- func (r *Record) Key() []byte
- func (r *Record) Offset() int64
- func (r *Record) Partition() int32
- func (r *Record) String() string
- func (r *Record) Timestamp() time.Time
- func (r *Record) Topic() string
- func (r *Record) Value() []byte
- type Topics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MockKafkaAdmin ¶
type MockKafkaAdmin struct {
Topics *Topics
}
func NewMockAdminWithTopics ¶
func NewMockAdminWithTopics(tps []*kafka.Topic) *MockKafkaAdmin
func (*MockKafkaAdmin) Close ¶
func (m *MockKafkaAdmin) Close()
func (*MockKafkaAdmin) CreateTopics ¶
func (m *MockKafkaAdmin) CreateTopics(topics []*kafka.Topic) error
func (*MockKafkaAdmin) DeleteTopics ¶
func (m *MockKafkaAdmin) DeleteTopics(topics []string) (map[string]error, error)
type MockPartition ¶
func (*MockPartition) FetchAll ¶
func (p *MockPartition) FetchAll() (records []kafka.Record)
func (*MockPartition) Latest ¶
func (p *MockPartition) Latest() int64
type MockPartitionConsumer ¶
type MockPartitionConsumer struct {
// contains filtered or unexported fields
}
func (*MockPartitionConsumer) Close ¶
func (m *MockPartitionConsumer) Close() error
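func (*MockPartitionConsumer) ConsumePartition ¶
func (m *MockPartitionConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset kafka.Offset) (kafka.Partition, error)
func (*MockPartitionConsumer) ConsumeTopic ¶
func (m *MockPartitionConsumer) ConsumeTopic(ctx context.Context, topic string, offset kafka.Offset) (map[int32]kafka.Partition, error)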
func (*MockPartitionConsumer) GetOffsetLatest ¶
func (m *MockPartitionConsumer) GetOffsetLatest(topic string, partition int32) (offset int64, err error)
func (*MockPartitionConsumer) GetOffsetOldest ¶
func (m *MockPartitionConsumer) GetOffsetOldest(topic string, partition int32) (offset int64, err error)
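func (*MockPartitionConsumer) OffsetValid ¶
func (m *MockPartitionConsumer) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
func (*MockPartitionConsumer) Partitions ¶
func (m *MockPartitionConsumer) Partitions(ctx context.Context, topic string) ([]int32, error)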
type MockStreamProducer ¶
type MockStreamProducer struct {
// contains filtered or unexported fields
}
func NewMockPartitionConsumer ¶
func NewMockPartitionConsumer(topics *Topics) *MockStreamProducer
func NewMockProducer ¶
func NewMockProducer(topics *Topics) *MockStreamProducer
func (*MockStreamProducer) Close ¶
func (msp *MockStreamProducer) Close() error
func (*MockStreamProducer) ProduceBatch ¶
func (*MockStreamProducer) ProduceSync ¶
type MockTopic ¶
func (*MockTopic) AddPartition ¶
func (*MockTopic) Partitions ¶
func (tp *MockTopic) Partitions() []*MockPartition
type Record ¶
type Record struct {
	MCtx       context.Context
	MTopic     string
	MPartition int32
	MOffset    int64
	MValue     []byte
	MKey       []byte
	MTimestamp time.Time
	MHeaders   []kafka.RecordHeader
}
func (*Record) Headers ¶
func (r *Record) Headers() kafka.RecordHeaders
Click to show internal directories.
Click to hide internal directories.