Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type Consumer
- type Destination
- type Producer
- type Source
- func (s *Source) Ack(context.Context, record.Position) error
- func (s *Source) Open(ctx context.Context, cfg plugins.Config) error
- func (s *Source) Read(ctx context.Context, position record.Position) (record.Record, error)
- func (s *Source) Teardown() error
- func (s *Source) Validate(cfg plugins.Config) error
- type Spec
Constants ¶
View Source
const ( Servers = "servers" Topic = "topic" SecurityProtocol = "securityProtocol" Acks = "acks" DeliveryTimeout = "deliveryTimeout" ReadFromBeginning = "readFromBeginning" )
Variables ¶
View Source
var ( ErrServersMissing = cerrors.New("servers missing") ErrTopicMissing = cerrors.New("topic missing") )
View Source
var Required = []string{Servers, Topic}
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // A list of bootstrap servers, which will be used to discover all the servers in a cluster. Servers []string Topic string // Required acknowledgments when writing messages to a topic: // Can be: 0, 1, -1 (all) Acks kafka.RequiredAcks DeliveryTimeout time.Duration // Read all messages present in a source topic. // Default value: false (only new messages are read) ReadFromBeginning bool }
Config contains all the possible configuration parameters for Kafka sources and destinations. When changing this struct, please also change the plugin specification (in main.go) as well as the ReadMe.
type Consumer ¶
type Consumer interface { // StartFrom instructs the consumer to connect to a broker and a topic, using the provided consumer group ID. // The group ID is significant for this consumer's offsets. // By using the same group ID after a restart, we make sure that the consumer continues from where it left off. // Returns: An error, if the consumer could not be set to read from the given position, nil otherwise. StartFrom(config Config, groupID string) error // Get returns a message from the configured topic. Waits until a messages is available // or until it errors out. // Returns: a message (if available), the consumer group ID and an error (if there was one). Get(ctx context.Context) (*kafka.Message, string, error) Ack() error // Close this consumer and the associated resources (e.g. connections to the broker) Close() }
Consumer represents a Kafka consumer in a simplified form, with just the functionality which is needed for this plugin. A Consumer's offset is being managed by the broker.
func NewConsumer ¶
NewConsumer creates a new Kafka consumer. The consumer needs to be started (using the StartFrom method) before actually being used.
type Destination ¶
func (*Destination) Teardown ¶
func (s *Destination) Teardown() error
Teardown shuts down the Kafka client.
type Producer ¶
type Producer interface { // Send synchronously delivers a message. // Returns an error, if the message could not be delivered. Send(key []byte, payload []byte) error // Close this producer and the associated resources (e.g. connections to the broker) Close() }
func NewProducer ¶
NewProducer creates a new Kafka producer. The current implementation uses Segment's kafka-go client.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.