Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type Consumer
- type Destination
- type Producer
- type Source
- func (s *Source) Ack(context.Context, record.Position) error
- func (s *Source) Open(ctx context.Context, cfg plugins.Config) error
- func (s *Source) Read(ctx context.Context, position record.Position) (record.Record, error)
- func (s *Source) Teardown() error
- func (s *Source) Validate(cfg plugins.Config) error
- type Spec
Constants ¶
const (
	Servers           = "servers"
	Topic             = "topic"
	SecurityProtocol  = "securityProtocol"
	Acks              = "acks"
	DeliveryTimeout   = "deliveryTimeout"
	ReadFromBeginning = "readFromBeginning"
)
Variables ¶
var (
	ErrServersMissing = cerrors.New("servers missing")
	ErrTopicMissing   = cerrors.New("topic missing")
)
var Required = []string{Servers, Topic}
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// A list of bootstrap servers, which will be used to discover all the servers in a cluster.
	// Maps to "bootstrap.servers" in a Kafka consumer's configuration
	Servers string
	Topic   string
	// Maps to "security.protocol" in a Kafka consumer's configuration
	SecurityProtocol string
	// Maps to "acks" in a Kafka producer's configuration
	Acks            skafka.RequiredAcks
	DeliveryTimeout time.Duration
	// Read all messages present in a source topic.
	// Default value: false (only new messages are read)
	ReadFromBeginning bool
}
Config contains all the possible configuration parameters for Kafka sources and destinations. When changing this struct, also update the plugin specification (in main.go) and the README.
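As an illustration of how the configuration keys, the required keys, and the two sentinel errors could fit together, here is a minimal sketch. It is not the package's own parsing code: `sketchConfig` and `parseSketch` are hypothetical names, the struct is a trimmed-down stand-in for Config, the standard `errors` package stands in for `cerrors`, and the assumption that `deliveryTimeout` is written as a Go duration string (such as "5s") is mine. The server address and topic name are placeholder values.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"time"
)

// Configuration keys, copied from the documented constants.
const (
	Servers           = "servers"
	Topic             = "topic"
	DeliveryTimeout   = "deliveryTimeout"
	ReadFromBeginning = "readFromBeginning"
)

// The package creates these with cerrors.New; the standard library
// errors package is used here so the sketch has no external dependencies.
var (
	ErrServersMissing = errors.New("servers missing")
	ErrTopicMissing   = errors.New("topic missing")
)

// sketchConfig is a hypothetical, trimmed-down stand-in for Config.
type sketchConfig struct {
	Servers           string
	Topic             string
	DeliveryTimeout   time.Duration
	ReadFromBeginning bool
}

// parseSketch is a hypothetical helper: it rejects settings that lack a
// required key and converts the optional string values to typed fields.
func parseSketch(settings map[string]string) (sketchConfig, error) {
	if settings[Servers] == "" {
		return sketchConfig{}, ErrServersMissing
	}
	if settings[Topic] == "" {
		return sketchConfig{}, ErrTopicMissing
	}
	cfg := sketchConfig{Servers: settings[Servers], Topic: settings[Topic]}

	// Optional keys keep their zero values when absent.
	// Assumption: the timeout is given as a Go duration string.
	if raw, ok := settings[DeliveryTimeout]; ok {
		d, err := time.ParseDuration(raw)
		if err != nil {
			return sketchConfig{}, fmt.Errorf("invalid %s: %w", DeliveryTimeout, err)
		}
		cfg.DeliveryTimeout = d
	}
	if raw, ok := settings[ReadFromBeginning]; ok {
		b, err := strconv.ParseBool(raw)
		if err != nil {
			return sketchConfig{}, fmt.Errorf("invalid %s: %w", ReadFromBeginning, err)
		}
		cfg.ReadFromBeginning = b
	}
	return cfg, nil
}

func main() {
	cfg, err := parseSketch(map[string]string{
		Servers:           "localhost:9092",
		Topic:             "orders",
		DeliveryTimeout:   "5s",
		ReadFromBeginning: "true",
	})
	fmt.Println(cfg, err)

	// A missing required key yields the matching sentinel error.
	_, err = parseSketch(map[string]string{Servers: "localhost:9092"})
	fmt.Println(errors.Is(err, ErrTopicMissing))
}
```

Checking the required keys before anything else lets callers compare against the sentinel errors directly, while malformed optional values get a wrapped error that names the offending key.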
func (Config) AsKafkaCfg ¶
type Consumer ¶
type Consumer interface {
	// Get returns a message from the configured topic, waiting at most 'timeout'.
	// Returns:
	// A message and the client's 'position' in Kafka, if there's no error, OR
	// A nil message, the client's position in Kafka, and a nil error,
	// if no message was retrieved within the specified timeout, OR
	// A nil message, nil position and an error if there was an error while retrieving the message (e.g. broker down).
	Get(timeout time.Duration) (*kafka.Message, map[int32]int64, error)

	// Close this consumer and the associated resources (e.g. connections to the broker)
	Close()

	// StartFrom reads messages from the given topic, starting from the given positions.
	// For new partitions or partitions not found in the 'position',
	// the reading behavior is specified by 'readFromBeginning' parameter:
	// if 'true', then all messages will be read, if 'false', only new messages will be read.
	// Returns: An error, if the consumer could not be set to read from the given position, nil otherwise.
	StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error
}
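The three outcomes of Get and the effect of readFromBeginning in StartFrom can be sketched with an in-memory stand-in. Everything below is hypothetical and only mirrors the shape of the interface: `message` replaces `*kafka.Message` so the example needs no Kafka client, `fakeConsumer` and `drain` are illustrative names, and the sketch assumes that a position holds the next offset to read for each partition.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for *kafka.Message.
type message struct {
	Partition int32
	Offset    int64
	Value     []byte
}

// consumer mirrors the documented Consumer interface, using the stand-in type.
type consumer interface {
	Get(timeout time.Duration) (*message, map[int32]int64, error)
	Close()
	StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error
}

// fakeConsumer serves messages from memory instead of a broker.
type fakeConsumer struct {
	log      []message       // everything "stored" in the topic
	pending  []message       // what is left to deliver after StartFrom
	position map[int32]int64 // assumed meaning: next offset to read, per partition
}

func (c *fakeConsumer) StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error {
	c.position = map[int32]int64{}
	for p, o := range position {
		c.position[p] = o
	}
	c.pending = nil
	for _, m := range c.log {
		next, known := position[m.Partition]
		if known && m.Offset < next {
			continue // already consumed according to the given position
		}
		if !known && !readFromBeginning {
			continue // unknown partition: existing messages are skipped
		}
		c.pending = append(c.pending, m)
	}
	return nil
}

func (c *fakeConsumer) Get(timeout time.Duration) (*message, map[int32]int64, error) {
	if len(c.pending) == 0 {
		// Simulates "no message within the timeout": nil message, nil error.
		return nil, c.position, nil
	}
	m := c.pending[0]
	c.pending = c.pending[1:]
	c.position[m.Partition] = m.Offset + 1
	return &m, c.position, nil
}

func (c *fakeConsumer) Close() {}

// drain reads until Get reports a timeout or an error.
func drain(c consumer, timeout time.Duration) ([]string, map[int32]int64, error) {
	var values []string
	var last map[int32]int64
	for {
		msg, pos, err := c.Get(timeout)
		if err != nil {
			return values, last, err // e.g. broker down
		}
		last = pos
		if msg == nil {
			return values, last, nil // nothing arrived in time
		}
		values = append(values, string(msg.Value))
	}
}

func main() {
	c := &fakeConsumer{log: []message{
		{Partition: 0, Offset: 0, Value: []byte("a")},
		{Partition: 0, Offset: 1, Value: []byte("b")},
		{Partition: 0, Offset: 2, Value: []byte("c")},
		{Partition: 1, Offset: 0, Value: []byte("x")},
	}}
	defer c.Close()

	// Partition 0 resumes at offset 1; partition 1 is not in the position.
	for _, fromBeginning := range []bool{false, true} {
		if err := c.StartFrom("orders", map[int32]int64{0: 1}, fromBeginning); err != nil {
			fmt.Println("start failed:", err)
			return
		}
		values, pos, err := drain(c, 100*time.Millisecond)
		fmt.Println(fromBeginning, values, pos, err)
	}
}
```

A caller that treats a nil message with a nil error as "try again later" rather than as a failure can keep polling without special-casing quiet topics.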
func NewConsumer ¶
NewConsumer creates a new Kafka consumer. The current implementation uses Confluent's Kafka client. The full list of configuration properties is available at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
type Destination ¶
func (*Destination) Teardown ¶
func (s *Destination) Teardown() error
Teardown shuts down the Kafka client.
type Producer ¶
type Producer interface {
	// Send synchronously delivers a message.
	// Returns an error, if the message could not be delivered.
	Send(key []byte, payload []byte) error

	// Close this producer and the associated resources (e.g. connections to the broker)
	Close()
}
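Because Producer is a small interface, code that depends on it can be exercised without a broker. The sketch below shows one way to do that with an in-memory implementation. `memProducer`, `sentRecord`, and `errProducerClosed` are hypothetical names that do not exist in the package, and returning an error from Send after Close is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Producer repeats the documented interface so the sketch is self-contained.
type Producer interface {
	Send(key []byte, payload []byte) error
	Close()
}

// errProducerClosed is a hypothetical error used only by this sketch.
var errProducerClosed = errors.New("producer closed")

// sentRecord is one key/payload pair captured by memProducer.
type sentRecord struct {
	Key     []byte
	Payload []byte
}

// memProducer is a hypothetical in-memory Producer for tests.
type memProducer struct {
	closed bool
	sent   []sentRecord
}

// Compile-time check that memProducer satisfies the interface.
var _ Producer = (*memProducer)(nil)

// Send stores a copy of the message, so later changes to the caller's
// slices do not alter what was "delivered".
func (p *memProducer) Send(key []byte, payload []byte) error {
	if p.closed {
		return errProducerClosed
	}
	p.sent = append(p.sent, sentRecord{
		Key:     append([]byte(nil), key...),
		Payload: append([]byte(nil), payload...),
	})
	return nil
}

// Close marks the producer as unusable; there are no connections to release.
func (p *memProducer) Close() {
	p.closed = true
}

func main() {
	p := &memProducer{}

	if err := p.Send([]byte("order-1"), []byte(`{"total": 10}`)); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Printf("captured %d message(s), key=%s\n", len(p.sent), p.sent[0].Key)

	p.Close()
	err := p.Send([]byte("order-2"), []byte(`{"total": 20}`))
	fmt.Println("send after close:", err)
}
```

Since Send is documented as synchronous, a nil return in a test double like this one can be treated the same way as a confirmed delivery from the real producer.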
func NewProducer ¶
NewProducer creates a new Kafka producer. The current implementation uses Segment's kafka-go client.