kafka

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2020 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Default values exported by the kafka package. This listing does not show
// where each default is applied.
const (
	// DefaultConnectTimeout is a duration of ten seconds.
	// NOTE(review): presumably the fallback for Options.ConnectTimeout — confirm against the caller.
	DefaultConnectTimeout = 10 * time.Second
	// DefaultGroupId is the string "plumber".
	// NOTE(review): presumably the fallback for Options.GroupId — confirm against the caller.
	DefaultGroupId        = "plumber"
	// DefaultMaxBytes is 1048576 (1024 * 1024).
	// NOTE(review): presumably a per-fetch byte limit handed to the reader — confirm.
	DefaultMaxBytes       = 1048576 // 1MB
	// DefaultMaxWait is a duration of fifty milliseconds.
	// NOTE(review): presumably the reader's maximum wait for new data — confirm.
	DefaultMaxWait        = 50 * time.Millisecond
	// DefaultBatchSize is 1.
	// NOTE(review): presumably the writer's batch size — confirm.
	DefaultBatchSize      = 1
)

Variables

This section is empty.

Functions

func Read

func Read(c *cli.Context) error

func Write

func Write(c *cli.Context) error

Types

type IKafka

// IKafka declares factory methods that return a *Reader or a *Writer for a
// given id and topic.
//
// NOTE(review): the methods listed for *Kafka on this page have different
// parameter lists — NewReader(id string, md *desc.MessageDescriptor, opts *Options)
// and NewWriter(id string, opts *Options) — so *Kafka does not satisfy this
// interface as shown. Confirm which signatures are intended.
type IKafka interface {
	NewReader(id, topic string) *Reader
	NewWriter(id, topic string) *Writer
}

type IReader

// IReader declares a single Read method that takes a context and returns a
// skafka.Message together with an error.
//
// NOTE(review): (*Reader).Read as listed on this page returns only an error,
// so *Reader does not satisfy this interface as shown. Confirm which
// signature is intended.
type IReader interface {
	Read(ctx context.Context) (skafka.Message, error)
}

type IWriter

// IWriter declares a single Write method that takes a key and a value as byte
// slices and returns an error. (*Writer).Write as listed on this page has the
// same signature.
type IWriter interface {
	Write(key, value []byte) error
}

type Kafka

// Kafka bundles an *Options value with a *skafka.Dialer. The NewReader and
// NewWriter methods are defined on *Kafka.
//
// NOTE(review): skafka is presumably an import alias for a Kafka client
// library; the import block is not visible in this listing — confirm.
type Kafka struct {
	// Options is the settings value held by this Kafka instance.
	Options *Options
	// Dialer is the skafka dialer held by this Kafka instance.
	// NOTE(review): presumably shared by readers and writers created from it — confirm.
	Dialer  *skafka.Dialer
}

func (*Kafka) NewReader

func (k *Kafka) NewReader(id string, md *desc.MessageDescriptor, opts *Options) *Reader

func (*Kafka) NewWriter

func (k *Kafka) NewWriter(id string, opts *Options) *Writer

NewWriter returns a new writer per topic.

type Options

// Options is the settings struct referenced by Kafka, Reader and Writer (each
// holds an *Options) and accepted by (*Kafka).NewReader and (*Kafka).NewWriter.
//
// NOTE(review): the field descriptions below are inferred from field names and
// types only; how each field is consumed is not visible in this listing.
// Confirm against the implementation before relying on them.
type Options struct {
	// Connection settings — presumably the broker address, the topic and the
	// consumer group id, plus a connect timeout and a flag relaxing TLS
	// verification. TODO confirm.
	Address             string
	Topic               string
	GroupId             string
	ConnectTimeout      time.Duration
	UseInsecureTLS      bool
	// NOTE(review): the context package documentation advises against storing
	// a Context inside a struct; consider passing it per call instead.
	Context             context.Context
	// Read-side display settings — presumably toggles for line numbering and
	// for continuing to read after the first message. TODO confirm.
	LineNumbers         bool
	Follow              bool
	// Write-side input — presumably the message key and the literal payload.
	// TODO confirm.
	Key                 string
	InputData           string
	// Protobuf settings — presumably a directory of .proto files and the name
	// of the root message type. TODO confirm.
	ProtobufDir         string
	ProtobufRootMessage string
	// Encoding and conversion selectors and an input file path; the accepted
	// values are not visible here. TODO confirm.
	InputType           string
	OutputType          string
	InputFile           string
	Convert             string
}

type Reader

// Reader pairs an identifier with a *skafka.Reader, the *Options in effect and
// a protobuf *desc.MessageDescriptor. (*Kafka).NewReader returns a *Reader and
// takes an id, a message descriptor and options as parameters.
//
// NOTE(review): the descriptor is presumably used to decode message payloads
// in Read — not visible in this listing; confirm.
type Reader struct {
	Id                string
	Reader            *skafka.Reader
	Options           *Options
	MessageDescriptor *desc.MessageDescriptor
	// contains filtered or unexported fields
}

func (*Reader) Read

func (r *Reader) Read(ctx context.Context) error

type Writer

// Writer pairs an identifier with a *skafka.Writer and the *Options in effect.
// (*Kafka).NewWriter returns a *Writer and takes an id and options as
// parameters; (*Writer).Write accepts a key and a value as byte slices.
type Writer struct {
	Id      string
	Writer  *skafka.Writer
	Options *Options
	// contains filtered or unexported fields
}

func (*Writer) Write

func (w *Writer) Write(key, value []byte) error

Write publishes a message into Kafka.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL