pipe

package
v0.0.0-...-ab40c3c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2019 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	OffsetOldest = sarama.OffsetOldest
	OffsetNewest = sarama.OffsetNewest
)

Initial offset type

View Source
var Delimited = false

Delimited enables producing delimited message to text files and length prepended messages to binary files

View Source
var InitialOffset = OffsetNewest

InitialOffset allows to configure global initial offset from which to start consuming partitions which doesn't have offsets stored in the kafka_offsets table

View Source
var KafkaConfig *sarama.Config

KafkaConfig global per process Sarama config

View Source
var Pipes map[string]constructor

Pipes is the list of registered pipes Plugins insert their constructors into this map

Functions

func DeleteKafkaOffsets

func DeleteKafkaOffsets(conn *sql.DB, topic string) bool

DeleteKafkaOffsets delete offsets for specified topic

Types

type Consumer

type Consumer interface {
	Pop() (interface{}, error)
	Close() error
	//CloseOnFailure doesn't save offsets
	CloseOnFailure() error
	/*FetchNext is a blocking call which receives a message.
	  Message and error can be later retreived by Pop call.
	  If it returns false this means EOF and no more Pops allowed */
	FetchNext() bool
	//Allows to explicitly persists current consumer position
	SaveOffset() error

	//SetFormat allow to tell consumer the format of the file when there is no
	//header
	SetFormat(format string)
}

Consumer consumer interface for the pipe

type Header struct {
	Format    string
	Filters   []string `json:",omitempty"`
	Schema    []byte   `json:",omitempty"`
	Delimited bool     `json:",omitempty"`
	HMAC      string   `json:"HMAC-SHA256,omitempty"`
	IV        string   `json:"AES256-CFB-IV,omitempty"`
}

Header represent file metadata in the beginning of the file

type KafkaPipe

type KafkaPipe struct {
	Config *sarama.Config
	// contains filtered or unexported fields
}

KafkaPipe is wrapper on top of Sarama library to produce/consume through kafka

  • after failure shutdown pipe guarantees to resent last batchSize messages,

meaning batchSize messages may be inflight, reading (batchSize+1)th message automatically acknowledges previous batch.

  • producer caches and sents maximum batchSize messages at once

func (*KafkaPipe) Init

func (p *KafkaPipe) Init() error

Init initializes Kafka pipe creating kafka_offsets table

func (*KafkaPipe) NewConsumer

func (p *KafkaPipe) NewConsumer(topic string) (Consumer, error)

NewConsumer registers a new kafka consumer

func (*KafkaPipe) NewProducer

func (p *KafkaPipe) NewProducer(topic string) (Producer, error)

NewProducer registers a new sync producer

func (*KafkaPipe) Type

func (p *KafkaPipe) Type() string

Type returns Pipe type as Kafka

type LocalPipe

type LocalPipe struct {
	// contains filtered or unexported fields
}

LocalPipe pipe based on channels

func (*LocalPipe) NewConsumer

func (p *LocalPipe) NewConsumer(key string) (Consumer, error)

NewConsumer registers consumer with the given pipe name

func (*LocalPipe) NewProducer

func (p *LocalPipe) NewProducer(key string) (Producer, error)

NewProducer registers producer with the given pipe name

func (*LocalPipe) Type

func (p *LocalPipe) Type() string

Type returns type of the type

type Pipe

type Pipe interface {
	NewConsumer(topic string) (Consumer, error)
	NewProducer(topic string) (Producer, error)
	Type() string
}

Pipe connects named producers and consumers

func Create

func Create(pctx context.Context, pipeType string, batchSize int, cfg *config.AppConfig, db *sql.DB) (Pipe, error)

Create is a pipe factory Creates pipe of given type, with given buffer size cfg is used by Kafka pipe to get additional configuration db is used by Kafka pipe to save state pctx is used to be able to cancel blocking calls inside pipe, like during shutdown

type Producer

type Producer interface {
	Push(data interface{}) error
	PushK(key string, data interface{}) error
	PushSchema(key string, data []byte) error
	//PushBatch queues the messages instead of sending immediately
	PushBatch(key string, data interface{}) error
	//PushCommit writes out all the messages queued by PushBatch
	PushBatchCommit() error
	Close() error

	SetFormat(format string)
}

Producer producer interface for pipe

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL