Documentation ¶
Index ¶
- Constants
- Variables
- type ConsumerCfg
- type KafkaCfg
- type ProducerCfg
- type Relay
- type RelayCfg
- type Server
- type SourcePool
- func (sp *SourcePool) Close()
- func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error)
- func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error)
- func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error)
- func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error
- type SourcePoolCfg
- type Target
- type TargetCfg
- type Topic
- type TopicOffsets
- type Topics
Constants ¶
const ( SASLMechanismPlain = "PLAIN" SASLMechanismScramSHA256 = "SCRAM-SHA-256" SASLMechanismScramSHA512 = "SCRAM-SHA-512" )
const ( ModeFailover = "failover" ModeSingle = "single" IndefiniteRetry = -1 )
const ( StateDisconnected = iota StateConnecting )
Variables ¶
var ( SrcNetworkErrMetric = relayMetricPrefix + "source_errors_total{node_id=\"%d\", error=\"%s\"}" SrcsUnhealthyMetric = relayMetricPrefix + "sources_unhealthy_total" SrcKafkaErrMetric = relayMetricPrefix + "source_kafka_errors_total{node_id=\"%d\", error=\"%s\"}" SrcHealthMetric = relayMetricPrefix + "source_highwatermark{node_id=\"%d\"}" TargetNetworkErrMetric = relayMetricPrefix + "target_errors_total{error=\"%s\"}" TargetKafkaErrMetric = relayMetricPrefix + "target_kafka_errors_total{error=\"%s\"}" RelayedMsgsMetric = relayMetricPrefix + "msgs_total{source=\"%s\", src_partition=\"%d\", destination=\"%s\", dest_partition=\"%d\"}" ErrLaggingBehind = fmt.Errorf("topic end offset is lagging behind") )
var (
ErrorNoHealthy = errors.New("no healthy node")
)
Functions ¶
This section is empty.
Types ¶
type ConsumerCfg ¶ added in v1.2.0
type ConsumerCfg struct {
KafkaCfg `koanf:",squash"`
}
ConsumerCfg is the direct consumer config.
type KafkaCfg ¶
type KafkaCfg struct { // Namespace Name string `koanf:"name"` // Broker. BootstrapBrokers []string `koanf:"servers"` SessionTimeout time.Duration `koanf:"session_timeout"` // Auth. EnableAuth bool `koanf:"enable_auth"` // PLAIN/SCRAM-SHA-256/SCRAM-SHA-512 SASLMechanism string `koanf:"sasl_mechanism"` Username string `koanf:"username"` Password string `koanf:"password"` // If enabled and the three files are passed, will // use the relevant certs and keys. If enabled but all three // file paths are empty, it will default to using DialTLS() EnableTLS bool `koanf:"enable_tls"` ClientKeyPath string `koanf:"client_key_path"` ClientCertPath string `koanf:"client_cert_path"` CACertPath string `koanf:"ca_cert_path"` EnableLog bool `koanf:"enable_log"` }
KafkaCfg is the message broker's client config.
type ProducerCfg ¶
type ProducerCfg struct { KafkaCfg `koanf:",squash"` EnableIdempotency bool `koanf:"enable_idempotency"` CommitAck string `koanf:"commit_ack_type"` // tcp|leader|cluster|default MaxRetries int `koanf:"max_retries"` FlushFrequency time.Duration `koanf:"flush_frequency"` MaxMessageBytes int `koanf:"max_message_bytes"` BatchSize int `koanf:"batch_size"` BufferSize int `koanf:"buffer_size"` FlushBatchSize int `koanf:"flush_batch_size"` Compression string `koanf:"compression"` // gzip|snappy|lz4|zstd|none }
ProducerCfg is the Kafka producer config.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay represents the input and output Kafka clusters and the remapping necessary to forward messages from one topic to another.
type Server ¶
type Server struct { Config ConsumerCfg ID int // Weight is the cumulative high watermark (offset) of every single topic // on a source. This is used for comparing lags between different sources // based on a threshold. If a server is unhealthy, the weight is marked as -1. Weight int64 Healthy bool // This is only set when a new live Kafka consumer connection is established // on demand via Get(), where a server{} is returned. Internally, no connections // are maintained on SourcePool.[]servers and only the config, weight etc. // params are used to keep track of healthy servers. Client *kgo.Client }
Server represents a source Server's config with health and weight parameters which are used for tracking health status.
type SourcePool ¶
SourcePool manages the source Kafka instances and consumption.
func NewSourcePool ¶
func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, targetOffsets TopicOffsets, m *metrics.Set, log *slog.Logger) (*SourcePool, error)
NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer) servers. The pool always attempts to find one healthy node for the relay to consume from.
func (*SourcePool) Close ¶
func (sp *SourcePool) Close()
Close closes the active source Kafka client.
func (*SourcePool) Get ¶
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error)
Get attempts to return a healthy source Kafka client connection. It internally applies backoff/retries between connection attempts and thus can take indefinitely long to return based on the config.
func (*SourcePool) GetFetches ¶
func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error)
GetFetches retrieves a Kafka fetch iterator to retrieve individual messages from.
func (*SourcePool) GetHighWatermark ¶
func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error)
func (*SourcePool) RecordOffsets ¶
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error
RecordOffsets records the offsets of the latest fetched records per topic. This is used to resume consumption on new connections/reconnections from the source during runtime.
type SourcePoolCfg ¶
type Target ¶
type Target struct {
// contains filtered or unexported fields
}
Target is a holder for the kafka Target client.
func NewTarget ¶
func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*Target, error)
NewTarget returns a new producer relay that handles target Kafka instances.
func (*Target) GetBatchCh ¶
GetBatchCh returns the Producer batch channel.
func (*Target) GetHighWatermark ¶
func (tg *Target) GetHighWatermark() (kadm.ListedOffsets, error)
GetHighWatermark returns the offsets on the target topics.
type TargetCfg ¶
type TargetCfg struct { ReqTimeout time.Duration EnableBackoff bool BackoffMin time.Duration BackoffMax time.Duration }
TargetCfg is the producer/target Kafka config.
type Topic ¶
type Topic struct { SourceTopic string TargetTopic string TargetPartition uint AutoTargetPartition bool }
Topic represents a source->target topic configuration.
type TopicOffsets ¶ added in v1.2.0
TopicOffsets defines a topic->partition->offset map for any source/target Kafka cluster.