Documentation ¶
Index ¶
- func CreateKafkaTopic(ctx context.Context, topic string, p *confluent.Producer, numPartitions int, ...) error
- type Main
- type PutCmd
- type PutSource
- type Record
- type Schema
- type Source
- func (s *Source) Close() error
- func (s *Source) CommitMessages(recs []confluent.TopicPartition) ([]confluent.TopicPartition, error)
- func (s *Source) Open() error
- func (s *Source) Record() (idk.Record, error)
- func (s *Source) Schema() []idk.Field
- func (s *Source) SchemaID() int
- func (s *Source) SchemaMetadata() string
- func (s *Source) SchemaSchema() string
- func (s *Source) SchemaSubject() string
- func (s *Source) SchemaVersion() int
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
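func CreateKafkaTopic ¶
func CreateKafkaTopic(ctx context.Context, topic string, p *confluent.Producer, numPartitions int, ...) error
(Signature reproduced from the index above; the trailing parameters are elided on this page.)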
Types ¶
type Main ¶
type Main struct {
	idk.Main             `flag:"!embed"`
	idk.ConfluentCommand `flag:"!embed"`
	Group                string        `help:"Kafka group."`
	Topics               []string      `help:"Kafka topics to read from."`
	Timeout              time.Duration `help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`
	SkipOld              bool          `short:"" help:"False sets kafka consumer configuration auto.offset.reset to earliest, True sets it to latest."`
	ConsumerCloseTimeout int           `help:"The amount of time in seconds to wait for the consumer to close properly."`
}
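A minimal sketch of configuring a Main by setting the fields above directly. The import path, the bare struct literal, and the field values are assumptions for illustration; this page does not show a constructor:

import (
	"time"

	"github.com/featurebasedb/idk/kafka" // assumed import path
)

func configureMain() kafka.Main {
	m := kafka.Main{}           // assumed: direct construction; a NewMain-style constructor may exist
	m.Group = "analytics-group" // consumer group shared by all concurrent sources
	m.Topics = []string{"events"}
	m.Timeout = 2 * time.Second // flush a partial batch after 2s of silence; 0 disables
	m.SkipOld = false           // false => auto.offset.reset=earliest
	m.ConsumerCloseTimeout = 30 // seconds to wait for a clean consumer shutdown
	return m
}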
type PutCmd ¶
type PutSource ¶
type PutSource struct {
	idk.ConfluentCommand

	Topic     string `help:"Kafka topic to post to."`
	Subject   string `help:"Kafka schema subject."`
	BatchSize int    `help:"Size of record batches to submit to Kafka."`

	// FB specific config for setting partition key efficiently
	FBPrimaryKeyFields []string
	FBIndexName        string
	FBIDField          string

	Concurrency   int  `help:"Number of concurrent sources and indexing routines to launch."`
	TrackProgress bool `help:"Periodically print status updates on how many records have been sourced." short:""`

	ReplicationFactor int `help:"set replication factor for kafka cluster"`
	NumPartitions     int `help:"set partition for kafka cluster"`

	// NewSource must be set by the user of Main before calling
	// Main.Run. Main.Run will call this function "Concurrency" times. It
	// is the job of this function to ensure that the concurrent
	// sources which are started partition work appropriately. This is
	// typically set up (by convention) in the Source's package in
	// cmd.go
	NewSource func() (idk.Source, error) `flag:"-"`

	Log logger.Logger

	ConfigMap *confluent.ConfigMap `flag:"-"`

	Target string
	// contains filtered or unexported fields
}
PutSource represents a Kafka put process that operates on one or more idk.Source instances.
func NewPutSource ¶
NewPutSource returns a new instance of PutSource.
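NewPutSource's signature is not shown on this page, so the sketch below wires a PutSource up by hand instead. The bare struct literal, the field values, and the fromCSV helper are illustrative assumptions:

ps := &kafka.PutSource{} // assumed: direct construction
ps.Topic = "events"
ps.Subject = "events-value"
ps.BatchSize = 1000
ps.Concurrency = 4
// NewSource is called Concurrency times; the sources it returns must
// agree on how to partition the work among themselves.
ps.NewSource = func() (idk.Source, error) {
	return fromCSV("data.csv") // hypothetical helper returning an idk.Source
}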
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
func (*Record) StreamOffset ¶
type Schema ¶
type Schema struct {
	Schema  string `json:"schema"`  // The actual AVRO schema
	Subject string `json:"subject"` // Subject under which the schema is registered
	Version int    `json:"version"` // Version within this subject
	ID      int    `json:"id"`      // Registry's unique id
}
The Schema type is an object produced by the schema registry.
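Because the struct carries JSON tags, a registry response decodes directly into it. A self-contained sketch with an illustrative response body (the type is duplicated here so the snippet compiles on its own):

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Schema mirrors the type above, duplicated for a standalone example.
type Schema struct {
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
	ID      int    `json:"id"`
}

func main() {
	// Illustrative registry response body.
	body := []byte(`{"schema":"{\"type\":\"string\"}","subject":"events-value","version":3,"id":42}`)
	var s Schema
	if err := json.Unmarshal(body, &s); err != nil {
		log.Fatal(err)
	}
	fmt.Println(s.Subject, s.Version, s.ID) // events-value 3 42
}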
type Source ¶
type Source struct {
	idk.ConfluentCommand
	Topics    []string
	Group     string
	Log       logger.Logger
	Timeout   time.Duration
	SkipOld   bool
	Verbose   bool
	TLS       idk.TLSConfig
	ConfigMap *confluent.ConfigMap
	// contains filtered or unexported fields
}
Source implements the idk.Source interface using Kafka as a data source. It is not thread-safe! Due to the way Kafka clients work, create multiple Sources to achieve concurrency.
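A minimal sketch of the one-Source-per-goroutine pattern this implies; the import path, field values, and bare struct literal are assumptions:

for i := 0; i < 4; i++ {
	go func() {
		src := &kafka.Source{} // assumed: direct construction
		src.Topics = []string{"events"}
		src.Group = "analytics-group" // same group, so Kafka balances partitions across the sources
		if err := src.Open(); err != nil {
			log.Printf("open: %v", err)
			return
		}
		defer src.Close()
		// consume via src.Record() here
	}()
}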
func (*Source) CommitMessages ¶
func (s *Source) CommitMessages(recs []confluent.TopicPartition) ([]confluent.TopicPartition, error)
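A hedged sketch of committing processed offsets. How the TopicPartition value for each record is obtained is not shown on this page, so pending is assumed to have been accumulated elsewhere:

var pending []confluent.TopicPartition // assumed: accumulated while processing records
committed, err := src.CommitMessages(pending)
if err != nil {
	log.Printf("commit failed: %v", err)
	return
}
log.Printf("committed offsets on %d partitions", len(committed))
pending = pending[:0] // reuse the slice for the next batch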
func (*Source) Open ¶
Open initializes the kafka source (i.e., creating and configuring a consumer). The configuration options for the confluentinc/confluent-kafka-go/kafka library are documented at: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
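A sketch of passing librdkafka options through the ConfigMap field before calling Open; the broker address and the specific option choices are illustrative:

cfg := confluent.ConfigMap{
	"bootstrap.servers":  "localhost:9092", // assumed broker address
	"session.timeout.ms": 6000,             // any key from the CONFIGURATION.md list above
}
src := &kafka.Source{} // assumed: direct construction
src.ConfigMap = &cfg
src.Topics = []string{"events"}
src.Group = "analytics-group"
if err := src.Open(); err != nil {
	log.Fatal(err)
}
defer src.Close()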
func (*Source) Record ¶
Record returns the value of the next kafka message. The same Record object may be used by successive calls to Record, so it should not be retained.
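A consumption-loop sketch that respects the reuse rule above: each record is fully handled before the next Record call, and no reference is held across iterations. The handle function and the error-handling conventions are assumptions:

for {
	rec, err := src.Record()
	if err != nil {
		// The source's error conventions (e.g., a sentinel for "no new
		// data yet") are not documented here; handle per your deployment.
		log.Printf("record: %v", err)
		break
	}
	handle(rec) // hypothetical handler; must finish with rec before the next call
}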