gki

package v0.8.0
Published: Jun 4, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

const (
	PropAutoCommit      = "enable.auto.commit"
	PropAutoOffsetStore = "enable.auto.offset.store"
)

Kafka properties that cannot be overridden, since they are required by the Extractor.

const DefaultPollTimeoutMs = 3000
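
These two properties are reserved because the Extractor manages offset commits and offset storage itself. A minimal sketch of setting other extractor options while leaving the reserved properties alone; the import paths and the empty entity.Config are assumptions for illustration, not taken from this page:

package main

import (
	"sync"

	"github.com/zpiroux/geist-connector-kafka/gki" // assumed import path
	"github.com/zpiroux/geist/entity"              // assumed import path
)

func main() {
	var topicCreationMutex sync.Mutex

	// Empty external config used here purely for brevity.
	cfg := gki.NewExtractorConfig(entity.Config{}, []string{"my-topic"}, &topicCreationMutex)

	// Freely overridable properties can be tuned...
	cfg.SetKafkaProperty("session.timeout.ms", 30000)
	cfg.SetPollTimout(gki.DefaultPollTimeoutMs)

	// ...but PropAutoCommit and PropAutoOffsetStore should be left alone,
	// since the Extractor requires them for its offset handling.
}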

Variables

This section is empty.

Functions

func IsNil added in v0.3.5

func IsNil(v any) bool
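
A helper like this addresses Go's typed-nil pitfall: a nil pointer stored in an any value does not compare equal to nil. A self-contained sketch of the idea; isNil below is a plausible local reimplementation, not the actual gki code:

package main

import (
	"fmt"
	"reflect"
)

// isNil treats both an untyped nil and a typed nil (e.g. a nil *T stored
// in an interface) as nil.
func isNil(v any) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Chan, reflect.Func, reflect.Interface:
		return rv.IsNil()
	}
	return false
}

func main() {
	var p *int
	var v any = p         // typed nil stored in an interface
	fmt.Println(v == nil) // false: the interface holds a (*int, nil) pair
	fmt.Println(isNil(v)) // true: reflection sees the nil pointer
}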

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config is the internal config used by each extractor/loader, combining the externally provided config with the config found inside the stream spec for this specific stream.

func NewExtractorConfig

func NewExtractorConfig(
	c entity.Config,
	topics []string,
	topicCreationMutex *sync.Mutex) *Config

func NewLoaderConfig

func NewLoaderConfig(
	c entity.Config,
	topic *spec.TopicSpecification,
	message *spec.Message,
	topicCreationMutex *sync.Mutex,
	sync bool) *Config
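
A sketch of constructing a loader config. Import paths are assumed, the TopicSpecification field names are taken from the DLQConfig docs below, and spec.Message is left as a zero value for brevity:

package main

import (
	"sync"

	"github.com/zpiroux/geist-connector-kafka/gki"  // assumed import path
	"github.com/zpiroux/geist-connector-kafka/spec" // assumed import path
	"github.com/zpiroux/geist/entity"               // assumed import path
)

func main() {
	var topicCreationMutex sync.Mutex

	// Sink topic for the loader (field names as in the DLQConfig docs below).
	topic := &spec.TopicSpecification{
		Name:              "my-sink-topic",
		NumPartitions:     3,
		ReplicationFactor: 1,
	}

	// sync=true requests synchronous delivery semantics (assumption based
	// on the parameter name).
	cfg := gki.NewLoaderConfig(entity.Config{}, topic, &spec.Message{}, &topicCreationMutex, true)

	// Create the sink topic if it doesn't already exist.
	cfg.SetCreateTopics(true)
}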

func (*Config) SetCreateTopics added in v0.1.3

func (c *Config) SetCreateTopics(value bool)

func (*Config) SetDLQConfig added in v0.3.1

func (c *Config) SetDLQConfig(dlqConfig DLQConfig)

func (*Config) SetKafkaProperty

func (c *Config) SetKafkaProperty(prop string, value any)

func (*Config) SetLoaderProps added in v0.2.0

func (c *Config) SetLoaderProps(props ConfigMap)

func (*Config) SetPollTimout

func (c *Config) SetPollTimout(timeout int)

func (*Config) SetProps

func (c *Config) SetProps(props ConfigMap)

func (*Config) SetSendToSource added in v0.1.5

func (c *Config) SetSendToSource(value bool)

func (*Config) String

func (c *Config) String() string

type ConfigMap

type ConfigMap map[string]any

type DLQConfig added in v0.3.1

type DLQConfig struct {
	// Topic specifies which topic to use for DLQ events. If the global extractor
	// setting createTopics is set to false, only Topic.Name is used. Otherwise,
	// NumPartitions and ReplicationFactor are used as well if the topic needs to
	// be created (i.e., if it doesn't exist already).
	Topic *spec.TopicSpecification

	// ProducerConfig holds the properties for the producer used for DLQ events.
	ProducerConfig map[string]any

	// If StreamIDEnrichmentPath is not empty, it specifies the JSON path (e.g.
	// "my.enrichment.streamId"), including the JSON field name, that will hold
	// the value of the injected stream ID for the current stream. That is, if
	// this option is used, the stream ID is added to a new field created in the
	// event before the event is sent to the DLQ.
	StreamIDEnrichmentPath string
}

func NewDLQConfig added in v0.3.1

func NewDLQConfig(pconf map[string]any, enrichPath string, topic *spec.TopicSpecification) (DLQConfig, error)
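
A sketch of assembling and attaching a DLQ config; the topic and producer property values are illustrative only:

package example

import (
	"log"

	"github.com/zpiroux/geist-connector-kafka/gki"  // assumed import path
	"github.com/zpiroux/geist-connector-kafka/spec" // assumed import path
)

// attachDLQ is a hypothetical helper; cfg is a *gki.Config from
// NewExtractorConfig.
func attachDLQ(cfg *gki.Config) {
	dlqTopic := &spec.TopicSpecification{
		Name:              "my-stream-dlq",
		NumPartitions:     6,
		ReplicationFactor: 3,
	}

	// Properties for the DLQ producer (illustrative value).
	pconf := map[string]any{"enable.idempotence": true}

	// Each DLQ event gets the stream ID injected at this JSON path.
	dlqConfig, err := gki.NewDLQConfig(pconf, "my.enrichment.streamId", dlqTopic)
	if err != nil {
		log.Fatal(err)
	}
	cfg.SetDLQConfig(dlqConfig)
}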

type DefaultAdminClient

type DefaultAdminClient struct {
	// contains filtered or unexported fields
}

func (DefaultAdminClient) Close added in v0.7.1

func (d DefaultAdminClient) Close()

func (DefaultAdminClient) CreateTopics

func (DefaultAdminClient) GetMetadata

func (d DefaultAdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)

type DefaultConsumerFactory

type DefaultConsumerFactory struct{}

func (DefaultConsumerFactory) NewAdminClientFromConsumer

func (d DefaultConsumerFactory) NewAdminClientFromConsumer(c ikafka.Consumer) (ikafka.AdminClient, error)

func (DefaultConsumerFactory) NewConsumer

func (d DefaultConsumerFactory) NewConsumer(conf *kafka.ConfigMap) (ikafka.Consumer, error)

type DefaultProducerFactory

type DefaultProducerFactory struct{}

func (DefaultProducerFactory) CloseProducer added in v0.3.5

func (d DefaultProducerFactory) CloseProducer(p ikafka.Producer)

func (DefaultProducerFactory) NewAdminClientFromProducer

func (d DefaultProducerFactory) NewAdminClientFromProducer(p ikafka.Producer) (ikafka.AdminClient, error)

func (DefaultProducerFactory) NewProducer

func (d DefaultProducerFactory) NewProducer(conf *kafka.ConfigMap) (ikafka.Producer, error)

type Extractor added in v0.1.3

type Extractor struct {
	// contains filtered or unexported fields
}

func NewExtractor

func NewExtractor(config *Config, cf ikafka.ConsumerFactory) (*Extractor, error)

func (*Extractor) DLQConfig added in v0.3.1

func (e *Extractor) DLQConfig() DLQConfig

func (*Extractor) Extract added in v0.1.3

func (e *Extractor) Extract(
	ctx context.Context,
	query entity.ExtractorQuery,
	result any) (error, bool)

func (*Extractor) ExtractFromSink added in v0.1.3

func (e *Extractor) ExtractFromSink(
	ctx context.Context,
	query entity.ExtractorQuery,
	result *[]*entity.Transformed) (error, bool)

func (*Extractor) KafkaConfig added in v0.1.3

func (e *Extractor) KafkaConfig() (pollTimeoutMs int, cfgMap map[string]any)

func (*Extractor) SendToSource added in v0.1.3

func (e *Extractor) SendToSource(ctx context.Context, eventData any) (string, error)

SendToSource is meant for occasional publishing of events to the topic that the extractor consumes from. The function focuses on resilient, synchronous event sending rather than on high-throughput scenarios. If the stream spec specifies multiple topics for the extractor to consume from, the first one is always the one used by SendToSource.
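
A usage sketch; ext is assumed to come from gki.NewExtractor, and the payload shown is illustrative:

package example

import (
	"context"
	"log"

	"github.com/zpiroux/geist-connector-kafka/gki" // assumed import path
)

// publishToSource is a hypothetical helper wrapping SendToSource.
func publishToSource(ctx context.Context, ext *gki.Extractor) {
	// The returned string plausibly identifies the published event
	// (assumption; the doc does not name it).
	id, err := ext.SendToSource(ctx, []byte(`{"eventType":"replay"}`))
	if err != nil {
		log.Printf("send to source failed: %v", err)
		return
	}
	log.Printf("event sent, id: %s", id)
}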

func (*Extractor) SetConsumerFactory added in v0.1.3

func (e *Extractor) SetConsumerFactory(cf ikafka.ConsumerFactory)

func (*Extractor) SetProducerFactory added in v0.1.3

func (e *Extractor) SetProducerFactory(pf ikafka.ProducerFactory)

func (*Extractor) StreamExtract added in v0.1.3

func (e *Extractor) StreamExtract(
	ctx context.Context,
	reportEvent entity.ProcessEventFunc,
	err *error,
	retryable *bool)
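
Note the out-parameter style: failures are reported through err and retryable rather than as return values. A calling sketch; reportEvent is an entity.ProcessEventFunc supplied by the host application and is assumed here:

package example

import (
	"context"

	"github.com/zpiroux/geist-connector-kafka/gki" // assumed import path
	"github.com/zpiroux/geist/entity"              // assumed import path
)

// runExtract is a hypothetical wrapper showing the out-parameter contract.
func runExtract(ctx context.Context, ext *gki.Extractor, reportEvent entity.ProcessEventFunc) (error, bool) {
	var err error
	var retryable bool

	// Runs the consume loop; when it returns, err and retryable describe
	// the outcome.
	ext.StreamExtract(ctx, reportEvent, &err, &retryable)
	return err, retryable
}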

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

func NewLoader

func NewLoader(ctx context.Context, config *Config, pf ikafka.ProducerFactory) (*Loader, error)

func (*Loader) KafkaConfig added in v0.1.3

func (l *Loader) KafkaConfig() map[string]any

func (*Loader) Shutdown

func (l *Loader) Shutdown(ctx context.Context)

func (*Loader) StreamLoad

func (l *Loader) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)
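
A calling sketch; by analogy with StreamExtract the bool is read here as a retryable indicator and the string as a resource/event ID, both of which are assumptions since the doc does not name the return values:

package example

import (
	"context"
	"log"

	"github.com/zpiroux/geist-connector-kafka/gki" // assumed import path
	"github.com/zpiroux/geist/entity"              // assumed import path
)

// loadBatch is a hypothetical wrapper; l comes from gki.NewLoader.
func loadBatch(ctx context.Context, l *gki.Loader, data []*entity.Transformed) {
	id, err, retryable := l.StreamLoad(ctx, data)
	if err != nil {
		// retryable indicating whether the host may retry the batch is an
		// assumption based on the StreamExtract contract.
		log.Printf("load failed (retryable=%v): %v", retryable, err)
		return
	}
	log.Printf("batch loaded, id: %s", id)
}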

type SharedProducerFactory added in v0.3.4

type SharedProducerFactory struct {
	// contains filtered or unexported fields
}

SharedProducerFactory is a singleton per stream ID; it creates and provides a single shared Kafka producer for all stream instances of that stream.

func NewSharedProducerFactory added in v0.3.4

func NewSharedProducerFactory(pf ikafka.ProducerFactory) *SharedProducerFactory
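
A construction sketch, assuming DefaultProducerFactory satisfies ikafka.ProducerFactory (import path assumed):

package example

import (
	"context"

	"github.com/zpiroux/geist-connector-kafka/gki" // assumed import path
)

// newSharedLoader is a hypothetical helper: it wraps the default factory so
// that all stream instances of this stream share one Kafka producer.
func newSharedLoader(ctx context.Context, cfg *gki.Config) (*gki.Loader, error) {
	spf := gki.NewSharedProducerFactory(gki.DefaultProducerFactory{})
	return gki.NewLoader(ctx, cfg, spf)
}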

func (*SharedProducerFactory) CloseProducer added in v0.3.5

func (s *SharedProducerFactory) CloseProducer(p ikafka.Producer)

CloseProducer in the non-shared case closes the Kafka producer. For the SharedProducerFactory this cannot be done, since the producer might still be in concurrent use by other goroutines, so this method does nothing here. Streams that are shut down by being disabled and later re-activated will not leak any handles, since the previous producer is simply re-used, based on the stream ID of that stream.

func (*SharedProducerFactory) CloseSharedProducer added in v0.4.0

func (s *SharedProducerFactory) CloseSharedProducer()

CloseSharedProducer is not part of the standard ProducerFactory interface and must only be used in cases where the caller has full control of the status of all running stream instances (extractors, including producers), which is typically Geist's Supervisor.

func (*SharedProducerFactory) NewAdminClientFromProducer added in v0.3.4

func (s *SharedProducerFactory) NewAdminClientFromProducer(p ikafka.Producer) (ikafka.AdminClient, error)

func (*SharedProducerFactory) NewProducer added in v0.3.4

func (s *SharedProducerFactory) NewProducer(conf *kafka.ConfigMap) (ikafka.Producer, error)

NewProducer provides a shared Kafka producer. Since this method is called concurrently (e.g. from extractors' init section of StreamLoad(), when creating the DLQ producer), its logic is protected with a mutex.
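
A self-contained sketch of the pattern this describes, using local stand-in interfaces instead of the module's ikafka types, and assuming the confluent-kafka-go v2 import path; it is not the actual implementation:

package example

import (
	"sync"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka" // assumed v2 path
)

// producer and producerFactory are local stand-ins for ikafka.Producer and
// ikafka.ProducerFactory.
type producer interface{ Close() }

type producerFactory interface {
	NewProducer(conf *kafka.ConfigMap) (producer, error)
}

// lazyShared sketches the mutex-guarded lazy creation described above.
type lazyShared struct {
	mu sync.Mutex
	pf producerFactory
	p  producer
}

func (s *lazyShared) NewProducer(conf *kafka.ConfigMap) (producer, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.p != nil {
		return s.p, nil // concurrent callers all receive the same instance
	}
	p, err := s.pf.NewProducer(conf)
	if err != nil {
		return nil, err
	}
	s.p = p
	return p, nil
}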
