streams

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

README

🏄♀ Streams

Release Go Reference Go Report Card Taylor Swift Volkswagen

A teeny-tiny package to create stream processing workloads. It is intended to be used with Apache Kafka.

Getting Started

There are only a few packages that help Gophers to create stream processing workloads. This package is one of them. It is intended to be used with Apache Kafka.

Supported:

go get github.com/katallaxie/streams

It features a channel based API to consume messages from a Kafka topic and a channel based API to produce messages to a Kafka topic. It assumes the use of a consumer group for the consumption of messages.

There is a source which feeds messages from a source into a stream. There is a sink which writes processed messages into a data sink.

When using a sink in the procesing the commit mode can be set to CommitManual which means that the sink will not commit the offset of the consumed message. This is useful when the message is processed in a stream and the offset should only be committed after the message has been processed.

The package connects a source with a sink via small functional operatios.

  • Branch
  • Do
  • FanOut
  • Filter
  • Log
  • Map
  • Merge
  • Table (experimental) Stores the message in a table. The table can be queried via a View.

There is support for Prometheus metrics.

The view package provides the ability to create views with data from the stream processing workload.

Docs

You can find the documentation hosted on godoc.org.

License

Apache 2.0

Documentation

Overview

Package streams provides a set of functions to work with event streams. The package is designed to be used with Kafka, NATS, and other message brokers.

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultRegistry is a default prometheus registry.
	DefaultRegistry = NewRegistry()

	// DefaultRegisterer is a default prometheus registerer.
	DefaultRegisterer prometheus.Registerer = DefaultRegistry

	// DefaultGatherer is a default prometheus gatherer.
	DefaultGatherer prometheus.Gatherer = DefaultRegistry
)
View Source
var DefaultMetrics = NewMetrics()

DefaultMetrics is a set of default metrics.

View Source
var ErrNotImplemented = errors.New("not implemented")

ErrNotImplemented is returned when a method is not implemented.

Functions

This section is empty.

Types

type Collector

type Collector interface {
	// Collect ...
	Collect(ch chan<- Metric)
}

Collector ...

type Gatherer

type Gatherer interface {
	// Gather ...
	Gather(collector Collector)
}

Gatherer is a type that can gather metrics.

type Iterator

type Iterator interface {
	// Next moves the cursor to the next key/value pair, which will then be available through the Key, Value and Latest methods.
	// It returns false if the iterator is exhausted.
	Next() <-chan NextCursor
}

Iterator is the interface that wraps the basic Next method.

type Key

type Key interface {
	int | ~string | []byte
}

Key is a message key.

type MessageChannel

type MessageChannel[K, V any] chan msg.Message[K, V]

MessageChannel ...

type MessageReceiver

type MessageReceiver[K, V any] chan msg.Message[K, V]

MessageReceiver ...

type Messages

type Messages[K, V any] chan msg.Message[K, V]

Messages is a channel of messages.

type Metric

type Metric interface {
	// Write ...
	Write(m *Monitor) error
}

Metric

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics is a set of metrics.

func NewMetrics

func NewMetrics() *Metrics

NewMetrics is a constructor for Metrics.

func (*Metrics) Collect

func (m *Metrics) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector.

func (*Metrics) Describe

func (m *Metrics) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector.

type Monitor

type Monitor struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Monitor is a statistics monitor.

func NewMonitor

func NewMonitor(metrics *Metrics) *Monitor

NewMonitor is a constructor for Monitor.

func (*Monitor) Gather

func (m *Monitor) Gather(collector Collector)

Gather is a method that gathers metrics.

func (*Monitor) SetCount

func (m *Monitor) SetCount(node string, count float64)

SetCount sets the count metric.

func (*Monitor) SetLatency

func (m *Monitor) SetLatency(node string, latency float64)

SetLatency sets the latency metric.

type NextCursor

type NextCursor struct {
	Key    string
	Value  []byte
	Latest bool
}

NextCursor is the next cursor.

type Node

type Node interface {
	// AddChild adds a child to a node.
	AddChild(Node)

	// Children returns the children of a node.
	Children() []Node

	// Name returns the name of a node.
	Name() string
}

Node is a node in a topology.

func NewNode

func NewNode(name string) Node

NewNode is a constructor for a new node in the topology.

type Opt

type Opt func(*Opts)

Opt is a function that configures a stream.

func WithBuffer

func WithBuffer(size int) Opt

WithBuffer configures the buffer size for a stream.

func WithErrorLogger

func WithErrorLogger(logger logger.LogFunc) Opt

WithErrorLogger configures the error logger for a stream.

func WithLogger

func WithLogger(logger logger.LogFunc) Opt

WithLogger configures the logger for a stream.

func WithMonitor

func WithMonitor(m *Monitor) Opt

WithMonitor configures a statistics monitor.

func WithName

func WithName(name string) Opt

WithName configures the node name for a stream.

func WithTimeout

func WithTimeout(timeout time.Duration) Opt

WithTimeout configures the timeout for a stream.

type Opts

type Opts struct {
	// contains filtered or unexported fields
}

Opts is a set of options for a stream.

func DefaultOpts

func DefaultOpts() *Opts

DefaultOpts are the default options for a stream.

func (*Opts) Configure

func (o *Opts) Configure(opts ...Opt)

Configure is a function that configures a stream.

type Predicate

type Predicate[K, V any] func(msg.Message[K, V]) (bool, error)

Predicate is a function that returns true or false.

type Probe

type Probe[K, V any] interface {
	// Do ...
	Do(ctx context.Context, monitor Monitor) error

	Collector
}

Probe ...

type Registry

type Registry struct {
	*prometheus.Registry
}

Registry is a prometheus registry.

func NewRegistry

func NewRegistry() *Registry

NewRegistry is a constructor for Registry.

func (*Registry) Handler

func (r *Registry) Handler() http.Handler

Handler returns a HTTP handler for this registry. Should be registered at "/metrics".

type Sink

type Sink[K, V any] interface {
	Write(...msg.Message[K, V]) error
}

Sink is a sink of messages.

type Source

type Source[K, V any] interface {
	// Messages returns a channel of messages.
	Messages() chan msg.Message[K, V]

	// Commit commits a message.
	Commit(...msg.Message[K, V]) error

	// Error returns an error.
	Error() error
}

Source is a source of messages.

type Stream

type Stream[K Key, V Value] interface {
	// Close closes a stream.
	Close()

	// Do executes a function on a stream.
	Do(name string, fn func(msg.Message[K, V])) Stream[K, V]

	// Drain drains a stream.
	Drain()

	// FanOut splits a stream into multiple streams.
	FanOut(name string, predicates ...Predicate[K, V]) []Stream[K, V]

	// Filter filters a stream.
	Filter(name string, predicate Predicate[K, V]) Stream[K, V]

	// Map maps a stream.
	Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) Stream[K, V]

	// Mark marks a message.
	Mark() Stream[K, V]

	// Log logs a message.
	Log(name string) Stream[K, V]

	// Sink sends messages to a sink.
	Sink(name string, sink Sink[K, V])

	// Errors returns the first error.
	Error() error
}

Stream is a stream of messages.

type StreamImpl

type StreamImpl[K, V any] struct {
	Collector
	// contains filtered or unexported fields
}

StreamImpl implements Stream.

func NewStream

func NewStream[K, V any](src Source[K, V], opts ...Opt) *StreamImpl[K, V]

NewStream from a source of messages.

func (*StreamImpl[K, V]) Branch

func (s *StreamImpl[K, V]) Branch(name string, fns ...Predicate[K, V]) []*StreamImpl[K, V]

Branch is branch a stream to multiple streams.

func (*StreamImpl[K, V]) Close

func (s *StreamImpl[K, V]) Close()

Close is a function that closes a stream.

func (*StreamImpl[K, V]) Collect

func (s *StreamImpl[K, V]) Collect(ch chan<- Metric)

Collect is collect the content of a stream.

func (*StreamImpl[K, V]) Do

func (s *StreamImpl[K, V]) Do(name string, fn func(msg.Message[K, V])) *StreamImpl[K, V]

Do is a function that executes a function on a stream.

func (*StreamImpl[K, V]) Drain

func (s *StreamImpl[K, V]) Drain()

Drain is a function that drains a stream.

func (*StreamImpl[K, V]) Error

func (s *StreamImpl[K, V]) Error() error

Error is a function that returns the error of a stream.

func (*StreamImpl[K, V]) Fail

func (s *StreamImpl[K, V]) Fail(err error)

Fail is a function that fails a stream

func (*StreamImpl[K, V]) FanOut

func (s *StreamImpl[K, V]) FanOut(name string, num int) []*StreamImpl[K, V]

FanOut is fan out a stream to multiple streams.

func (*StreamImpl[K, V]) Filter

func (s *StreamImpl[K, V]) Filter(name string, fn Predicate[K, V]) *StreamImpl[K, V]

Filter is a function that filters a stream.

func (*StreamImpl[K, V]) Log

func (s *StreamImpl[K, V]) Log(name string) *StreamImpl[K, V]

Log is logging the content of a stream.

func (*StreamImpl[K, V]) Map

func (s *StreamImpl[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) *StreamImpl[K, V]

Map is a function that maps a stream.

func (*StreamImpl[K, V]) Mark

func (s *StreamImpl[K, V]) Mark(name string) *StreamImpl[K, V]

Mark is a function that marks a message.

func (*StreamImpl[K, V]) Merge

func (s *StreamImpl[K, V]) Merge(name string, streams ...StreamImpl[K, V]) *StreamImpl[K, V]

Merge is merge multiple streams into one.

func (*StreamImpl[K, V]) Sink

func (s *StreamImpl[K, V]) Sink(name string, sink Sink[K, V])

Sink is wire up a stream to a sink.

type Table

type Table interface {
	// Set is setting a key/value pair.
	Set(key string, value []byte) error

	// Delete is deleting a key/value pair.
	Delete(key string) error

	// Setup is setting up the table.
	Setup() error

	// Error is returning the error.
	Error() error

	// Sink is the interface that wraps the basic Sink method.
	Sink[string, []byte]

	Iterator
}

Table is the interface that wraps the basic Set, Delete, Setup, Error and Sink methods.

type Topology

type Topology interface {
	// Root returns the root node of a topology.
	Root() Node
}

Topology is a graph of nodes.

func NewTopology

func NewTopology(root Node) Topology

NewTopology is a constructor for Topology.

type Unimplemented

type Unimplemented[K Key, V Value] struct{}

Unimplemented ...

func (*Unimplemented[K, V]) Close

func (u *Unimplemented[K, V]) Close()

Close is a function that closes a stream.

func (*Unimplemented[K, V]) Do

func (u *Unimplemented[K, V]) Do(name string, fn func(msg.Message[K, V])) Stream[K, V]

Do is a function that executes a function on a stream.

func (*Unimplemented[K, V]) Drain

func (u *Unimplemented[K, V]) Drain()

Drain is a function that drains a stream.

func (*Unimplemented[K, V]) Error

func (u *Unimplemented[K, V]) Error() error

Error is a function that returns the first error.

func (*Unimplemented[K, V]) FanOut

func (u *Unimplemented[K, V]) FanOut(name string, predicates ...Predicate[K, V]) []Stream[K, V]

FanOut is a function that splits a stream into multiple streams.

func (*Unimplemented[K, V]) Filter

func (u *Unimplemented[K, V]) Filter(name string, predicate Predicate[K, V]) Stream[K, V]

Filter is a function that filters a stream.

func (*Unimplemented[K, V]) Log

func (u *Unimplemented[K, V]) Log(name string) Stream[K, V]

Log is a function that logs a message.

func (*Unimplemented[K, V]) Map

func (u *Unimplemented[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) Stream[K, V]

Map is a function that maps a stream.

func (*Unimplemented[K, V]) Mark

func (u *Unimplemented[K, V]) Mark() Stream[K, V]

Mark is a function that marks a message.

func (*Unimplemented[K, V]) Sink

func (u *Unimplemented[K, V]) Sink(name string, sink Sink[K, V])

Sink is a function that sends messages to a sink.

type Value

type Value interface {
	int | ~string | []byte
}

Value is a message value.

Directories

Path Synopsis
examples
kafka
reader/mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
writer/mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
nats

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL