relay

package
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
)
View Source
const (
	ModeFailover    = "failover"
	ModeSingle      = "single"
	IndefiniteRetry = -1
)
View Source
const (
	StateDisconnected = iota
	StateConnecting
)

Variables

View Source
var (
	SrcNetworkErrMetric = relayMetricPrefix + "source_errors_total{node_id=\"%d\", error=\"%s\"}"
	SrcsUnhealthyMetric = relayMetricPrefix + "sources_unhealthy_total"
	SrcKafkaErrMetric   = relayMetricPrefix + "source_kafka_errors_total{node_id=\"%d\", error=\"%s\"}"
	SrcHealthMetric     = relayMetricPrefix + "source_highwatermark{node_id=\"%d\"}"

	TargetNetworkErrMetric = relayMetricPrefix + "target_errors_total{error=\"%s\"}"
	TargetKafkaErrMetric   = relayMetricPrefix + "target_kafka_errors_total{error=\"%s\"}"
	RelayedMsgsMetric      = relayMetricPrefix + "msgs_total{source=\"%s\", src_partition=\"%d\", destination=\"%s\", dest_partition=\"%d\"}"

	ErrLaggingBehind = fmt.Errorf("topic end offset is lagging behind")
)
View Source
var (
	ErrorNoHealthy = errors.New("no healthy node")
)

Functions

This section is empty.

Types

type ConsumerCfg added in v1.2.0

type ConsumerCfg struct {
	KafkaCfg `koanf:",squash"`
}

ConsumerCfg is the direct consumer config.

type KafkaCfg

type KafkaCfg struct {
	// Namespace
	Name string `koanf:"name"`

	// Broker.
	BootstrapBrokers []string      `koanf:"servers"`
	SessionTimeout   time.Duration `koanf:"session_timeout"`

	// Auth.
	EnableAuth bool `koanf:"enable_auth"`
	// PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
	SASLMechanism string `koanf:"sasl_mechanism"`
	Username      string `koanf:"username"`
	Password      string `koanf:"password"`

	// If enabled and the three files are passed, will
	// use the relevant certs and keys. If enabled but all three
	// file paths are empty, it will default to using DialTLS()
	EnableTLS      bool   `koanf:"enable_tls"`
	ClientKeyPath  string `koanf:"client_key_path"`
	ClientCertPath string `koanf:"client_cert_path"`
	CACertPath     string `koanf:"ca_cert_path"`

	EnableLog bool `koanf:"enable_log"`
}

KafkaCfg is the message broker's client config.

type ProducerCfg

type ProducerCfg struct {
	KafkaCfg `koanf:",squash"`

	EnableIdempotency bool          `koanf:"enable_idempotency"`
	CommitAck         string        `koanf:"commit_ack_type"` // tcp|leader|cluster|default
	MaxRetries        int           `koanf:"max_retries"`
	FlushFrequency    time.Duration `koanf:"flush_frequency"`
	MaxMessageBytes   int           `koanf:"max_message_bytes"`
	BatchSize         int           `koanf:"batch_size"`
	BufferSize        int           `koanf:"buffer_size"`
	FlushBatchSize    int           `koanf:"flush_batch_size"`
	Compression       string        `koanf:"compression"` // gzip|snappy|lz4|zstd|none
}

ProducerCfg is the Kafka producer config.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another.

func NewRelay

func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filters map[string]filter.Provider, log *slog.Logger) (*Relay, error)

func (*Relay) Start

func (re *Relay) Start(globalCtx context.Context) error

Start starts the consumer loop on kafka (A), fetch messages and relays over to kafka (B) using an async

type RelayCfg

type RelayCfg struct {
	StopAtEnd bool
}

type Server

type Server struct {
	Config ConsumerCfg
	ID     int

	// Weight is the cumulative high watermark (offset) of every single topic
	// on a source. This is used for comparing lags between different sources
	// based on a threshold. If a server is unhealthy, the weight is marked as -1.
	Weight int64

	Healthy bool

	// This is only set when a new live Kafka consumer connection is established
	// on demand via Get(), where a server{} is returned. Internally, no connections
	// are maintained on SourcePool.[]servers and only the config, weight etc.
	// params are used to keep track of healthy servers.
	Client *kgo.Client
}

Server represents a source Server's config with health and weight parameters which are used for tracking health status.

type SourcePool

type SourcePool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SourcePool manages the source Kafka instances and consumption.

func NewSourcePool

func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, targetOffsets TopicOffsets, m *metrics.Set, log *slog.Logger) (*SourcePool, error)

NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer) servers. The pool always attempts to find one healthy node for the relay to consume from.

func (*SourcePool) Close

func (sp *SourcePool) Close()

Close closes the active source Kafka client.

func (*SourcePool) Get

func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error)

Get attempts return a healthy source Kafka client connection. It internally applies backoff/retries between connection attempts and thus can take indefinitely long to return based on the config.

func (*SourcePool) GetFetches

func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error)

GetFetches retrieves a Kafka fetch iterator to retrieve individual messages from.

func (*SourcePool) GetHighWatermark

func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error)

func (*SourcePool) RecordOffsets

func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error

RecordOffsets records the offsets of the latest fetched records per topic. This is used to resume consumption on new connections/reconnections from the source during runtime.

type SourcePoolCfg

type SourcePoolCfg struct {
	HealthCheckInterval time.Duration
	ReqTimeout          time.Duration
	LagThreshold        int64
	MaxRetries          int
	EnableBackoff       bool
	BackoffMin          time.Duration
	BackoffMax          time.Duration
}

type Target

type Target struct {
	// contains filtered or unexported fields
}

Target is a holder for the kafka Target client.

func NewTarget

func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*Target, error)

NewTarget returns a new producer relay that handles target Kafka instances.

func (*Target) Close

func (tg *Target) Close()

Close remove the producer topics from &kgo.Client.

func (*Target) GetBatchCh

func (tg *Target) GetBatchCh() chan *kgo.Record

GetBatchCh returns the Producer batch channel.

func (*Target) GetHighWatermark

func (tg *Target) GetHighWatermark() (kadm.ListedOffsets, error)

GetHighWatermark returns the offsets on the target topics.

func (*Target) Start

func (tg *Target) Start() error

Start starts the blocking producer which flushes messages to the target Kafka.

type TargetCfg

type TargetCfg struct {
	ReqTimeout    time.Duration
	EnableBackoff bool
	BackoffMin    time.Duration
	BackoffMax    time.Duration
}

TargetCfg is the producer/target Kafka config.

type Topic

type Topic struct {
	SourceTopic         string
	TargetTopic         string
	TargetPartition     uint
	AutoTargetPartition bool
}

Topic represents a source->target topic configuration.

type TopicOffsets added in v1.2.0

type TopicOffsets map[string]map[int32]kgo.Offset

TopicOffsets defines topic->partition->offset map for any src/target kafka cluster

type Topics

type Topics map[string]Topic

Topics is an abstraction over source->target topic map.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL