mokafka

package
v1.2.3-hotfix-20240916 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TypeKey  = "type"
	TopicKey = "topic"

	DatabaseKey = "database"

	TimeWindowKey = "time_window"

	BufferLimitKey = "buffer_limit"

	TableKey = "table"

	ValueKey = "value"

	PartitionKey        = "partition"
	RelkindKey          = "relkind"
	BootstrapServersKey = "bootstrap.servers"
	ProtobufSchemaKey   = "protobuf.schema"
	ProtobufMessagekey  = "protobuf.message"

	SchemaRegistryKey = "schema.registry"

	JSON       ValueType = "json"
	AVRO       ValueType = "avro"
	PROTOBUF   ValueType = "protobuf"
	PROTOBUFSR ValueType = "protobuf_sr"

	CREATED_AT = "created_at"
)

Variables

This section is empty.

Functions

func GetStreamCurrentSize

func GetStreamCurrentSize(ctx context.Context, configs map[string]interface{}, factory KafkaAdapterFactory) (int64, error)

func PopulateBatchFromMSG

func PopulateBatchFromMSG(ctx context.Context, ka KafkaAdapterInterface, typs []types.Type, attrKeys []string, msgs []*kafka.Message, configs map[string]interface{}, mp *mpool.MPool) (*batch.Batch, error)

func RetrieveData

func RetrieveData(ctx context.Context, msgs []*kafka.Message, configs map[string]interface{}, attrs []string, types []types.Type, offset int64, limit int64, mp *mpool.MPool, factory KafkaAdapterFactory) (*batch.Batch, error)

func ValidateConfig

func ValidateConfig(ctx context.Context, configs map[string]interface{}, factory func(configMap *kafka.ConfigMap) (KafkaAdapterInterface, error)) error

Types

type DataGetter

type DataGetter interface {
	GetFieldValue(name string) (interface{}, bool)
}

type JsonDataGetter

type JsonDataGetter struct {
	Key   []byte
	Value []byte
	Data  map[string]interface{} // Cache the parsed JSON for efficiency
}

func (*JsonDataGetter) GetFieldValue

func (j *JsonDataGetter) GetFieldValue(name string) (interface{}, bool)

type KafkaAdapter

type KafkaAdapter struct {
	Producer       *kafka.Producer
	Consumer       *kafka.Consumer
	AdminClient    *kafka.AdminClient
	SchemaRegistry schemaregistry.Client
	ConfigMap      *kafka.ConfigMap
	Connected      bool
}

func (*KafkaAdapter) BatchRead

func (ka *KafkaAdapter) BatchRead(topic string, startOffset int64, limit int, batchSize int) ([]*kafka.Message, error)

func (*KafkaAdapter) Close

func (ka *KafkaAdapter) Close()

func (*KafkaAdapter) CreateTopic

func (ka *KafkaAdapter) CreateTopic(ctx context.Context, topicName string, partitions int, replicationFactor int) error

func (*KafkaAdapter) DescribeTopicDetails

func (ka *KafkaAdapter) DescribeTopicDetails(ctx context.Context, topicName string) (*kafka.TopicMetadata, error)

func (*KafkaAdapter) GetKafkaConsumer

func (ka *KafkaAdapter) GetKafkaConsumer() (*kafka.Consumer, error)

func (*KafkaAdapter) GetSchemaForTopic

func (ka *KafkaAdapter) GetSchemaForTopic(topic string, isKey bool) (schemaregistry.SchemaMetadata, error)

func (*KafkaAdapter) InitSchemaRegistry

func (ka *KafkaAdapter) InitSchemaRegistry(url string) error

func (*KafkaAdapter) ProduceMessage

func (ka *KafkaAdapter) ProduceMessage(topic string, key, value []byte) (int64, error)

func (*KafkaAdapter) ReadMessagesFromPartition

func (ka *KafkaAdapter) ReadMessagesFromPartition(topic string, partition int32, offset int64, limit int) ([]*kafka.Message, error)

func (*KafkaAdapter) ReadMessagesFromTopic

func (ka *KafkaAdapter) ReadMessagesFromTopic(topic string, offset int64, limit int64, configs map[string]interface{}) ([]*kafka.Message, error)

type KafkaAdapterFactory

type KafkaAdapterFactory func(configMap *kafka.ConfigMap) (KafkaAdapterInterface, error)

type KafkaAdapterInterface

type KafkaAdapterInterface interface {
	InitSchemaRegistry(url string) error
	Close()
	CreateTopic(ctx context.Context, topicName string, partitions int, replicationFactor int) error
	DescribeTopicDetails(ctx context.Context, topicName string) (*kafka.TopicMetadata, error)
	ReadMessagesFromPartition(topic string, partition int32, offset int64, limit int) ([]*kafka.Message, error)
	ReadMessagesFromTopic(topic string, offset int64, limit int64, configs map[string]interface{}) ([]*kafka.Message, error)
	GetSchemaForTopic(topic string, isKey bool) (schemaregistry.SchemaMetadata, error)

	GetKafkaConsumer() (*kafka.Consumer, error)
	ProduceMessage(topic string, key, value []byte) (int64, error)
}

func NewKafkaAdapter

func NewKafkaAdapter(configMap *kafka.ConfigMap) (KafkaAdapterInterface, error)

type ProtoDataGetter

type ProtoDataGetter struct {
	Value *dynamic.Message
	Key   any
}

func (*ProtoDataGetter) GetFieldValue

func (p *ProtoDataGetter) GetFieldValue(name string) (interface{}, bool)

type ValueType

type ValueType string

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL