Documentation ¶
Index ¶
- Constants
- func IsNil(v any) bool
- type Config
- func (c *Config) SetCreateTopics(value bool)
- func (c *Config) SetDLQConfig(dlqConfig DLQConfig)
- func (c *Config) SetKafkaProperty(prop string, value any)
- func (c *Config) SetLoaderProps(props ConfigMap)
- func (c *Config) SetPollTimout(timeout int)
- func (c *Config) SetProps(props ConfigMap)
- func (c *Config) SetSendToSource(value bool)
- func (c *Config) String() string
- type ConfigMap
- type DLQConfig
- type DefaultAdminClient
- type DefaultConsumerFactory
- type DefaultProducerFactory
- type Extractor
- func (e *Extractor) DLQConfig() DLQConfig
- func (e *Extractor) Extract(ctx context.Context, query entity.ExtractorQuery, result any) (error, bool)
- func (e *Extractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, ...) (error, bool)
- func (e *Extractor) KafkaConfig() (pollTimeoutMs int, cfgMap map[string]any)
- func (e *Extractor) SendToSource(ctx context.Context, eventData any) (string, error)
- func (e *Extractor) SetConsumerFactory(cf ikafka.ConsumerFactory)
- func (e *Extractor) SetProducerFactory(pf ikafka.ProducerFactory)
- func (e *Extractor) StreamExtract(ctx context.Context, reportEvent entity.ProcessEventFunc, err *error, ...)
- type Loader
- type SharedProducerFactory
- func (s *SharedProducerFactory) CloseProducer(p ikafka.Producer)
- func (s *SharedProducerFactory) CloseSharedProducer()
- func (s *SharedProducerFactory) NewAdminClientFromProducer(p ikafka.Producer) (ikafka.AdminClient, error)
- func (s *SharedProducerFactory) NewProducer(conf *kafka.ConfigMap) (ikafka.Producer, error)
Constants ¶
const (
	PropAutoCommit      = "enable.auto.commit"
	PropAutoOffsetStore = "enable.auto.offset.store"
)
Kafka properties that are required by Extractor and cannot be overridden.
const DefaultPollTimeoutMs = 3000
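The effect of the two protected properties can be illustrated with a small sketch. This is not the package's implementation: mergeProps is a hypothetical helper, and the base value used for enable.auto.commit is a placeholder chosen only for the demonstration. The sketch shows one plausible way to apply user-supplied Kafka properties while leaving the protected keys untouched.

```go
package main

import "fmt"

// Property names copied from the package constants.
const (
	PropAutoCommit      = "enable.auto.commit"
	PropAutoOffsetStore = "enable.auto.offset.store"
)

// mergeProps is a hypothetical helper, not part of the package API. It copies
// user-supplied properties on top of base, but skips the protected keys so
// that whatever base specifies for them is kept.
func mergeProps(base, user map[string]any) map[string]any {
	merged := make(map[string]any, len(base)+len(user))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range user {
		if k == PropAutoCommit || k == PropAutoOffsetStore {
			continue // protected: ignore the override attempt
		}
		merged[k] = v
	}
	return merged
}

func main() {
	// The value true below is a placeholder, not the value the Extractor enforces.
	base := map[string]any{PropAutoCommit: true, "group.id": "default"}
	user := map[string]any{PropAutoCommit: false, "group.id": "my-group"}
	fmt.Println(mergeProps(base, user))
}
```

In this sketch an override attempt is silently ignored. Whether the real Extractor ignores, overwrites, or rejects such overrides is not stated in this documentation.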
Functions ¶
Types ¶
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config is the internal config used by each extractor/loader. It combines the external Config with the config found in the stream spec for the specific stream.
func NewExtractorConfig ¶
func NewLoaderConfig ¶
func (*Config) SetCreateTopics ¶ added in v0.1.3
func (*Config) SetDLQConfig ¶ added in v0.3.1
func (*Config) SetKafkaProperty ¶
func (*Config) SetLoaderProps ¶ added in v0.2.0
func (*Config) SetPollTimout ¶
func (*Config) SetSendToSource ¶ added in v0.1.5
type DLQConfig ¶ added in v0.3.1
type DLQConfig struct {
	// Topic specifies which topic to use for DLQ events. If the global extractor
	// setting createTopics is set to false, only Topic.Name is regarded. Otherwise,
	// NumPartitions and ReplicationFactor will be used as well if the topic is created
	// (if it doesn't exist already).
	Topic *spec.TopicSpecification

	ProducerConfig map[string]any

	// If StreamIDEnrichmentPath is not empty it specifies the JSON path (e.g.
	// "my.enrichment.streamId") including the JSON field name, which will hold the
	// value of the injected stream ID for the current stream. That is, before the
	// event is sent to the DLQ the stream ID is added to a new field created in the
	// event, if this option is used.
	StreamIDEnrichmentPath string
}
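To make the StreamIDEnrichmentPath behavior concrete, here is a minimal sketch of injecting a stream ID at a dotted JSON path. The helper enrichWithStreamID is hypothetical and uses only the standard library. How the package itself performs the enrichment (and how it treats existing fields on the path) is not documented here, so treat the details as assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// enrichWithStreamID is a hypothetical helper, not part of the package API.
// It sets streamID at the dotted path inside a JSON object event, creating
// intermediate objects as needed. An empty path leaves the event unchanged.
// Assumption: a non-object value found on the path is replaced by an object.
func enrichWithStreamID(event []byte, path, streamID string) ([]byte, error) {
	if path == "" {
		return event, nil
	}
	var doc map[string]any
	if err := json.Unmarshal(event, &doc); err != nil {
		return nil, err
	}
	if doc == nil { // the event was the JSON literal null
		doc = map[string]any{}
	}
	keys := strings.Split(path, ".")
	node := doc
	for _, key := range keys[:len(keys)-1] {
		child, ok := node[key].(map[string]any)
		if !ok {
			child = map[string]any{}
			node[key] = child
		}
		node = child
	}
	node[keys[len(keys)-1]] = streamID
	return json.Marshal(doc)
}

func main() {
	out, err := enrichWithStreamID([]byte(`{"id":1}`), "my.enrichment.streamId", "my-stream")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out)) // {"id":1,"my":{"enrichment":{"streamId":"my-stream"}}}
}
```

With the example path from the field comment, the last path segment (streamId) becomes the JSON field name and the preceding segments become nested objects.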
func NewDLQConfig ¶ added in v0.3.1
type DefaultAdminClient ¶
type DefaultAdminClient struct {
// contains filtered or unexported fields
}
func (DefaultAdminClient) Close ¶ added in v0.7.1
func (d DefaultAdminClient) Close()
func (DefaultAdminClient) CreateTopics ¶
func (d DefaultAdminClient) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
func (DefaultAdminClient) GetMetadata ¶
type DefaultConsumerFactory ¶
type DefaultConsumerFactory struct{}
func (DefaultConsumerFactory) NewAdminClientFromConsumer ¶
func (d DefaultConsumerFactory) NewAdminClientFromConsumer(c ikafka.Consumer) (ikafka.AdminClient, error)
func (DefaultConsumerFactory) NewConsumer ¶
type DefaultProducerFactory ¶
type DefaultProducerFactory struct{}
func (DefaultProducerFactory) CloseProducer ¶ added in v0.3.5
func (d DefaultProducerFactory) CloseProducer(p ikafka.Producer)
func (DefaultProducerFactory) NewAdminClientFromProducer ¶
func (d DefaultProducerFactory) NewAdminClientFromProducer(p ikafka.Producer) (ikafka.AdminClient, error)
func (DefaultProducerFactory) NewProducer ¶
type Extractor ¶ added in v0.1.3
type Extractor struct {
// contains filtered or unexported fields
}
func NewExtractor ¶
func NewExtractor(config *Config, cf ikafka.ConsumerFactory) (*Extractor, error)
func (*Extractor) ExtractFromSink ¶ added in v0.1.3
func (e *Extractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)
func (*Extractor) KafkaConfig ¶ added in v0.1.3
func (*Extractor) SendToSource ¶ added in v0.1.3
SendToSource is meant for occasionally publishing an event to the topic that the extractor consumes from. The function focuses on resilient, synchronous sending rather than high-throughput scenarios. If the stream spec specifies multiple topics for the extractor to consume from, SendToSource always uses the first one.
func (*Extractor) SetConsumerFactory ¶ added in v0.1.3
func (e *Extractor) SetConsumerFactory(cf ikafka.ConsumerFactory)
func (*Extractor) SetProducerFactory ¶ added in v0.1.3
func (e *Extractor) SetProducerFactory(pf ikafka.ProducerFactory)
func (*Extractor) StreamExtract ¶ added in v0.1.3
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
func (*Loader) KafkaConfig ¶ added in v0.1.3
func (*Loader) StreamLoad ¶
type SharedProducerFactory ¶ added in v0.3.4
type SharedProducerFactory struct {
// contains filtered or unexported fields
}
SharedProducerFactory is a singleton per stream ID and creates and provides a shared Kafka producer for all stream instances of that stream.
func NewSharedProducerFactory ¶ added in v0.3.4
func NewSharedProducerFactory(pf ikafka.ProducerFactory) *SharedProducerFactory
func (*SharedProducerFactory) CloseProducer ¶ added in v0.3.5
func (s *SharedProducerFactory) CloseProducer(p ikafka.Producer)
In the non-shared case, CloseProducer closes the Kafka producer. SharedProducerFactory cannot do this, since the producer might still be in use concurrently by other goroutines, so this method does nothing. Streams that are shut down by being disabled and later re-activated do not leak handles, since the previous producer is re-used based on the stream ID of that stream.
func (*SharedProducerFactory) CloseSharedProducer ¶ added in v0.4.0
func (s *SharedProducerFactory) CloseSharedProducer()
CloseSharedProducer is not part of the standard ProducerFactory interface and must only be used in cases where the caller has full control of the status of all running stream instances (extractors, including producers), which is typically Geist's Supervisor.
func (*SharedProducerFactory) NewAdminClientFromProducer ¶ added in v0.3.4
func (s *SharedProducerFactory) NewAdminClientFromProducer(p ikafka.Producer) (ikafka.AdminClient, error)
func (*SharedProducerFactory) NewProducer ¶ added in v0.3.4
NewProducer provides a shared Kafka producer. Since this method is called concurrently (e.g. from extractors' init section of StreamLoad(), creating the DLQ producer), we need to protect its logic with a mutex.
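The shared-producer pattern described for SharedProducerFactory can be sketched with the standard library alone. The types below (producer, fakeProducer, sharedFactory) are hypothetical stand-ins rather than the package's own types, and the bodies are assumptions based on the method descriptions: NewProducer creates the producer lazily under a mutex, CloseProducer is a no-op, and CloseSharedProducer performs the actual close.

```go
package main

import (
	"fmt"
	"sync"
)

// producer is a hypothetical stand-in for ikafka.Producer.
type producer interface{ Close() }

// fakeProducer records whether Close was called.
type fakeProducer struct{ closed bool }

func (f *fakeProducer) Close() { f.closed = true }

// sharedFactory is a hypothetical sketch of a factory handing out one shared
// producer to all callers.
type sharedFactory struct {
	mu     sync.Mutex
	create func() (producer, error) // wraps the underlying, non-shared factory
	shared producer
}

// NewProducer returns the shared producer, creating it on first use. The mutex
// ensures that concurrent callers never create more than one producer.
func (s *sharedFactory) NewProducer() (producer, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shared != nil {
		return s.shared, nil
	}
	p, err := s.create()
	if err != nil {
		return nil, err
	}
	s.shared = p
	return p, nil
}

// CloseProducer does nothing: other goroutines may still use the producer.
func (s *sharedFactory) CloseProducer(p producer) {}

// CloseSharedProducer closes the shared producer. It should only be called by
// an owner that knows no stream instance is using the producer anymore.
func (s *sharedFactory) CloseSharedProducer() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shared != nil {
		s.shared.Close()
		s.shared = nil
	}
}

func main() {
	created := 0
	f := &sharedFactory{create: func() (producer, error) {
		created++ // only ever runs while the factory mutex is held
		return &fakeProducer{}, nil
	}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := f.NewProducer(); err != nil {
				fmt.Println("error:", err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("producers created:", created) // producers created: 1
}
```

Making CloseProducer a no-op keeps the sketch compatible with callers written against a non-shared factory, while the explicit CloseSharedProducer leaves the real shutdown decision to a single owner.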