Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	OffsetOldest = sarama.OffsetOldest
	OffsetNewest = sarama.OffsetNewest
)
Initial offset type
var Delimited = false
Delimited enables producing delimited messages to text files and length-prepended messages to binary files
var InitialOffset = OffsetNewest
InitialOffset configures the global initial offset from which to start consuming partitions that don't have offsets stored in the kafka_offsets table
var KafkaConfig *sarama.Config
KafkaConfig is the global per-process Sarama config
var Pipes map[string]constructor
Pipes is the list of registered pipes. Plugins insert their constructors into this map
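The registration pattern above can be sketched as follows. The `constructor` type is unexported in the package, so its exact signature here is an assumption; a real plugin would register itself from its own `init` function.

```go
package main

import "fmt"

// constructor mirrors the (unexported) pipe constructor signature; the
// real signature lives in the package source and may differ.
type constructor func(batchSize int) (string, error)

// Pipes maps a pipe type name to its constructor, as in the package docs.
var Pipes = make(map[string]constructor)

// A hypothetical plugin registers its constructor at init time.
func init() {
	Pipes["local"] = func(batchSize int) (string, error) {
		return fmt.Sprintf("local pipe, batch=%d", batchSize), nil
	}
}

func main() {
	c, ok := Pipes["local"]
	if !ok {
		panic("pipe type not registered")
	}
	desc, _ := c(256)
	fmt.Println(desc)
}
```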
Functions ¶
Types ¶
type Consumer ¶
type Consumer interface {
	Pop() (interface{}, error)
	Close() error
	//CloseOnFailure doesn't save offsets
	CloseOnFailure() error
	/*FetchNext is a blocking call which receives a message. The message and
	error can later be retrieved by a Pop call. If it returns false, this
	means EOF and no more Pops are allowed */
	FetchNext() bool
	//SaveOffset explicitly persists the current consumer position
	SaveOffset() error
	//SetFormat tells the consumer the format of the file when there is no
	//header
	SetFormat(format string)
}
Consumer is the consumer interface for the pipe
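The FetchNext/Pop protocol described above can be illustrated with a toy in-memory implementation (not part of the package; the interface is trimmed to the methods the loop uses):

```go
package main

import (
	"fmt"
	"io"
)

// Consumer is trimmed from the package interface to the methods
// exercised by the consume loop below.
type Consumer interface {
	Pop() (interface{}, error)
	Close() error
	FetchNext() bool
}

// sliceConsumer is a toy in-memory Consumer used only to illustrate
// the FetchNext/Pop contract.
type sliceConsumer struct {
	msgs []interface{}
	cur  interface{}
	err  error
}

// FetchNext "receives" the next message (here: advances a slice).
// Returning false signals EOF, after which Pop must not be called.
func (c *sliceConsumer) FetchNext() bool {
	if len(c.msgs) == 0 {
		c.err = io.EOF
		return false
	}
	c.cur, c.msgs = c.msgs[0], c.msgs[1:]
	return true
}

// Pop returns the message (or error) received by the last FetchNext.
func (c *sliceConsumer) Pop() (interface{}, error) { return c.cur, c.err }

func (c *sliceConsumer) Close() error { return nil }

// drain shows the canonical consume loop: FetchNext, then Pop.
func drain(c Consumer) ([]interface{}, error) {
	var out []interface{}
	for c.FetchNext() {
		m, err := c.Pop()
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
	return out, c.Close()
}

func main() {
	c := &sliceConsumer{msgs: []interface{}{"a", "b", "c"}}
	got, err := drain(c)
	fmt.Println(got, err)
}
```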
type Header ¶
type Header struct {
	Format    string
	Filters   []string `json:",omitempty"`
	Schema    []byte   `json:",omitempty"`
	Delimited bool     `json:",omitempty"`
	HMAC      string   `json:"HMAC-SHA256,omitempty"`
	IV        string   `json:"AES256-CFB-IV,omitempty"`
}
Header represents file metadata at the beginning of the file
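Because most Header fields carry `omitempty` tags, a marshaled header only contains the fields that are set. A small sketch (the struct is copied from the docs; how the package actually writes the header to the file is not shown here):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Header mirrors the struct from the package documentation.
type Header struct {
	Format    string
	Filters   []string `json:",omitempty"`
	Schema    []byte   `json:",omitempty"`
	Delimited bool     `json:",omitempty"`
	HMAC      string   `json:"HMAC-SHA256,omitempty"`
	IV        string   `json:"AES256-CFB-IV,omitempty"`
}

// encode marshals a Header to its JSON representation.
func encode(h Header) string {
	b, err := json.Marshal(h)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Only non-empty fields appear, thanks to omitempty.
	fmt.Println(encode(Header{Format: "json"}))
	fmt.Println(encode(Header{Format: "avro", Delimited: true, HMAC: "abc"}))
}
```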
type KafkaPipe ¶
KafkaPipe is a wrapper on top of the Sarama library to produce/consume through Kafka
- after a failure or shutdown, the pipe guarantees to resend the last batchSize messages,
meaning batchSize messages may be in flight; reading the (batchSize+1)th message automatically acknowledges the previous batch.
- the producer caches and sends at most batchSize messages at once
func (*KafkaPipe) NewConsumer ¶
NewConsumer registers a new Kafka consumer
func (*KafkaPipe) NewProducer ¶
NewProducer registers a new sync producer
type LocalPipe ¶
type LocalPipe struct {
// contains filtered or unexported fields
}
LocalPipe pipe based on channels
func (*LocalPipe) NewConsumer ¶
NewConsumer registers consumer with the given pipe name
func (*LocalPipe) NewProducer ¶
NewProducer registers producer with the given pipe name
type Pipe ¶
type Pipe interface {
	NewConsumer(topic string) (Consumer, error)
	NewProducer(topic string) (Producer, error)
	Type() string
}
Pipe connects named producers and consumers
func Create ¶
func Create(pctx context.Context, pipeType string, batchSize int, cfg *config.AppConfig, db *sql.DB) (Pipe, error)
Create is a pipe factory. It creates a pipe of the given type with the given buffer size. cfg is used by the Kafka pipe to get additional configuration. db is used by the Kafka pipe to save state. pctx makes it possible to cancel blocking calls inside the pipe, e.g. during shutdown
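The factory pattern behind Create can be sketched as a lookup into a constructor registry. This is a simplified illustration: the real constructors also receive the context, config, and DB handle from the Create signature above, which are omitted here to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// Pipe is trimmed from the package interface to Type() for brevity.
type Pipe interface{ Type() string }

// localPipe stands in for the package's LocalPipe in this sketch.
type localPipe struct{ batchSize int }

func (p *localPipe) Type() string { return "local" }

// registry is analogous to the package's Pipes map of constructors.
var registry = map[string]func(batchSize int) (Pipe, error){
	"local": func(batchSize int) (Pipe, error) { return &localPipe{batchSize}, nil },
}

// create mimics Create: look up the pipe type and invoke its constructor,
// returning an error for unregistered types.
func create(pipeType string, batchSize int) (Pipe, error) {
	c, ok := registry[strings.ToLower(pipeType)]
	if !ok {
		return nil, fmt.Errorf("unknown pipe type: %s", pipeType)
	}
	return c(batchSize)
}

func main() {
	p, err := create("local", 256)
	fmt.Println(p.Type(), err)
}
```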
type Producer ¶
type Producer interface {
	Push(data interface{}) error
	PushK(key string, data interface{}) error
	PushSchema(key string, data []byte) error
	//PushBatch queues the messages instead of sending them immediately
	PushBatch(key string, data interface{}) error
	//PushBatchCommit writes out all the messages queued by PushBatch
	PushBatchCommit() error
	Close() error
	SetFormat(format string)
}
Producer is the producer interface for the pipe
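The PushBatch/PushBatchCommit contract (queue first, write out on commit) can be illustrated with a toy in-memory producer. This is not the package's implementation; the interface is trimmed to the two batching methods:

```go
package main

import "fmt"

// Producer is trimmed from the package interface to the batching methods.
type Producer interface {
	PushBatch(key string, data interface{}) error
	PushBatchCommit() error
}

// memProducer queues messages in memory; Commit moves them to sent.
// It illustrates the PushBatch/PushBatchCommit contract only.
type memProducer struct {
	queued []interface{}
	sent   []interface{}
}

// PushBatch queues the message instead of sending it immediately.
func (p *memProducer) PushBatch(key string, data interface{}) error {
	p.queued = append(p.queued, data)
	return nil
}

// PushBatchCommit writes out everything queued by PushBatch.
func (p *memProducer) PushBatchCommit() error {
	p.sent = append(p.sent, p.queued...)
	p.queued = nil
	return nil
}

func main() {
	p := &memProducer{}
	var prod Producer = p
	_ = prod.PushBatch("k1", "event-1")
	_ = prod.PushBatch("k2", "event-2")
	fmt.Println(len(p.sent)) // nothing written out yet
	_ = prod.PushBatchCommit()
	fmt.Println(len(p.sent)) // both messages written out
}
```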