kafkasc

package
v0.1.3
Warning

This package is not in the latest version of its module.

Published: Aug 11, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	Poll(int) kafka.Event
	Commit() ([]kafka.TopicPartition, error)
	Close() (err error)
}

Consumer is the interface for polling events, committing offsets, and closing a Kafka consumer.
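
The method set above suggests a poll, process, commit cycle. The following is a minimal sketch of that cycle, not the package's own code. To stay runnable without the Kafka client, it assumes local stand-ins: Event and TopicPartition replace kafka.Event and kafka.TopicPartition, and textEvent, sliceConsumer, and drain are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// Event and TopicPartition are local stand-ins for kafka.Event and
// kafka.TopicPartition so the sketch compiles without the Kafka client.
type Event interface{ String() string }

type TopicPartition struct {
	Topic  string
	Offset int64
}

// Consumer mirrors the method set documented above, using the stand-in types.
type Consumer interface {
	Poll(timeoutMs int) Event
	Commit() ([]TopicPartition, error)
	Close() error
}

// textEvent is a hypothetical event type used only for this illustration.
type textEvent string

func (e textEvent) String() string { return string(e) }

// sliceConsumer is a hypothetical in-memory Consumer that serves queued events.
type sliceConsumer struct {
	queue  []Event
	read   int64
	closed bool
}

// Poll returns the next queued event, or nil when the queue is empty.
// The timeout is ignored because nothing here blocks.
func (c *sliceConsumer) Poll(timeoutMs int) Event {
	if len(c.queue) == 0 {
		return nil
	}
	ev := c.queue[0]
	c.queue = c.queue[1:]
	c.read++
	return ev
}

// Commit reports how many events have been read so far as the offset.
func (c *sliceConsumer) Commit() ([]TopicPartition, error) {
	return []TopicPartition{{Topic: "demo", Offset: c.read}}, nil
}

func (c *sliceConsumer) Close() error {
	c.closed = true
	return nil
}

// drain polls until no event is returned, commits once, and returns the payloads.
func drain(c Consumer) ([]string, error) {
	var out []string
	for {
		ev := c.Poll(100)
		if ev == nil {
			break
		}
		out = append(out, ev.String())
	}
	if _, err := c.Commit(); err != nil {
		return out, err
	}
	return out, nil
}

func main() {
	c := &sliceConsumer{queue: []Event{textEvent("a"), textEvent("b")}}
	defer c.Close()
	msgs, err := drain(c)
	fmt.Println(msgs, err)
}
```

Writing code against the interface rather than a concrete consumer is what allows a mock to be substituted in tests.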

type Environment

type Environment struct {
	KafkaBootstrapServers string
	KafkaAutoOffsetReset  string
}

Environment represents the current inspr environment

func GetKafkaEnvironment

func GetKafkaEnvironment() *Environment

GetKafkaEnvironment returns the current inspr environment

func RefreshEnviromentVariables

func RefreshEnviromentVariables() *Environment

RefreshEnviromentVariables "refreshes" the values of the Kafka environment variables. It was developed for testing and probably shouldn't be used in other cases.
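
One plausible reading of the getter and refresh pair is a cached value that is only re-read on request. The sketch below illustrates that pattern under stated assumptions: the environment variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_AUTO_OFFSET_RESET, the caching behavior, and the helper names getEnvironment, refreshEnvironment, and lookup are all hypothetical and are not confirmed by this documentation.

```go
package main

import (
	"fmt"
	"os"
)

// Environment mirrors the struct documented above.
type Environment struct {
	KafkaBootstrapServers string
	KafkaAutoOffsetReset  string
}

// The variable names below are assumptions made for this sketch.
const (
	bootstrapVar   = "KAFKA_BOOTSTRAP_SERVERS"
	offsetResetVar = "KAFKA_AUTO_OFFSET_RESET"
)

// lookup reads a variable; it is a package variable so tests can replace it.
var lookup = os.Getenv

// cached holds the last loaded values. This sketch is not safe for
// concurrent use.
var cached *Environment

// getEnvironment loads the values on first use and then returns the cached copy.
func getEnvironment() *Environment {
	if cached == nil {
		return refreshEnvironment()
	}
	return cached
}

// refreshEnvironment re-reads the variables, replacing the cached copy.
func refreshEnvironment() *Environment {
	cached = &Environment{
		KafkaBootstrapServers: lookup(bootstrapVar),
		KafkaAutoOffsetReset:  lookup(offsetResetVar),
	}
	return cached
}

func main() {
	os.Setenv(bootstrapVar, "localhost:9092")
	os.Setenv(offsetResetVar, "earliest")
	fmt.Println(getEnvironment().KafkaBootstrapServers)

	// A later change is only visible after an explicit refresh.
	os.Setenv(bootstrapVar, "broker:9092")
	fmt.Println(getEnvironment().KafkaBootstrapServers)
	fmt.Println(refreshEnvironment().KafkaBootstrapServers)
}
```

Under this reading, a test that changes environment variables would need the refresh call before the new values are observed, which matches the note that the function was developed for testing.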

type MockConsumer

type MockConsumer struct {
	// contains filtered or unexported fields
}

MockConsumer is a mock Consumer for use in tests.

func (*MockConsumer) Close

func (mc *MockConsumer) Close() (err error)

Close mocks closing the consumer.

func (*MockConsumer) Commit

func (mc *MockConsumer) Commit() ([]kafka.TopicPartition, error)

Commit mocks committing offsets.

func (*MockConsumer) CreateEvent

func (mc *MockConsumer) CreateEvent(ev kafka.Event)

CreateEvent creates an event on the mock channel of the consumer

func (*MockConsumer) CreateMessage

func (mc *MockConsumer) CreateMessage()

CreateMessage creates a message on the mock consumer from the field defined in the structure

func (*MockConsumer) Events

func (mc *MockConsumer) Events() chan kafka.Event

Events returns the mock events channel

func (*MockConsumer) Poll

func (mc *MockConsumer) Poll(timeoutMs int) kafka.Event

Poll mocks event polling
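
The unexported fields of MockConsumer are not shown, so its internals are unknown. As a rough illustration of how a channel-backed mock with CreateEvent, Events, and Poll could fit together, here is a sketch under assumptions: chanMock, newChanMock, and textEvent are hypothetical, Event is a local stand-in for kafka.Event, and returning nil on timeout is a choice made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a local stand-in for kafka.Event.
type Event interface{ String() string }

// textEvent is a hypothetical event type used only for this illustration.
type textEvent string

func (e textEvent) String() string { return string(e) }

// chanMock is a hypothetical channel-backed mock, not the package's MockConsumer.
type chanMock struct {
	events chan Event
}

// newChanMock creates a mock whose channel can hold buffer pending events.
func newChanMock(buffer int) *chanMock {
	return &chanMock{events: make(chan Event, buffer)}
}

// CreateEvent queues an event for a later Poll. It blocks if the buffer is full.
func (m *chanMock) CreateEvent(ev Event) { m.events <- ev }

// Events exposes the underlying channel.
func (m *chanMock) Events() chan Event { return m.events }

// Poll waits up to timeoutMs milliseconds and returns nil on timeout.
func (m *chanMock) Poll(timeoutMs int) Event {
	select {
	case ev := <-m.events:
		return ev
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return nil
	}
}

func main() {
	m := newChanMock(1)
	m.CreateEvent(textEvent("hello"))
	fmt.Println(m.Poll(10))        // the queued event
	fmt.Println(m.Poll(10) == nil) // nothing left, so Poll times out
}
```

A buffered channel keeps CreateEvent from blocking when a test queues events before any goroutine polls.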

type MockEvent

type MockEvent struct {
	// contains filtered or unexported fields
}

MockEvent is a mock event for use in tests.

func (*MockEvent) String

func (me *MockEvent) String() string

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads and commits messages from the channels defined in the environment

func NewReader

func NewReader() (*Reader, error)

NewReader returns a new Reader

func (*Reader) Close

func (reader *Reader) Close() error

Close closes the reader consumers

func (*Reader) Commit

func (reader *Reader) Commit(ctx context.Context, channel string) error

Commit commits the last message read by Reader

func (*Reader) Consumers

func (reader *Reader) Consumers() map[string]Consumer

Consumers returns a Reader's consumers

func (*Reader) ReadMessage

func (reader *Reader) ReadMessage(ctx context.Context, channel string) ([]byte, error)

ReadMessage reads one message at a time from the given channel. It returns the message and an error, if any occurred.
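
Since ReadMessage and Commit both take a context and a channel name, a caller would typically read, process, and then commit. The sketch below shows that flow against a local interface carrying the two documented signatures. memoryReader, handleOne, and runOnce are hypothetical names, and committing only after successful processing is a design choice of this example, not documented behavior of Reader.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// messageReader lists the two Reader methods used here, with the
// signatures documented above.
type messageReader interface {
	ReadMessage(ctx context.Context, channel string) ([]byte, error)
	Commit(ctx context.Context, channel string) error
}

// memoryReader is a hypothetical in-memory stand-in for *Reader.
type memoryReader struct {
	pending   map[string][][]byte
	committed map[string]int
}

// ReadMessage pops the next pending message for the channel.
func (r *memoryReader) ReadMessage(ctx context.Context, channel string) ([]byte, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	queue := r.pending[channel]
	if len(queue) == 0 {
		return nil, errors.New("no message on channel " + channel)
	}
	r.pending[channel] = queue[1:]
	return queue[0], nil
}

// Commit counts one commit for the channel.
func (r *memoryReader) Commit(ctx context.Context, channel string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	r.committed[channel]++
	return nil
}

// handleOne reads one message, processes it, and commits only on success.
func handleOne(ctx context.Context, r messageReader, channel string, process func([]byte) error) error {
	msg, err := r.ReadMessage(ctx, channel)
	if err != nil {
		return err
	}
	if err := process(msg); err != nil {
		return err // leave the message uncommitted
	}
	return r.Commit(ctx, channel)
}

// runOnce calls handleOne with a background context.
func runOnce(r messageReader, channel string, process func([]byte) error) error {
	return handleOne(context.Background(), r, channel, process)
}

func main() {
	r := &memoryReader{
		pending:   map[string][][]byte{"orders": {[]byte("order-1")}},
		committed: map[string]int{},
	}
	err := runOnce(r, "orders", func(b []byte) error {
		fmt.Printf("processing %s\n", b)
		return nil
	})
	fmt.Println(err, r.committed["orders"])
}
```

With the real package, the same handleOne shape would presumably accept the value returned by NewReader, since *Reader has both methods.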

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer writes messages to Kafka through a Kafka producer

func NewWriter

func NewWriter() (*Writer, error)

NewWriter creates a new writer/kafka producer

func (*Writer) Close

func (writer *Writer) Close()

Close closes the kafka producer

func (*Writer) WriteMessage

func (writer *Writer) WriteMessage(channel string, message []byte) error

WriteMessage receives a message and sends it to the topic defined by the given channel
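
Because WriteMessage takes raw bytes, callers need to encode their payloads first. The sketch below shows one way to do that with JSON, written against a local interface that carries the two documented Writer signatures. recordingWriter and publishJSON are hypothetical names, and the choice of JSON is an assumption of this example, not something the package prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// messageWriter lists the Writer methods documented above.
type messageWriter interface {
	WriteMessage(channel string, message []byte) error
	Close()
}

// recordingWriter is a hypothetical stand-in that stores messages per channel.
type recordingWriter struct {
	sent   map[string][][]byte
	closed bool
}

// WriteMessage records the message, or fails if the writer was closed.
func (w *recordingWriter) WriteMessage(channel string, message []byte) error {
	if w.closed {
		return fmt.Errorf("writer closed, cannot write to %q", channel)
	}
	w.sent[channel] = append(w.sent[channel], message)
	return nil
}

func (w *recordingWriter) Close() { w.closed = true }

// publishJSON encodes v as JSON and writes it to the given channel.
func publishJSON(w messageWriter, channel string, v interface{}) error {
	body, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return w.WriteMessage(channel, body)
}

func main() {
	w := &recordingWriter{sent: map[string][][]byte{}}
	defer w.Close()
	err := publishJSON(w, "orders", map[string]int{"id": 7})
	fmt.Println(err, string(w.sent["orders"][0]))
}
```

With the real package, publishJSON would presumably accept the value returned by NewWriter, since *Writer has both methods.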
