gki

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PropAutoCommit      = "enable.auto.commit"
	PropAutoOffsetStore = "enable.auto.offset.store"
)

Kafka props which cannot be overridden, required by Extractor

View Source
const DefaultPollTimeoutMs = 3000

Variables

This section is empty.

Functions

This section is empty.

Types

type AdminClient

type AdminClient interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
}

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config is the internal config used by each extractor/loader, combining config from external Config with config from what is inside the stream spec for this specific stream

func NewExtractorConfig

func NewExtractorConfig(
	spec *entity.Spec,
	topics []string,
	topicCreationMutex *sync.Mutex) *Config

func NewLoaderConfig

func NewLoaderConfig(
	spec *entity.Spec,
	topic *entity.TopicSpecification,
	topicCreationMutex *sync.Mutex,
	sync bool) *Config

func (*Config) SetCreateTopics added in v0.1.3

func (c *Config) SetCreateTopics(value bool)

func (*Config) SetKafkaProperty

func (c *Config) SetKafkaProperty(prop string, value any)

func (*Config) SetPollTimout

func (c *Config) SetPollTimout(timeout int)

func (*Config) SetProps

func (c *Config) SetProps(props ConfigMap)

func (*Config) String

func (c *Config) String() string

type ConfigMap

type ConfigMap map[string]any

type Consumer

type Consumer interface {
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
	Poll(timeoutMs int) (event kafka.Event)
	StoreOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	Commit() ([]kafka.TopicPartition, error)
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	Close() error
}

Consumer interface is used to enable full unit testing

type ConsumerFactory

type ConsumerFactory interface {
	NewConsumer(conf *kafka.ConfigMap) (Consumer, error)
	NewAdminClientFromConsumer(c Consumer) (AdminClient, error)
}

type DefaultAdminClient

type DefaultAdminClient struct {
	// contains filtered or unexported fields
}

func (DefaultAdminClient) CreateTopics

func (DefaultAdminClient) GetMetadata

func (d DefaultAdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)

type DefaultConsumerFactory

type DefaultConsumerFactory struct{}

func (DefaultConsumerFactory) NewAdminClientFromConsumer

func (d DefaultConsumerFactory) NewAdminClientFromConsumer(c Consumer) (AdminClient, error)

func (DefaultConsumerFactory) NewConsumer

func (d DefaultConsumerFactory) NewConsumer(conf *kafka.ConfigMap) (Consumer, error)

type DefaultProducerFactory

type DefaultProducerFactory struct{}

func (DefaultProducerFactory) NewAdminClientFromProducer

func (d DefaultProducerFactory) NewAdminClientFromProducer(p Producer) (AdminClient, error)

func (DefaultProducerFactory) NewProducer

func (d DefaultProducerFactory) NewProducer(conf *kafka.ConfigMap) (Producer, error)

type Extractor added in v0.1.3

type Extractor struct {
	// contains filtered or unexported fields
}

func NewExtractor

func NewExtractor(config *Config, id string) (*Extractor, error)

func (*Extractor) Extract added in v0.1.3

func (e *Extractor) Extract(
	ctx context.Context,
	query entity.ExtractorQuery,
	result any) (error, bool)

func (*Extractor) ExtractFromSink added in v0.1.3

func (e *Extractor) ExtractFromSink(
	ctx context.Context,
	query entity.ExtractorQuery,
	result *[]*entity.Transformed) (error, bool)

func (*Extractor) KafkaConfig added in v0.1.3

func (e *Extractor) KafkaConfig() (pollTimeoutMs int, cfgMap map[string]any)

func (*Extractor) SendToSource added in v0.1.3

func (e *Extractor) SendToSource(ctx context.Context, eventData any) (string, error)

func (*Extractor) SetConsumerFactory added in v0.1.3

func (e *Extractor) SetConsumerFactory(cf ConsumerFactory)

func (*Extractor) SetProducerFactory added in v0.1.3

func (e *Extractor) SetProducerFactory(pf ProducerFactory)

func (*Extractor) StreamExtract added in v0.1.3

func (e *Extractor) StreamExtract(
	ctx context.Context,
	reportEvent entity.ProcessEventFunc,
	err *error,
	retryable *bool)

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

func NewLoader

func NewLoader(ctx context.Context, config *Config, id string, pf ProducerFactory) (*Loader, error)

func (*Loader) KafkaConfig added in v0.1.3

func (l *Loader) KafkaConfig() map[string]any

func (*Loader) Shutdown

func (l *Loader) Shutdown()

func (*Loader) StreamLoad

func (l *Loader) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)

type Producer

type Producer interface {
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	Events() chan kafka.Event
	Flush(timeoutMs int) int
	Close()
}

Producer interface is used to enable full unit testing

type ProducerFactory

type ProducerFactory interface {
	NewProducer(conf *kafka.ConfigMap) (Producer, error)
	NewAdminClientFromProducer(p Producer) (AdminClient, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL