Documentation ¶
Index ¶
- type ByteDecoder
- type ByteEncoder
- type Decoder
- type DecoderFunc
- type Encoder
- type EncoderFunc
- type Metadata
- type PartitionOffset
- type Sink
- type SinkConfig
- type Source
- func (s *Source) Cleanup(_ sarama.ConsumerGroupSession) error
- func (s *Source) Close() error
- func (s *Source) Commit(v interface{}) error
- func (s *Source) Consume() (streams.Message, error)
- func (s *Source) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Source) Setup(session sarama.ConsumerGroupSession) error
- type SourceConfig
- type StringDecoder
- type StringEncoder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ByteDecoder ¶
type ByteDecoder struct{}
ByteDecoder represents a byte decoder.
func (ByteDecoder) Decode ¶
func (d ByteDecoder) Decode(b []byte) (interface{}, error)
Decode transforms byte data to the desired type.
type ByteEncoder ¶
type ByteEncoder struct{}
ByteEncoder represents a byte encoder.
func (ByteEncoder) Encode ¶
func (e ByteEncoder) Encode(v interface{}) ([]byte, error)
Encode transforms the typed data to bytes.
type Decoder ¶
type Decoder interface { // Decode transforms byte data to the desired type. Decode([]byte) (interface{}, error) }
Decoder represents a Kafka data decoder.
type DecoderFunc ¶
DecoderFunc is an adapter allowing to use a function as a decoder.
var NilDecoder DecoderFunc = func([]byte) (interface{}, error) { return nil, nil }
NilDecoder is a decoder that always returns a nil, no matter the input.
func (DecoderFunc) Decode ¶
func (f DecoderFunc) Decode(value []byte) (interface{}, error)
Decode transforms byte data to the desired type.
type Encoder ¶
type Encoder interface { // Encode transforms the typed data to bytes. Encode(interface{}) ([]byte, error) }
Encoder represents a Kafka data encoder.
type EncoderFunc ¶
EncoderFunc is an adapter allowing to use a function as an encoder.
func (EncoderFunc) Encode ¶
func (f EncoderFunc) Encode(value interface{}) ([]byte, error)
Encode transforms the typed data to bytes.
type Metadata ¶
type Metadata []*PartitionOffset
Metadata represents an the kafka topic metadata.
func (Metadata) Merge ¶
func (m Metadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata
Merge merges the contained metadata into the given the metadata.
func (Metadata) WithOrigin ¶
func (m Metadata) WithOrigin(o streams.MetadataOrigin)
WithOrigin sets the MetadataOrigin on the metadata.
type PartitionOffset ¶
type PartitionOffset struct { Origin streams.MetadataOrigin Topic string Partition int32 Offset int64 }
PartitionOffset represents the position in the stream of a message.
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink represents a Kafka streams sink.
type SinkConfig ¶
type SinkConfig struct { sarama.Config Brokers []string Topic string KeyEncoder Encoder ValueEncoder Encoder BatchSize int }
SinkConfig represents the configuration of a Sink.
func (*SinkConfig) Validate ¶
func (c *SinkConfig) Validate() error
Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
Source represents a Kafka stream source.
func NewSource ¶
func NewSource(c *SourceConfig) (*Source, error)
NewSource creates a new Kafka stream source.
func (*Source) Cleanup ¶
func (s *Source) Cleanup(_ sarama.ConsumerGroupSession) error
Cleanup is ran once for a session, after the consumption ends.
func (*Source) ConsumeClaim ¶
func (s *Source) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim consumes messages from a single partition of a topic.
type SourceConfig ¶
type SourceConfig struct { sarama.Config Brokers []string Topic string GroupID string Ctx context.Context KeyDecoder Decoder ValueDecoder Decoder BufferSize int ErrorsBufferSize int }
SourceConfig represents the configuration for a Kafka stream source.
func NewSourceConfig ¶
func NewSourceConfig() *SourceConfig
NewSourceConfig creates a new Kafka source configuration.
func (*SourceConfig) Validate ¶
func (c *SourceConfig) Validate() error
Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.
type StringDecoder ¶
type StringDecoder struct{}
StringDecoder represents a string decoder.
func (StringDecoder) Decode ¶
func (d StringDecoder) Decode(b []byte) (interface{}, error)
Decode transforms byte data to a string.
type StringEncoder ¶
type StringEncoder struct{}
StringEncoder represents a string encoder.
func (StringEncoder) Encode ¶
func (e StringEncoder) Encode(v interface{}) ([]byte, error)
Encode transforms the string data to bytes.