Documentation ¶
Index ¶
- Constants
- Variables
- func GetKafkaHeader(message *kafka.Message, key string) string
- func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)
- func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
- func KafkaErrorCode(err error) string
- func PutKafkaHeader(headers *[]kafka.Header, key string, value string)
- type DummyPartitionSelector
- type KafkaConfig
- type MetricsLabelsFunc
- type PartitionSelector
- type Producer
Constants ¶
const MessageIdHeader = "message_id"
Variables ¶
var (
	ProducerMessages = func(topicId, destinationId, mode, tableName, status, errorType string) prometheus.Counter {
		return producerMessages.WithLabelValues(topicId, destinationId, mode, tableName, status, errorType)
	}
	ProducerQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "bulkerapp",
		Subsystem: "producer",
		Name:      "queue_length",
	})
)
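As a rough usage sketch (not from the source), the metrics can be updated like this; the kafkabase import path and all label values are assumed purely for illustration:

package main

import "github.com/jitsucom/bulker/kafkabase" // assumed import path of this package

func main() {
	// Count one successfully produced message; every label value here is illustrative.
	kafkabase.ProducerMessages("topic-1", "destination-1", "stream", "events", "success", "").Inc()

	// Report the current size of the producer's in-memory queue.
	kafkabase.ProducerQueueLength.Set(128)
}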
Functions ¶
func GetKafkaHeader ¶
func GetKafkaHeader(message *kafka.Message, key string) string
func GetKafkaIntHeader ¶
func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)
func GetKafkaTimeHeader ¶
func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
func KafkaErrorCode ¶
func KafkaErrorCode(err error) string
func PutKafkaHeader ¶
func PutKafkaHeader(headers *[]kafka.Header, key string, value string)
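The header helpers operate on confluent-kafka-go message and header types. Below is a minimal sketch of writing and reading back the message_id header; the import paths (confluent-kafka-go v2 and the kafkabase package path) are assumptions, not taken from this page:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka" // assumed kafka client import
	"github.com/jitsucom/bulker/kafkabase"                // assumed import path of this package
)

func main() {
	// Attach a message_id header to an outgoing message.
	var headers []kafka.Header
	kafkabase.PutKafkaHeader(&headers, kafkabase.MessageIdHeader, "msg-123")

	msg := &kafka.Message{Headers: headers}

	// Read it back on the consuming side; expected output: msg-123
	fmt.Println(kafkabase.GetKafkaHeader(msg, kafkabase.MessageIdHeader))
}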
Types ¶
type DummyPartitionSelector ¶
type DummyPartitionSelector struct{}
func (*DummyPartitionSelector) SelectPartition ¶
func (dps *DummyPartitionSelector) SelectPartition() int32
type KafkaConfig ¶
type KafkaConfig struct {
	// KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port.
	KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"`
	KafkaSSL              bool   `mapstructure:"KAFKA_SSL" default:"false"`
	KafkaSSLSkipVerify    bool   `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"`
	KafkaSSLCA            string `mapstructure:"KAFKA_SSL_CA"`
	KafkaSSLCAFile        string `mapstructure:"KAFKA_SSL_CA_FILE"`
	// Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
	KafkaSASL string `mapstructure:"KAFKA_SASL"`

	KafkaSessionTimeoutMs  int `mapstructure:"KAFKA_SESSION_TIMEOUT_MS" default:"45000"`
	KafkaMaxPollIntervalMs int `mapstructure:"KAFKA_MAX_POLL_INTERVAL_MS" default:"300000"`

	KafkaTopicCompression        string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
	KafkaTopicRetentionHours     int    `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
	KafkaTopicSegmentHours       int    `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
	KafkaTopicPrefix             string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
	KafkaRetryTopicSegmentBytes  int    `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
	KafkaDeadTopicRetentionHours int    `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaTopicReplicationFactor  int    `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
	KafkaAdminMetadataTimeoutMs  int    `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"`

	KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"roundrobin"`

	// KafkaDestinationsTopicName destination topic for /ingest endpoint
	KafkaDestinationsTopicName           string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
	KafkaDestinationsTopicPartitions     int    `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"16"`
	KafkaDestinationsRetryTopicName      string `mapstructure:"KAFKA_DESTINATIONS_RETRY_TOPIC_NAME" default:"destination-messages-retry"`
	KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`

	// ProducerWaitForDeliveryMs For ProduceSync only: a timeout for the producer to wait for the delivery report.
	ProducerQueueSize         int `mapstructure:"PRODUCER_QUEUE_SIZE" default:"100000"`
	ProducerBatchSize         int `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"`
	ProducerLingerMs          int `mapstructure:"PRODUCER_LINGER_MS" default:"1000"`
	ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`
}
func (*KafkaConfig) GetKafkaConfig ¶
func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap
GetKafkaConfig returns the Kafka configuration as a *kafka.ConfigMap built from the KafkaConfig settings.
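A minimal sketch of deriving the client configuration from a populated KafkaConfig. The kafkabase import path is an assumption, the field values are illustrative, and the mapstructure/PostInit loading steps used by the real application are skipped here:

package main

import (
	"fmt"

	"github.com/jitsucom/bulker/kafkabase" // assumed import path of this package
)

func main() {
	cfg := &kafkabase.KafkaConfig{
		KafkaBootstrapServers: "localhost:9092", // illustrative value
		KafkaSSL:              false,
	}

	// In the real application the struct is populated from environment
	// variables (mapstructure tags) and PostInit applies defaults/validation;
	// both steps are omitted in this sketch.
	cm := cfg.GetKafkaConfig()
	fmt.Println(cm)
}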
func (*KafkaConfig) PostInit ¶
func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error
type MetricsLabelsFunc ¶
type PartitionSelector ¶
type PartitionSelector interface {
SelectPartition() int32
}
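DummyPartitionSelector is the trivial implementation of this interface. As an illustrative sketch (not from the source), any type with a SelectPartition method satisfies it, for example a hypothetical round-robin selector:

package main

import (
	"fmt"
	"sync/atomic"
)

// PartitionSelector mirrors the interface defined by this package.
type PartitionSelector interface {
	SelectPartition() int32
}

// roundRobinSelector is a hypothetical implementation that cycles
// through a fixed number of partitions.
type roundRobinSelector struct {
	partitions int32
	next       atomic.Int32
}

func (r *roundRobinSelector) SelectPartition() int32 {
	return r.next.Add(1) % r.partitions
}

func main() {
	var s PartitionSelector = &roundRobinSelector{partitions: 16}
	fmt.Println(s.SelectPartition(), s.SelectPartition())
}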
type Producer ¶
func NewProducer ¶
func NewProducer(config *KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool, metricsLabelFunc MetricsLabelsFunc) (*Producer, error)
NewProducer creates a new Producer.
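A minimal construction sketch, assuming the kafkabase import path and that a nil MetricsLabelsFunc is acceptable (neither is confirmed by this page):

package main

import (
	"log"

	"github.com/jitsucom/bulker/kafkabase" // assumed import path of this package
)

func main() {
	cfg := &kafkabase.KafkaConfig{
		KafkaBootstrapServers: "localhost:9092", // illustrative value
	}

	// NewProducer takes the typed settings plus the derived *kafka.ConfigMap.
	// Passing nil for the metrics label function is an assumption of this sketch.
	producer, err := kafkabase.NewProducer(cfg, cfg.GetKafkaConfig(), true, nil)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	_ = producer // the real Producer may require additional start-up steps not shown on this page
}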
func (*Producer) ProduceAsync ¶
func (p *Producer) ProduceAsync(topic string, messageKey string, event []byte, headers map[string]string, partition int32) error
ProduceAsync produces messages to Kafka asynchronously. TODO: transactional delivery?
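A hedged sketch of an asynchronous produce call carrying a message_id header. It assumes a Producer p built as shown above, assumes kafka.PartitionAny (-1) is treated as "no explicit partition", and the topic, key, and payload are made up; the import paths are also assumptions:

package example

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka" // assumed kafka client import
	"github.com/jitsucom/bulker/kafkabase"                // assumed import path of this package
)

// produceSignup sends one illustrative event through an already constructed Producer.
func produceSignup(p *kafkabase.Producer) error {
	return p.ProduceAsync(
		"destination-messages",       // topic (the default destinations topic name)
		"user-42",                    // message key
		[]byte(`{"event":"signup"}`), // event payload
		map[string]string{kafkabase.MessageIdHeader: "msg-123"}, // headers
		kafka.PartitionAny, // assumes -1 means "let the client choose the partition"
	)
}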
func (*Producer) ProduceSync ¶
ProduceSync produces messages to Kafka and waits up to ProducerWaitForDeliveryMs for the delivery report. TODO: transactional delivery?