Documentation
¶
Overview ¶
Package kafka receives CockroachDB CDC changefeed events that are routed via a Kafka cluster.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var Set = wire.NewSet(
	ProvideConn,
	ProvideConveyorConfig,
	ProvideEagerConfig,
	ProvideSchemaWatchConfig,
)
Set is used by Wire.
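For orientation, the sketch below shows how a provider set like this is typically consumed from a Wire injector file. The injector name and signature are hypothetical, and the real injector needs additional provider sets (staging, target, diagnostics, user script, and so on) to complete the dependency graph.

//go:build wireinject

package kafka

import (
	"context"

	"github.com/google/wire"
)

// newKafka is a hypothetical injector. Wire replaces the panic with
// generated construction code; additional provider sets would be
// required to satisfy every dependency in this package's graph.
func newKafka(ctx context.Context, cfg *Config) (*Kafka, func(), error) {
	panic(wire.Build(Set))
}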
Functions ¶
func ProvideConveyorConfig ¶
ProvideConveyorConfig is called by Wire.
func ProvideSchemaWatchConfig ¶ added in v1.0.5
func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config
ProvideSchemaWatchConfig is called by Wire.
Types ¶
type Config ¶
type Config struct {
	ConveyorConfig conveyor.Config
	DLQ            dlq.Config
	SchemaWatch    schemawatch.Config
	Script         script.Config
	Sequencer      sequencer.Config
	Stage          stage.Config           // Staging table configuration.
	Staging        sinkprod.StagingConfig // Staging database configuration.
	Target         sinkprod.TargetConfig
	TargetSchema   ident.Schema
	TLS            secure.Config

	BatchSize        int           // How many messages to accumulate before committing to the target
	Brokers          []string      // The address of the Kafka brokers
	Group            string        // The Kafka consumer group id.
	MaxTimestamp     string        // Only accept messages at or older than this timestamp
	MinTimestamp     string        // Only accept messages at or newer than this timestamp
	ResolvedInterval time.Duration // Minimal duration between resolved timestamps.
	SASL             SASLConfig    // SASL parameters
	Strategy         string        // Kafka consumer group re-balance strategy
	Topics           []string      // The list of topics that the consumer should use.
	// contains filtered or unexported fields
}
Config contains the configuration necessary for creating a replication connection.
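A minimal sketch of the connector-specific fields, written as if from inside the package and assuming the usual time import. The broker, group, topic, and strategy values are illustrative, and the embedded sub-configurations (DLQ, Staging, Target, and so on) still need to be populated.

// exampleConfig is a hypothetical helper; it is not part of the API.
func exampleConfig() *Config {
	return &Config{
		Brokers:          []string{"localhost:9092"},
		Group:            "cdc-sink",
		Topics:           []string{"orders", "customers"},
		BatchSize:        100,
		ResolvedInterval: time.Second,
		Strategy:         "sticky", // assumed to map onto a sarama balance strategy
	}
}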
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn encapsulates all wire-connection behavior. It is responsible for receiving replication messages and replying with status updates. TODO (silvano): support Avro format, schema registry. https://github.com/cockroachdb/cdc-sink/issues/776
func ProvideConn ¶
ProvideConn is called by Wire to construct this package's logical.Dialect implementation. There's a fake dependency on the script loader so that flags can be evaluated first.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a Kafka consumer.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(
	session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
) (err error)
ConsumeClaim processes new messages for the topic/partition specified in the claim.
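For context, the sketch below shows the generic sarama consumer-group loop that a handler implementing Cleanup and ConsumeClaim participates in. It is not this package's internal wiring; the import path, broker address, group id, and topic name are illustrative.

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// handler satisfies sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim drains one partition's message channel and marks each
// message so that its offset can be committed by the group.
func (handler) ConsumeClaim(
	session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
) error {
	for msg := range claim.Messages() {
		log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	// Consumer groups require a broker protocol version >= 0.10.2.
	cfg.Version = sarama.V2_1_0_0
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Consume blocks for the duration of a session and returns on
		// rebalance or error; call it again to rejoin the group.
		if err := group.Consume(context.Background(), []string{"orders"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}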
type Conveyor ¶
type Conveyor interface {
	// AcceptMultiBatch processes a batch. The batch is committed to the target
	// database or to a staging area, depending on the mode in which
	// the connector is running.
	AcceptMultiBatch(context.Context, *types.MultiBatch, *types.AcceptOptions) error
	// Advance extends the proposed checkpoint timestamp associated with a partition.
	// It is called when a resolved timestamp is received by the consumer.
	Advance(context.Context, ident.Ident, hlc.Time) error
	// Ensure that a checkpoint exists for all the given partitions. It should be
	// called every time a new partition or topic is discovered by the consumer group.
	Ensure(context.Context, []ident.Ident) error
	// Access to the underlying schema.
	Watcher() types.Watcher
}
Conveyor exposes the methods used by the kafka connector to deliver mutations, in batches, to the destination. It controls the checkpoint timestamp associated with each partition. A mock conveyor is also implemented for testing purposes.
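A hedged sketch of what a test double for Conveyor can look like; the mock shipped with the package may differ, and the import paths for types, hlc, and ident are assumed from the repository layout.

package kafka

import (
	"context"
	"sync"

	"github.com/cockroachdb/cdc-sink/internal/types"
	"github.com/cockroachdb/cdc-sink/internal/util/hlc"
	"github.com/cockroachdb/cdc-sink/internal/util/ident"
)

// fakeConveyor records the calls made by the consumer under test.
type fakeConveyor struct {
	mu       sync.Mutex
	batches  []*types.MultiBatch
	advances []advanceCall
	ensured  [][]ident.Ident
}

// advanceCall captures one Advance invocation.
type advanceCall struct {
	Partition ident.Ident
	Timestamp hlc.Time
}

func (f *fakeConveyor) AcceptMultiBatch(
	_ context.Context, batch *types.MultiBatch, _ *types.AcceptOptions,
) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.batches = append(f.batches, batch)
	return nil
}

func (f *fakeConveyor) Advance(_ context.Context, partition ident.Ident, ts hlc.Time) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.advances = append(f.advances, advanceCall{partition, ts})
	return nil
}

func (f *fakeConveyor) Ensure(_ context.Context, partitions []ident.Ident) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.ensured = append(f.ensured, partitions)
	return nil
}

// Watcher is stubbed; tests that need schema access would return a real
// or fake types.Watcher here.
func (f *fakeConveyor) Watcher() types.Watcher { return nil }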
type EagerConfig ¶
type EagerConfig Config
EagerConfig is a hack to get Wire to move userscript evaluation to the beginning of the injector. This allows CLI flags to be set by the script.
func ProvideEagerConfig ¶
func ProvideEagerConfig(cfg *Config, _ *script.Loader) *EagerConfig
ProvideEagerConfig is a hack to move up the evaluation of the user script so that the options callbacks can set any non-script-related CLI flags.
type Kafka ¶
type Kafka struct {
	Conn        *Conn
	Diagnostics *diag.Diagnostics
}
Kafka is a Kafka-based logical replication loop.
func (*Kafka) GetDiagnostics ¶
func (k *Kafka) GetDiagnostics() *diag.Diagnostics
GetDiagnostics implements stdlogical.HasDiagnostics.
type OffsetSeeker ¶
type OffsetSeeker interface {
	// GetOffsets finds the most recent offsets for resolved timestamp messages
	// that are before the given time, and in the given topics.
	GetOffsets([]string, hlc.Time) ([]*partitionState, error)
	// Close shuts down the connection with the Kafka broker.
	Close() error
}
OffsetSeeker finds offsets within Kafka topics.
func NewOffsetSeeker ¶
func NewOffsetSeeker(config *Config) (OffsetSeeker, error)
NewOffsetSeeker instantiates an offsetManager.
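A hedged usage sketch, written as if from inside the package because partitionState is unexported; seekStart is a hypothetical helper, not part of the API.

// seekStart locates the offsets of the most recent resolved-timestamp
// messages at or before bound so that consumption can restart from
// them rather than from the beginning of each topic.
func seekStart(cfg *Config, bound hlc.Time) ([]*partitionState, error) {
	seeker, err := NewOffsetSeeker(cfg)
	if err != nil {
		return nil, err
	}
	defer seeker.Close()
	return seeker.GetOffsets(cfg.Topics, bound)
}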