kafkabase

package module
v0.0.0-...-34f638f Latest
Published: Jun 5, 2024 License: MIT Imports: 12 Imported by: 2

Documentation

Index

Constants

const MessageIdHeader = "message_id"

Variables

var (
	ProducerMessages = func(topicId, destinationId, mode, tableName, status, errorType string) prometheus.Counter {
		return producerMessages.WithLabelValues(topicId, destinationId, mode, tableName, status, errorType)
	}

	ProducerQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "bulkerapp",
		Subsystem: "producer",
		Name:      "queue_length",
	})
)
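
A brief, illustrative sketch of how these metrics might be updated from producer code; all label values below are made up for the example:

	// Count one delivered message; labels follow the order
	// (topicId, destinationId, mode, tableName, status, errorType).
	ProducerMessages("in.abc.topic", "postgres-1", "stream", "events", "success", "").Inc()

	// Report the current depth of the producer queue.
	ProducerQueueLength.Set(12345)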

Functions

func GetKafkaHeader

func GetKafkaHeader(message *kafka.Message, key string) string
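
A small usage sketch reading the message id header (see MessageIdHeader above) from a consumed message. The confluent-kafka-go v2 binding and the bulker module path are assumptions; adjust the imports to your setup:

	import (
		"github.com/confluentinc/confluent-kafka-go/v2/kafka" // assumed kafka binding
		"github.com/jitsucom/bulker/kafkabase"                // assumed module path
	)

	func messageId(msg *kafka.Message) string {
		// Presumably returns "" when the header is absent.
		return kafkabase.GetKafkaHeader(msg, kafkabase.MessageIdHeader)
	}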

func GetKafkaIntHeader

func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)
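
The same pattern for an integer-valued header, with the error checked; the header name is illustrative and the imports are as in the previous sketch:

	retries, err := kafkabase.GetKafkaIntHeader(msg, "retries")
	if err != nil {
		retries = 0 // fall back when the header is missing or not a valid integer
	}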

func GetKafkaTimeHeader

func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
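
And for a time-valued header (header name again illustrative; requires the "time" import):

	originalTs, err := kafkabase.GetKafkaTimeHeader(msg, "original_timestamp")
	if err != nil {
		originalTs = time.Now() // fall back when the header is missing or unparsable
	}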

func KafkaErrorCode

func KafkaErrorCode(err error) string
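
KafkaErrorCode is handy for turning a produce/consume error into a stable metrics label or log field. A hedged sketch, where err is an error returned by a produce call and the label values reuse the example above:

	// The exact string returned is an assumption (e.g. a librdkafka error code name).
	errType := kafkabase.KafkaErrorCode(err)
	kafkabase.ProducerMessages("in.abc.topic", "postgres-1", "stream", "events", "error", errType).Inc()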

func PutKafkaHeader

func PutKafkaHeader(headers *[]kafka.Header, key string, value string)
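
PutKafkaHeader takes a pointer to the header slice so it can modify it in place. A sketch of building headers for an outgoing message (imports as above; header values are illustrative):

	topic := "destination-messages"
	var headers []kafka.Header
	kafkabase.PutKafkaHeader(&headers, kafkabase.MessageIdHeader, "0000-example-message-id")
	kafkabase.PutKafkaHeader(&headers, "retries", "0")
	msg := kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(`{"event":"page_view"}`),
		Headers:        headers,
	}

The resulting message can then be passed to Producer.ProduceSync (see below).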

Types

type DummyPartitionSelector

type DummyPartitionSelector struct {
}

func (*DummyPartitionSelector) SelectPartition

func (dps *DummyPartitionSelector) SelectPartition() int32
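
DummyPartitionSelector looks like a no-op implementation of the PartitionSelector interface (see below). A sketch of using it where a selector is expected:

	var selector kafkabase.PartitionSelector = &kafkabase.DummyPartitionSelector{}
	partition := selector.SelectPartition() // presumably a fixed / "any partition" value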

type KafkaConfig

type KafkaConfig struct {
	// KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port.
	KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"`
	KafkaSSL              bool   `mapstructure:"KAFKA_SSL" default:"false"`
	KafkaSSLSkipVerify    bool   `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"`
	// KafkaSASL Kafka SASL authentication as a JSON object: {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
	KafkaSASL string `mapstructure:"KAFKA_SASL"`

	KafkaSessionTimeoutMs    int    `mapstructure:"KAFKA_SESSION_TIMEOUT_MS" default:"45000"`
	KafkaMaxPollIntervalMs   int    `mapstructure:"KAFKA_MAX_POLL_INTERVAL_MS" default:"300000"`
	KafkaTopicCompression    string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
	KafkaTopicRetentionHours int    `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
	KafkaTopicSegmentHours   int    `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`

	KafkaRetryTopicSegmentBytes              int    `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
	KafkaDeadTopicRetentionHours             int    `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaTopicReplicationFactor              int    `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
	KafkaAdminMetadataTimeoutMs              int    `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"`
	KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"roundrobin"`

	// KafkaDestinationsTopicName destination topic for /ingest endpoint
	KafkaDestinationsTopicName       string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
	KafkaDestinationsTopicPartitions int    `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"16"`

	KafkaDestinationsRetryTopicName      string `mapstructure:"KAFKA_DESTINATIONS_RETRY_TOPIC_NAME" default:"destination-messages-retry"`
	KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`

	// ProducerWaitForDeliveryMs Timeout, in milliseconds, for the producer to wait for a delivery report. Applies to ProduceSync only.
	ProducerQueueSize         int `mapstructure:"PRODUCER_QUEUE_SIZE" default:"100000"`
	ProducerBatchSize         int `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"`
	ProducerLingerMs          int `mapstructure:"PRODUCER_LINGER_MS" default:"1000"`
	ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`
}
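
The mapstructure tags suggest the struct is normally populated from environment variables (and finalized via PostInit below), but it can also be filled in directly. A minimal, hedged sketch with only the broker list set:

	cfg := kafkabase.KafkaConfig{
		KafkaBootstrapServers: "broker-1:9092,broker-2:9092",
		// Defaults from the struct tags do not apply to a hand-built struct,
		// so values like timeouts and topic names may need to be set explicitly.
		KafkaSessionTimeoutMs:  45000,
		KafkaMaxPollIntervalMs: 300000,
	}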

func (*KafkaConfig) GetKafkaConfig

func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap

GetKafkaConfig returns a kafka.ConfigMap built from this configuration

func (*KafkaConfig) PostInit

func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error

type MetricsLabelsFunc

type MetricsLabelsFunc func(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)
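
MetricsLabelsFunc lets the caller map a raw topic id plus delivery status and error text into the label set used by ProducerMessages. A hypothetical implementation that carries no destination metadata and reuses the topic id as the table name:

	var labelsFunc kafkabase.MetricsLabelsFunc = func(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string) {
		// Hypothetical mapping: no destination/mode information available.
		return topicId, "", "", topicId, status, errText
	}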

type PartitionSelector

type PartitionSelector interface {
	SelectPartition() int32
}
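
Any type with a SelectPartition() int32 method satisfies the interface. A hypothetical round-robin selector over a fixed number of partitions (uses sync/atomic):

	type roundRobinSelector struct {
		counter    atomic.Int32
		partitions int32 // must be > 0
	}

	func (r *roundRobinSelector) SelectPartition() int32 {
		// Rotate over [0, partitions) on every call.
		return r.counter.Add(1) % r.partitions
	}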

type Producer

type Producer struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config *KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool, metricsLabelFunc MetricsLabelsFunc) (*Producer, error)

NewProducer creates a new Producer
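
A hedged end-to-end sketch of constructing and starting a producer from a KafkaConfig; the config values and the labels function are illustrative, and imports are as in the earlier sketches (plus the standard log package):

	cfg := &kafkabase.KafkaConfig{KafkaBootstrapServers: "localhost:9092"}
	labelsFunc := func(topicId string, status, errText string) (string, string, string, string, string, string) {
		return topicId, "", "", "", status, errText
	}
	producer, err := kafkabase.NewProducer(cfg, cfg.GetKafkaConfig(), true, labelsFunc)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	producer.Start()
	defer producer.Close()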

func (*Producer) Close

func (p *Producer) Close() error

Close closes the producer

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(topic string, messageKey string, event []byte, headers map[string]string, partition int32) error

ProduceAsync produces a message to Kafka asynchronously. TODO: transactional delivery?
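
A usage sketch; the topic name, headers, and kafka.PartitionAny are illustrative, and producer is the instance from the sketch above:

	headers := map[string]string{kafkabase.MessageIdHeader: "0000-example-message-id"}
	err := producer.ProduceAsync("destination-messages", "message-key", []byte(`{"event":"page_view"}`), headers, kafka.PartitionAny)
	if err != nil {
		// This error covers enqueueing only; delivery results are reported asynchronously.
		log.Printf("failed to enqueue message: %v", err)
	}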

func (*Producer) ProduceSync

func (p *Producer) ProduceSync(topic string, event kafka.Message) error

ProduceSync produces a message to Kafka and waits for the delivery report (bounded by ProducerWaitForDeliveryMs). TODO: transactional delivery?
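
A sketch of building and synchronously producing a message (imports as above):

	topic := "destination-messages"
	msg := kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            []byte("message-key"),
		Value:          []byte(`{"event":"page_view"}`),
	}
	if err := producer.ProduceSync(topic, msg); err != nil {
		log.Printf("delivery failed: %v", err)
	}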

func (*Producer) Start

func (p *Producer) Start()
