Documentation ¶
Index ¶
- Constants
- func IngestRow(topic *Topic, row *common.Row, colTypes []common.ColumnType, keyCols []int, ...) error
- func IngestRows(f *FakeKafka, sourceInfo *common.SourceInfo, colTypes []common.ColumnType, ...) error
- func NewStringKeyProtobufValueEncoderFactory(registry protolib.Resolver) func(options string) (MessageEncoder, error)
- type ClientFactory
- type ConfluentMessageProvider
- func (cmp *ConfluentMessageProvider) Close() error
- func (cmp *ConfluentMessageProvider) CommitOffsets(offsetsMap map[int32]int64) error
- func (cmp *ConfluentMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)
- func (cmp *ConfluentMessageProvider) RebalanceOccurred(cons *kafka.Consumer, event kafka.Event) error
- func (cmp *ConfluentMessageProvider) SetRebalanceCallback(callback RebalanceCallback)
- func (cmp *ConfluentMessageProvider) Start() error
- func (cmp *ConfluentMessageProvider) Stop() error
- type ConfluentMessageProviderFactory
- type FakeKafka
- func (f *FakeKafka) CreateTopic(name string, partitions int) (*Topic, error)
- func (f *FakeKafka) DeleteTopic(name string) error
- func (f *FakeKafka) GetTopic(name string) (*Topic, bool)
- func (f *FakeKafka) GetTopicNames() []string
- func (f *FakeKafka) IngestMessage(topicName string, message *Message) error
- func (f *FakeKafka) InjectFailure(topicName string, groupID string, failTime time.Duration) error
- type FakeMessageProvider
- func (f *FakeMessageProvider) Close() error
- func (f *FakeMessageProvider) CommitOffsets(offsets map[int32]int64) error
- func (f *FakeMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)
- func (f *FakeMessageProvider) SetRebalanceCallback(callback RebalanceCallback)
- func (f *FakeMessageProvider) Start() error
- func (f *FakeMessageProvider) Stop() error
- type FakeMessageProviderFactory
- type Float32BEKeyTLJSONValueEncoder
- type Float64BEKeyTLJSONValueEncoder
- type Group
- type Int16BEKeyTLJSONValueEncoder
- type Int32BEKeyTLJSONValueEncoder
- type Int64BEKeyTLJSONValueEncoder
- type JSONHeadersEncoder
- type JSONKeyJSONValueEncoder
- type Message
- type MessageEncoder
- type MessageHeader
- type MessageProvider
- type MessageProviderFactory
- type MessageQueue
- type NestedJSONKeyNestedJSONValueEncoder
- type PartInfo
- type Partition
- type RebalanceCallback
- type StringKeyProtobufValueEncoder
- type StringKeyTLJSONValueEncoder
- type Subscriber
- type Topic
Constants ¶
const FakeKafkaIDPropName = "fakeKafkaID"
Variables ¶
This section is empty.
Functions ¶
func IngestRow ¶
func IngestRow(topic *Topic, row *common.Row, colTypes []common.ColumnType, keyCols []int, encoder MessageEncoder, timestamp time.Time) error
IngestRow is a convenience function that first encodes the row into a Kafka message and then ingests it into the given topic.
func IngestRows ¶
func IngestRows(f *FakeKafka, sourceInfo *common.SourceInfo, colTypes []common.ColumnType, rows *common.Rows, encoder MessageEncoder) error
IngestRows ingests the given rows using the supplied schema and source info. It is a convenience function for use in tests.
func NewStringKeyProtobufValueEncoderFactory ¶
func NewStringKeyProtobufValueEncoderFactory(registry protolib.Resolver) func(options string) (MessageEncoder, error)
Types ¶
type ClientFactory ¶
type ClientFactory func(topicName string, props map[string]string, groupID string) MessageProviderFactory
type ConfluentMessageProvider ¶
type ConfluentMessageProvider struct {
// contains filtered or unexported fields
}
func (*ConfluentMessageProvider) Close ¶
func (cmp *ConfluentMessageProvider) Close() error
func (*ConfluentMessageProvider) CommitOffsets ¶
func (cmp *ConfluentMessageProvider) CommitOffsets(offsetsMap map[int32]int64) error
func (*ConfluentMessageProvider) GetMessage ¶
func (cmp *ConfluentMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)
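func (*ConfluentMessageProvider) RebalanceOccurred ¶
func (cmp *ConfluentMessageProvider) RebalanceOccurred(cons *kafka.Consumer, event kafka.Event) error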
func (*ConfluentMessageProvider) SetRebalanceCallback ¶
func (cmp *ConfluentMessageProvider) SetRebalanceCallback(callback RebalanceCallback)
func (*ConfluentMessageProvider) Start ¶
func (cmp *ConfluentMessageProvider) Start() error
func (*ConfluentMessageProvider) Stop ¶
func (cmp *ConfluentMessageProvider) Stop() error
type ConfluentMessageProviderFactory ¶
type ConfluentMessageProviderFactory struct {
// contains filtered or unexported fields
}
func (*ConfluentMessageProviderFactory) NewMessageProvider ¶
func (cmpf *ConfluentMessageProviderFactory) NewMessageProvider() (MessageProvider, error)
type FakeKafka ¶
type FakeKafka struct {
ID int64
// contains filtered or unexported fields
}
func GetFakeKafka ¶
func NewFakeKafka ¶
func NewFakeKafka() *FakeKafka
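func (*FakeKafka) CreateTopic ¶
func (f *FakeKafka) CreateTopic(name string, partitions int) (*Topic, error)
func (*FakeKafka) DeleteTopic ¶
func (f *FakeKafka) DeleteTopic(name string) error
func (*FakeKafka) GetTopicNames ¶
func (f *FakeKafka) GetTopicNames() []string
func (*FakeKafka) IngestMessage ¶
func (f *FakeKafka) IngestMessage(topicName string, message *Message) error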
type FakeMessageProvider ¶
type FakeMessageProvider struct {
// contains filtered or unexported fields
}
func (*FakeMessageProvider) Close ¶
func (f *FakeMessageProvider) Close() error
func (*FakeMessageProvider) CommitOffsets ¶
func (f *FakeMessageProvider) CommitOffsets(offsets map[int32]int64) error
func (*FakeMessageProvider) GetMessage ¶
func (f *FakeMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)
func (*FakeMessageProvider) SetRebalanceCallback ¶
func (f *FakeMessageProvider) SetRebalanceCallback(callback RebalanceCallback)
func (*FakeMessageProvider) Start ¶
func (f *FakeMessageProvider) Start() error
func (*FakeMessageProvider) Stop ¶
func (f *FakeMessageProvider) Stop() error
type FakeMessageProviderFactory ¶
type FakeMessageProviderFactory struct {
// contains filtered or unexported fields
}
func (*FakeMessageProviderFactory) NewMessageProvider ¶
func (fmpf *FakeMessageProviderFactory) NewMessageProvider() (MessageProvider, error)
type Float32BEKeyTLJSONValueEncoder ¶
type Float32BEKeyTLJSONValueEncoder struct { }
Float32BEKeyTLJSONValueEncoder encodes as float32BE key, top level JSON value, no headers
func (*Float32BEKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *Float32BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*Float32BEKeyTLJSONValueEncoder) Name ¶
func (s *Float32BEKeyTLJSONValueEncoder) Name() string
type Float64BEKeyTLJSONValueEncoder ¶
type Float64BEKeyTLJSONValueEncoder struct { }
Float64BEKeyTLJSONValueEncoder encodes as float64BE key, top level JSON value, no headers
func (*Float64BEKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *Float64BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*Float64BEKeyTLJSONValueEncoder) Name ¶
func (s *Float64BEKeyTLJSONValueEncoder) Name() string
type Int16BEKeyTLJSONValueEncoder ¶
type Int16BEKeyTLJSONValueEncoder struct { }
Int16BEKeyTLJSONValueEncoder encodes as int16BE key, top level JSON value, no headers
func (*Int16BEKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *Int16BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*Int16BEKeyTLJSONValueEncoder) Name ¶
func (s *Int16BEKeyTLJSONValueEncoder) Name() string
type Int32BEKeyTLJSONValueEncoder ¶
type Int32BEKeyTLJSONValueEncoder struct { }
Int32BEKeyTLJSONValueEncoder encodes as int32BE key, top level JSON value, no headers
func (*Int32BEKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *Int32BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*Int32BEKeyTLJSONValueEncoder) Name ¶
func (s *Int32BEKeyTLJSONValueEncoder) Name() string
type Int64BEKeyTLJSONValueEncoder ¶
type Int64BEKeyTLJSONValueEncoder struct { }
Int64BEKeyTLJSONValueEncoder encodes as int64BE key, top level JSON value, no headers
func (*Int64BEKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *Int64BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*Int64BEKeyTLJSONValueEncoder) Name ¶
func (s *Int64BEKeyTLJSONValueEncoder) Name() string
type JSONHeadersEncoder ¶
type JSONHeadersEncoder struct { }
JSONHeadersEncoder puts the message key, encoded as JSON, in one header and the message value, encoded as JSON, in another; the actual message key and value are empty JSON objects.
func (*JSONHeadersEncoder) EncodeMessage ¶
func (s *JSONHeadersEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*JSONHeadersEncoder) Name ¶
func (s *JSONHeadersEncoder) Name() string
type JSONKeyJSONValueEncoder ¶
type JSONKeyJSONValueEncoder struct { }
JSONKeyJSONValueEncoder encodes as top level JSON key, top level JSON value, no headers
func (*JSONKeyJSONValueEncoder) EncodeMessage ¶
func (s *JSONKeyJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*JSONKeyJSONValueEncoder) Name ¶
func (s *JSONKeyJSONValueEncoder) Name() string
type MessageEncoder ¶
type MessageEncoder interface {
Name() string
EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
}
func NewStringKeyProtobufValueEncoder ¶
func NewStringKeyProtobufValueEncoder(protoRegistry protolib.Resolver, options string) (MessageEncoder, error)
type MessageHeader ¶
type MessageProvider ¶
type MessageProviderFactory ¶
type MessageProviderFactory interface {
NewMessageProvider() (MessageProvider, error)
}
func NewMessageProviderFactory ¶
func NewMessageProviderFactory(topicName string, props map[string]string, groupID string) MessageProviderFactory
type MessageQueue ¶
type MessageQueue chan *Message
type NestedJSONKeyNestedJSONValueEncoder ¶
type NestedJSONKeyNestedJSONValueEncoder struct { }
NestedJSONKeyNestedJSONValueEncoder encodes as nested JSON key, nested JSON value, no headers
func (*NestedJSONKeyNestedJSONValueEncoder) EncodeMessage ¶
func (s *NestedJSONKeyNestedJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*NestedJSONKeyNestedJSONValueEncoder) Name ¶
func (s *NestedJSONKeyNestedJSONValueEncoder) Name() string
type RebalanceCallback ¶
type RebalanceCallback func() error
type StringKeyProtobufValueEncoder ¶
type StringKeyProtobufValueEncoder struct {
// contains filtered or unexported fields
}
StringKeyProtobufValueEncoder is an encoder that translates each row to a protobuf message. Columns 0 to N correspond to protobuf field numbers 1 to N+1. This means that the key also ends up as a field in the value protobuf.
func (*StringKeyProtobufValueEncoder) EncodeMessage ¶
func (e *StringKeyProtobufValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*StringKeyProtobufValueEncoder) Name ¶
func (e *StringKeyProtobufValueEncoder) Name() string
type StringKeyTLJSONValueEncoder ¶
type StringKeyTLJSONValueEncoder struct { }
StringKeyTLJSONValueEncoder encodes as string key, top level JSON value, no headers
func (*StringKeyTLJSONValueEncoder) EncodeMessage ¶
func (s *StringKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
func (*StringKeyTLJSONValueEncoder) Name ¶
func (s *StringKeyTLJSONValueEncoder) Name() string
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func (*Subscriber) GetMessage ¶
func (c *Subscriber) GetMessage(pollTimeout time.Duration) (*Message, error)
func (*Subscriber) Unsubscribe ¶
func (c *Subscriber) Unsubscribe() error
type Topic ¶
type Topic struct {
Name string
// contains filtered or unexported fields
}
func (*Topic) CreateSubscriber ¶
func (t *Topic) CreateSubscriber(groupID string, rebalanceCB RebalanceCallback) (*Subscriber, error)