Versions in this module Expand all Collapse all v2 v2.0.1 Aug 9, 2023 v2.0.0 Jun 23, 2023 Changes in this version + type MockKafkaAdmin struct + Topics *Topics + func NewMockAdminWithTopics(tps []*kafka.Topic) *MockKafkaAdmin + func (m *MockKafkaAdmin) Close() + func (m *MockKafkaAdmin) CreateTopics(topics []*kafka.Topic) error + func (m *MockKafkaAdmin) DeleteTopics(topics []string) (map[string]error, error) + func (m *MockKafkaAdmin) FetchInfo(topics []string) ([]*kafka.Topic, error) + type MockPartition struct + func (p *MockPartition) Append(r kafka.Record) error + func (p *MockPartition) Fetch(start int64, limit int) (records []kafka.Record, err error) + func (p *MockPartition) FetchAll() (records []kafka.Record) + func (p *MockPartition) Latest() int64 + type MockPartitionConsumer struct + func (m *MockPartitionConsumer) Close() error + func (m *MockPartitionConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset kafka.Offset) (kafka.Partition, error) + func (m *MockPartitionConsumer) ConsumeTopic(ctx context.Context, topic string, offset kafka.Offset) (map[int32]kafka.Partition, error) + func (m *MockPartitionConsumer) GetOffsetLatest(topic string, partition int32) (offset int64, err error) + func (m *MockPartitionConsumer) GetOffsetOldest(topic string, partition int32) (offset int64, err error) + func (m *MockPartitionConsumer) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error) + func (m *MockPartitionConsumer) Partitions(ctx context.Context, topic string) ([]int32, error) + type MockStreamProducer struct + func NewMockPartitionConsumer(topics *Topics) *MockStreamProducer + func NewMockProducer(topics *Topics) *MockStreamProducer + func (msp *MockStreamProducer) Close() error + func (msp *MockStreamProducer) ProduceBatch(ctx context.Context, messages []kafka.Record) error + func (msp *MockStreamProducer) ProduceSync(ctx context.Context, message kafka.Record) (partition int32, offset int64, err error)
+ type MockTopic struct + Meta *kafka.Topic + Name string + func (tp *MockTopic) AddPartition(id int) error + func (tp *MockTopic) FetchAll() (records []kafka.Record) + func (tp *MockTopic) Partition(id int) (*MockPartition, error) + func (tp *MockTopic) Partitions() []*MockPartition + type Record struct + MCtx context.Context + MHeaders []kafka.RecordHeader + MKey []byte + MOffset int64 + MPartition int32 + MTimestamp time.Time + MTopic string + MValue []byte + func (r *Record) Ctx() context.Context + func (r *Record) Headers() kafka.RecordHeaders + func (r *Record) Key() []byte + func (r *Record) Offset() int64 + func (r *Record) Partition() int32 + func (r *Record) String() string + func (r *Record) Timestamp() time.Time + func (r *Record) Topic() string + func (r *Record) Value() []byte + type Topics struct + func NewMockTopics() *Topics + func (td *Topics) AddTopic(topic *MockTopic) error + func (td *Topics) RemoveTopic(name string) error + func (td *Topics) Topic(name string) (*MockTopic, error) + func (td *Topics) Topics() map[string]*MockTopic