Documentation ¶
Index ¶
- Constants
- func GetStreamCurrentSize(ctx context.Context, configs map[string]interface{}, ...) (int64, error)
- func PopulateBatchFromMSG(ctx context.Context, ka KafkaAdapterInterface, typs []types.Type, ...) (*batch.Batch, error)
- func RetrieveData(ctx context.Context, msgs []*kafka.Message, configs map[string]interface{}, ...) (*batch.Batch, error)
- func ValidateConfig(ctx context.Context, configs map[string]interface{}, ...) error
- type DataGetter
- type JsonDataGetter
- type KafkaAdapter
- func (ka *KafkaAdapter) BatchRead(topic string, startOffset int64, limit int, batchSize int) ([]*kafka.Message, error)
- func (ka *KafkaAdapter) Close()
- func (ka *KafkaAdapter) CreateTopic(ctx context.Context, topicName string, partitions int, replicationFactor int) error
- func (ka *KafkaAdapter) DescribeTopicDetails(ctx context.Context, topicName string) (*kafka.TopicMetadata, error)
- func (ka *KafkaAdapter) GetKafkaConsumer() (*kafka.Consumer, error)
- func (ka *KafkaAdapter) GetSchemaForTopic(topic string, isKey bool) (schemaregistry.SchemaMetadata, error)
- func (ka *KafkaAdapter) InitSchemaRegistry(url string) error
- func (ka *KafkaAdapter) ProduceMessage(topic string, key, value []byte) (int64, error)
- func (ka *KafkaAdapter) ReadMessagesFromPartition(topic string, partition int32, offset int64, limit int) ([]*kafka.Message, error)
- func (ka *KafkaAdapter) ReadMessagesFromTopic(topic string, offset int64, limit int64, configs map[string]interface{}) ([]*kafka.Message, error)
- type KafkaAdapterFactory
- type KafkaAdapterInterface
- type ProtoDataGetter
- type ValueType
Constants ¶
View Source
const ( TypeKey = "type" TopicKey = "topic" DatabaseKey = "database" TimeWindowKey = "time_window" BufferLimitKey = "buffer_limit" TableKey = "table" ValueKey = "value" PartitionKey = "partition" RelkindKey = "relkind" BootstrapServersKey = "bootstrap.servers" ProtobufSchemaKey = "protobuf.schema" ProtobufMessagekey = "protobuf.message" SchemaRegistryKey = "schema.registry" JSON ValueType = "json" AVRO ValueType = "avro" PROTOBUF ValueType = "protobuf" PROTOBUFSR ValueType = "protobuf_sr" CREATED_AT = "created_at" )
Variables ¶
This section is empty.
Functions ¶
func GetStreamCurrentSize ¶
func PopulateBatchFromMSG ¶
func RetrieveData ¶
func ValidateConfig ¶
Types ¶
type DataGetter ¶
type JsonDataGetter ¶
type JsonDataGetter struct { Key []byte Value []byte Data map[string]interface{} // Cache the parsed JSON for efficiency }
func (*JsonDataGetter) GetFieldValue ¶
func (j *JsonDataGetter) GetFieldValue(name string) (interface{}, bool)
type KafkaAdapter ¶
type KafkaAdapter struct { Producer *kafka.Producer Consumer *kafka.Consumer AdminClient *kafka.AdminClient SchemaRegistry schemaregistry.Client ConfigMap *kafka.ConfigMap Connected bool }
func (*KafkaAdapter) Close ¶
func (ka *KafkaAdapter) Close()
func (*KafkaAdapter) CreateTopic ¶
func (*KafkaAdapter) DescribeTopicDetails ¶
func (ka *KafkaAdapter) DescribeTopicDetails(ctx context.Context, topicName string) (*kafka.TopicMetadata, error)
func (*KafkaAdapter) GetKafkaConsumer ¶
func (ka *KafkaAdapter) GetKafkaConsumer() (*kafka.Consumer, error)
func (*KafkaAdapter) GetSchemaForTopic ¶
func (ka *KafkaAdapter) GetSchemaForTopic(topic string, isKey bool) (schemaregistry.SchemaMetadata, error)
func (*KafkaAdapter) InitSchemaRegistry ¶
func (ka *KafkaAdapter) InitSchemaRegistry(url string) error
func (*KafkaAdapter) ProduceMessage ¶
func (ka *KafkaAdapter) ProduceMessage(topic string, key, value []byte) (int64, error)
func (*KafkaAdapter) ReadMessagesFromPartition ¶
func (*KafkaAdapter) ReadMessagesFromTopic ¶
type KafkaAdapterFactory ¶
type KafkaAdapterFactory func(configMap *kafka.ConfigMap) (KafkaAdapterInterface, error)
type KafkaAdapterInterface ¶
type KafkaAdapterInterface interface { InitSchemaRegistry(url string) error Close() CreateTopic(ctx context.Context, topicName string, partitions int, replicationFactor int) error DescribeTopicDetails(ctx context.Context, topicName string) (*kafka.TopicMetadata, error) ReadMessagesFromPartition(topic string, partition int32, offset int64, limit int) ([]*kafka.Message, error) ReadMessagesFromTopic(topic string, offset int64, limit int64, configs map[string]interface{}) ([]*kafka.Message, error) GetSchemaForTopic(topic string, isKey bool) (schemaregistry.SchemaMetadata, error) GetKafkaConsumer() (*kafka.Consumer, error) ProduceMessage(topic string, key, value []byte) (int64, error) }
func NewKafkaAdapter ¶
func NewKafkaAdapter(configMap *kafka.ConfigMap) (KafkaAdapterInterface, error)
type ProtoDataGetter ¶
func (*ProtoDataGetter) GetFieldValue ¶
func (p *ProtoDataGetter) GetFieldValue(name string) (interface{}, bool)
Click to show internal directories.
Click to hide internal directories.