Documentation ¶
Index ¶
- Constants
- type AdminClient
- type Config
- type ConfigMap
- type Consumer
- type ConsumerFactory
- type DefaultAdminClient
- type DefaultConsumerFactory
- type DefaultProducerFactory
- type Extractor
- func (e *Extractor) Extract(ctx context.Context, query entity.ExtractorQuery, result any) (error, bool)
- func (e *Extractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, ...) (error, bool)
- func (e *Extractor) KafkaConfig() (pollTimeoutMs int, cfgMap map[string]any)
- func (e *Extractor) SendToSource(ctx context.Context, eventData any) (string, error)
- func (e *Extractor) SetConsumerFactory(cf ConsumerFactory)
- func (e *Extractor) SetProducerFactory(pf ProducerFactory)
- func (e *Extractor) StreamExtract(ctx context.Context, reportEvent entity.ProcessEventFunc, err *error, ...)
- type Loader
- type Producer
- type ProducerFactory
Constants ¶
View Source
const ( PropAutoCommit = "enable.auto.commit" PropAutoOffsetStore = "enable.auto.offset.store" )
Kafka props which cannot be overridden, required by Extractor
View Source
const DefaultPollTimeoutMs = 3000
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminClient ¶
type AdminClient interface { GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error) }
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config is the internal config used by each extractor/loader, combining config from external Config with config from what is inside the stream spec for this specific stream
func NewExtractorConfig ¶
func NewLoaderConfig ¶
func (*Config) SetCreateTopics ¶ added in v0.1.3
func (*Config) SetKafkaProperty ¶
func (*Config) SetPollTimout ¶
type Consumer ¶
type Consumer interface { SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error Poll(timeoutMs int) (event kafka.Event) StoreOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) Commit() ([]kafka.TopicPartition, error) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error) Close() error }
Consumer interface is used to enable full unit testing
type ConsumerFactory ¶
type DefaultAdminClient ¶
type DefaultAdminClient struct {
// contains filtered or unexported fields
}
func (DefaultAdminClient) CreateTopics ¶
func (d DefaultAdminClient) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
func (DefaultAdminClient) GetMetadata ¶
type DefaultConsumerFactory ¶
type DefaultConsumerFactory struct{}
func (DefaultConsumerFactory) NewAdminClientFromConsumer ¶
func (d DefaultConsumerFactory) NewAdminClientFromConsumer(c Consumer) (AdminClient, error)
func (DefaultConsumerFactory) NewConsumer ¶
func (d DefaultConsumerFactory) NewConsumer(conf *kafka.ConfigMap) (Consumer, error)
type DefaultProducerFactory ¶
type DefaultProducerFactory struct{}
func (DefaultProducerFactory) NewAdminClientFromProducer ¶
func (d DefaultProducerFactory) NewAdminClientFromProducer(p Producer) (AdminClient, error)
func (DefaultProducerFactory) NewProducer ¶
func (d DefaultProducerFactory) NewProducer(conf *kafka.ConfigMap) (Producer, error)
type Extractor ¶ added in v0.1.3
type Extractor struct {
// contains filtered or unexported fields
}
func (*Extractor) ExtractFromSink ¶ added in v0.1.3
func (e *Extractor) ExtractFromSink( ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)
func (*Extractor) KafkaConfig ¶ added in v0.1.3
func (*Extractor) SendToSource ¶ added in v0.1.3
func (*Extractor) SetConsumerFactory ¶ added in v0.1.3
func (e *Extractor) SetConsumerFactory(cf ConsumerFactory)
func (*Extractor) SetProducerFactory ¶ added in v0.1.3
func (e *Extractor) SetProducerFactory(pf ProducerFactory)
func (*Extractor) StreamExtract ¶ added in v0.1.3
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
func (*Loader) KafkaConfig ¶ added in v0.1.3
func (*Loader) StreamLoad ¶
type Producer ¶
type Producer interface { Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error Events() chan kafka.Event Flush(timeoutMs int) int Close() }
Producer interface is used to enable full unit testing
type ProducerFactory ¶
Click to show internal directories.
Click to hide internal directories.