Documentation ¶
Overview ¶
Package streams provides a set of functions to work with event streams. The package is designed to be used with Kafka, NATS, and other message brokers.
Index ¶
- Variables
- type Collector
- type Gatherer
- type Iterator
- type Key
- type MessageChannel
- type MessageReceiver
- type Messages
- type Metric
- type Metrics
- type Monitor
- type NextCursor
- type Node
- type Opt
- type Opts
- type Predicate
- type Probe
- type Registry
- type Sink
- type Source
- type Stream
- type StreamImpl
- func (s *StreamImpl[K, V]) Branch(name string, fns ...Predicate[K, V]) []*StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Close()
- func (s *StreamImpl[K, V]) Collect(ch chan<- Metric)
- func (s *StreamImpl[K, V]) Do(name string, fn func(msg.Message[K, V])) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Drain()
- func (s *StreamImpl[K, V]) Error() error
- func (s *StreamImpl[K, V]) Fail(err error)
- func (s *StreamImpl[K, V]) FanOut(name string, num int) []*StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Filter(name string, fn Predicate[K, V]) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Log(name string) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Mark(name string) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Merge(name string, streams ...StreamImpl[K, V]) *StreamImpl[K, V]
- func (s *StreamImpl[K, V]) Sink(name string, sink Sink[K, V])
- type Table
- type Topology
- type Unimplemented
- func (u *Unimplemented[K, V]) Close()
- func (u *Unimplemented[K, V]) Do(name string, fn func(msg.Message[K, V])) Stream[K, V]
- func (u *Unimplemented[K, V]) Drain()
- func (u *Unimplemented[K, V]) Error() error
- func (u *Unimplemented[K, V]) FanOut(name string, predicates ...Predicate[K, V]) []Stream[K, V]
- func (u *Unimplemented[K, V]) Filter(name string, predicate Predicate[K, V]) Stream[K, V]
- func (u *Unimplemented[K, V]) Log(name string) Stream[K, V]
- func (u *Unimplemented[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) Stream[K, V]
- func (u *Unimplemented[K, V]) Mark() Stream[K, V]
- func (u *Unimplemented[K, V]) Sink(name string, sink Sink[K, V])
- type Value
Constants ¶
This section is empty.
Variables ¶
var (
	// DefaultRegistry is a default prometheus registry.
	DefaultRegistry = NewRegistry()

	// DefaultRegisterer is a default prometheus registerer.
	DefaultRegisterer prometheus.Registerer = DefaultRegistry

	// DefaultGatherer is a default prometheus gatherer.
	DefaultGatherer prometheus.Gatherer = DefaultRegistry
)
var DefaultMetrics = NewMetrics()
DefaultMetrics is a set of default metrics.
var ErrNotImplemented = errors.New("not implemented")
ErrNotImplemented is returned when a method is not implemented.
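Since ErrNotImplemented is a sentinel created with errors.New, callers can presumably detect it with errors.Is, even when it has been wrapped with extra context. A minimal sketch follows; errNotImplemented is a local stand-in for the package variable, and mark is a hypothetical function used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented is a local stand-in for streams.ErrNotImplemented.
var errNotImplemented = errors.New("not implemented")

// mark is a hypothetical operation that is not implemented.
// It wraps the sentinel with context using %w.
func mark() error {
	return fmt.Errorf("mark: %w", errNotImplemented)
}

// isNotImplemented reports whether err is, or wraps, the sentinel.
func isNotImplemented(err error) bool {
	return errors.Is(err, errNotImplemented)
}

func main() {
	err := mark()
	fmt.Println(isNotImplemented(err)) // true
	fmt.Println(err)                   // mark: not implemented
}
```

Comparing with errors.Is rather than == keeps the check working when intermediate layers wrap the error.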
Functions ¶
This section is empty.
Types ¶
type Collector ¶
type Collector interface {
	// Collect ...
	Collect(ch chan<- Metric)
}
Collector is a type that can collect metrics and send them on a channel.
type Gatherer ¶
type Gatherer interface {
	// Gather ...
	Gather(collector Collector)
}
Gatherer is a type that can gather metrics.
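How Collector and Gatherer fit together can be sketched as follows. This is an illustration under assumptions, not the package's implementation: Metric is a hypothetical struct (the real Metric type is not shown on this page), and counter and sliceGatherer are invented names.

```go
package main

import "fmt"

// Metric is a hypothetical stand-in; the package's Metric type is not shown here.
type Metric struct {
	Name  string
	Value float64
}

// Collector and Gatherer mirror the interfaces documented above.
type Collector interface {
	Collect(ch chan<- Metric)
}

type Gatherer interface {
	Gather(collector Collector)
}

// counter is a hypothetical Collector that reports a single metric.
type counter struct {
	name string
	n    float64
}

func (c *counter) Collect(ch chan<- Metric) {
	ch <- Metric{Name: c.name, Value: c.n}
}

// sliceGatherer is a hypothetical Gatherer that stores what it gathers.
type sliceGatherer struct {
	metrics []Metric
}

// Gather runs the collector in a goroutine and drains its channel.
func (g *sliceGatherer) Gather(c Collector) {
	ch := make(chan Metric)
	go func() {
		defer close(ch)
		c.Collect(ch)
	}()
	for m := range ch {
		g.metrics = append(g.metrics, m)
	}
}

// Compile-time check that sliceGatherer satisfies Gatherer.
var _ Gatherer = (*sliceGatherer)(nil)

func main() {
	g := &sliceGatherer{}
	g.Gather(&counter{name: "messages_total", n: 3})
	g.Gather(&counter{name: "errors_total", n: 1})
	for _, m := range g.metrics {
		fmt.Printf("%s=%v\n", m.Name, m.Value)
	}
}
```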
type Iterator ¶
type Iterator interface {
	// Next moves the cursor to the next key/value pair, which will then be available through the Key, Value and Latest methods.
	// It returns false if the iterator is exhausted.
	Next() <-chan NextCursor
}
Iterator is the interface that wraps the basic Next method.
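Because Next returns a receive-only channel, one plausible way to consume an Iterator is to range over that channel. The sketch below assumes the channel is closed once the iterator is exhausted, which this page does not confirm. NextCursor is a hypothetical struct here (its real definition is not shown), and sliceIterator and keys are invented helpers.

```go
package main

import "fmt"

// NextCursor is a hypothetical stand-in; its real fields are not shown on this page.
type NextCursor struct {
	Key   string
	Value []byte
}

// Iterator mirrors the interface documented above.
type Iterator interface {
	Next() <-chan NextCursor
}

// sliceIterator is a hypothetical Iterator over a fixed list of entries.
type sliceIterator struct {
	entries []NextCursor
}

// Next streams every entry and closes the channel when done (an assumption).
func (it *sliceIterator) Next() <-chan NextCursor {
	ch := make(chan NextCursor)
	go func() {
		defer close(ch)
		for _, e := range it.entries {
			ch <- e
		}
	}()
	return ch
}

// keys drains an Iterator and returns the keys in the order received.
func keys(it Iterator) []string {
	var out []string
	for c := range it.Next() {
		out = append(out, c.Key)
	}
	return out
}

func main() {
	it := &sliceIterator{entries: []NextCursor{
		{Key: "a", Value: []byte("1")},
		{Key: "b", Value: []byte("2")},
	}}
	fmt.Println(keys(it)) // [a b]
}
```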
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics is a set of metrics.
func (*Metrics) Collect ¶
func (m *Metrics) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*Metrics) Describe ¶
func (m *Metrics) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
type Monitor ¶
Monitor is a statistics monitor.
func NewMonitor ¶
NewMonitor is a constructor for Monitor.
func (*Monitor) SetLatency ¶
SetLatency sets the latency metric.
type NextCursor ¶
NextCursor is the next cursor.
type Node ¶
type Node interface {
	// AddChild adds a child to a node.
	AddChild(Node)

	// Children returns the children of a node.
	Children() []Node

	// Name returns the name of a node.
	Name() string
}
Node is a node in a topology.
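The Node interface is small enough to implement directly. The following sketch uses a hypothetical node struct and a render helper (neither is part of the package) to build a three-node tree and print it depth-first.

```go
package main

import (
	"fmt"
	"strings"
)

// Node mirrors the interface documented above.
type Node interface {
	AddChild(Node)
	Children() []Node
	Name() string
}

// node is a hypothetical implementation backed by a slice of children.
type node struct {
	name     string
	children []Node
}

func (n *node) AddChild(c Node)  { n.children = append(n.children, c) }
func (n *node) Children() []Node { return n.children }
func (n *node) Name() string     { return n.name }

// walk writes the subtree rooted at n, indenting each level by two spaces.
func walk(n Node, depth int, sb *strings.Builder) {
	sb.WriteString(strings.Repeat("  ", depth) + n.Name() + "\n")
	for _, c := range n.Children() {
		walk(c, depth+1, sb)
	}
}

// render returns the whole tree as an indented string.
func render(root Node) string {
	var sb strings.Builder
	walk(root, 0, &sb)
	return sb.String()
}

func main() {
	root := &node{name: "source"}
	filter := &node{name: "filter"}
	filter.AddChild(&node{name: "sink"})
	root.AddChild(filter)
	fmt.Print(render(root))
}
```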
type Opt ¶
type Opt func(*Opts)
Opt is a function that configures a stream.
func WithErrorLogger ¶
WithErrorLogger configures the error logger for a stream.
func WithLogger ¶
WithLogger configures the logger for a stream.
func WithTimeout ¶
WithTimeout configures the timeout for a stream.
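Opt follows the functional options pattern: each option is a function that mutates an Opts value. The sketch below shows the pattern in isolation. The timeout field, the one-second default, the newOpts helper, and the WithTimeout signature are all assumptions, since the package's Opts fields are unexported and the option signatures are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// Opts is a hypothetical stand-in; the package's Opts has unexported fields.
type Opts struct {
	timeout time.Duration
}

// Opt mirrors the documented type: a function that configures *Opts.
type Opt func(*Opts)

// WithTimeout is a sketch of a timeout option with an assumed signature.
func WithTimeout(d time.Duration) Opt {
	return func(o *Opts) { o.timeout = d }
}

// newOpts applies the given options on top of an assumed default.
func newOpts(opts ...Opt) *Opts {
	o := &Opts{timeout: time.Second} // assumed default, for illustration only
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	fmt.Println(newOpts().timeout)                             // 1s
	fmt.Println(newOpts(WithTimeout(5 * time.Second)).timeout) // 5s
}
```

The appeal of this pattern is that new options can be added later without changing the constructor's signature.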
type Opts ¶
type Opts struct {
// contains filtered or unexported fields
}
Opts is a set of options for a stream.
type Probe ¶
type Probe[K, V any] interface {
	// Do ...
	Do(ctx context.Context, monitor Monitor) error

	Collector
}
Probe is a Collector that additionally exposes a Do method taking a context and a Monitor.
type Source ¶
type Source[K, V any] interface {
	// Messages returns a channel of messages.
	Messages() chan msg.Message[K, V]

	// Commit commits a message.
	Commit(...msg.Message[K, V]) error

	// Error returns an error.
	Error() error
}
Source is a source of messages.
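For tests, a Source can be backed by an in-memory channel. The sketch below is an illustration only: Message is a hypothetical struct standing in for msg.Message[K, V] (whose definition is not shown here), and memSource, newMemSource, and consume are invented names.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for msg.Message[K, V].
type Message[K, V any] struct {
	Key   K
	Value V
}

// Source mirrors the interface documented above, using the stand-in Message.
type Source[K, V any] interface {
	Messages() chan Message[K, V]
	Commit(...Message[K, V]) error
	Error() error
}

// memSource is a hypothetical in-memory Source that counts commits.
type memSource[K, V any] struct {
	ch        chan Message[K, V]
	committed int
}

// newMemSource preloads a buffered channel and closes it.
func newMemSource[K, V any](msgs ...Message[K, V]) *memSource[K, V] {
	ch := make(chan Message[K, V], len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return &memSource[K, V]{ch: ch}
}

func (s *memSource[K, V]) Messages() chan Message[K, V] { return s.ch }

func (s *memSource[K, V]) Commit(msgs ...Message[K, V]) error {
	s.committed += len(msgs)
	return nil
}

func (s *memSource[K, V]) Error() error { return nil }

// consume reads every message, commits it, and returns the values seen.
func consume[K, V any](src Source[K, V]) ([]V, error) {
	var out []V
	for m := range src.Messages() {
		out = append(out, m.Value)
		if err := src.Commit(m); err != nil {
			return out, err
		}
	}
	return out, src.Error()
}

func main() {
	src := newMemSource(
		Message[string, int]{Key: "a", Value: 1},
		Message[string, int]{Key: "b", Value: 2},
	)
	vals, err := consume[string, int](src)
	fmt.Println(vals, err, src.committed) // [1 2] <nil> 2
}
```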
type Stream ¶
type Stream[K Key, V Value] interface {
	// Close closes a stream.
	Close()

	// Do executes a function on a stream.
	Do(name string, fn func(msg.Message[K, V])) Stream[K, V]

	// Drain drains a stream.
	Drain()

	// FanOut splits a stream into multiple streams.
	FanOut(name string, predicates ...Predicate[K, V]) []Stream[K, V]

	// Filter filters a stream.
	Filter(name string, predicate Predicate[K, V]) Stream[K, V]

	// Map maps a stream.
	Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) Stream[K, V]

	// Mark marks a message.
	Mark() Stream[K, V]

	// Log logs a message.
	Log(name string) Stream[K, V]

	// Sink sends messages to a sink.
	Sink(name string, sink Sink[K, V])

	// Error returns the first error.
	Error() error
}
Stream is a stream of messages.
type StreamImpl ¶
StreamImpl implements Stream.
func NewStream ¶
func NewStream[K, V any](src Source[K, V], opts ...Opt) *StreamImpl[K, V]
NewStream creates a stream from a source of messages.
func (*StreamImpl[K, V]) Branch ¶
func (s *StreamImpl[K, V]) Branch(name string, fns ...Predicate[K, V]) []*StreamImpl[K, V]
Branch branches a stream into multiple streams.
func (*StreamImpl[K, V]) Close ¶
func (s *StreamImpl[K, V]) Close()
Close closes a stream.
func (*StreamImpl[K, V]) Collect ¶
func (s *StreamImpl[K, V]) Collect(ch chan<- Metric)
Collect collects the content of a stream.
func (*StreamImpl[K, V]) Do ¶
func (s *StreamImpl[K, V]) Do(name string, fn func(msg.Message[K, V])) *StreamImpl[K, V]
Do executes a function on a stream.
func (*StreamImpl[K, V]) Drain ¶
func (s *StreamImpl[K, V]) Drain()
Drain drains a stream.
func (*StreamImpl[K, V]) Error ¶
func (s *StreamImpl[K, V]) Error() error
Error returns the error of a stream.
func (*StreamImpl[K, V]) Fail ¶
func (s *StreamImpl[K, V]) Fail(err error)
Fail fails a stream with the given error.
func (*StreamImpl[K, V]) FanOut ¶
func (s *StreamImpl[K, V]) FanOut(name string, num int) []*StreamImpl[K, V]
FanOut fans out a stream to multiple streams.
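This page does not say whether each output stream receives every message or only a share of them. The sketch below shows one plausible reading, a broadcast to num outputs, using plain channels rather than StreamImpl. The names fanOut, emit, and collectAll are hypothetical helpers, not package functions.

```go
package main

import (
	"fmt"
	"sync"
)

// emit sends the given values on a new channel and closes it.
func emit(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// fanOut copies every value from in to each of num output channels
// (broadcast semantics, which is an assumption about FanOut).
func fanOut[T any](in <-chan T, num int) []<-chan T {
	outs := make([]chan T, num)
	result := make([]<-chan T, num)
	for i := range outs {
		outs[i] = make(chan T)
		result[i] = outs[i]
	}
	go func() {
		for v := range in {
			for _, o := range outs {
				o <- v
			}
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return result
}

// collectAll drains all channels concurrently; the outputs are unbuffered,
// so every one of them must be read to avoid blocking the sender.
func collectAll[T any](chans []<-chan T) [][]T {
	res := make([][]T, len(chans))
	var wg sync.WaitGroup
	for i, c := range chans {
		wg.Add(1)
		go func(i int, c <-chan T) {
			defer wg.Done()
			for v := range c {
				res[i] = append(res[i], v)
			}
		}(i, c)
	}
	wg.Wait()
	return res
}

func main() {
	for i, got := range collectAll(fanOut(emit(1, 2, 3), 2)) {
		fmt.Println(i, got)
	}
}
```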
func (*StreamImpl[K, V]) Filter ¶
func (s *StreamImpl[K, V]) Filter(name string, fn Predicate[K, V]) *StreamImpl[K, V]
Filter filters a stream.
func (*StreamImpl[K, V]) Log ¶
func (s *StreamImpl[K, V]) Log(name string) *StreamImpl[K, V]
Log logs the content of a stream.
func (*StreamImpl[K, V]) Map ¶
func (s *StreamImpl[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) *StreamImpl[K, V]
Map maps a stream.
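The idea behind Filter and Map can be sketched with plain channel stages. This is not how StreamImpl is implemented; it only illustrates the shape of the two operations. Message is a hypothetical stand-in for msg.Message[K, V], the Predicate signature is assumed (its definition is not shown on this page), and filter, mapStage, and run are invented names.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for msg.Message[K, V].
type Message[K, V any] struct {
	Key   K
	Value V
}

// Predicate has an assumed signature; the package's definition is not shown.
type Predicate[K, V any] func(Message[K, V]) bool

// filter forwards only the messages for which keep returns true.
func filter[K, V any](in <-chan Message[K, V], keep Predicate[K, V]) <-chan Message[K, V] {
	out := make(chan Message[K, V])
	go func() {
		defer close(out)
		for m := range in {
			if keep(m) {
				out <- m
			}
		}
	}()
	return out
}

// mapStage applies fn to each message. On the first error it reports the
// error, drains the input so upstream stages can finish, and stops.
func mapStage[K, V any](in <-chan Message[K, V], fn func(Message[K, V]) (Message[K, V], error)) (<-chan Message[K, V], <-chan error) {
	out := make(chan Message[K, V])
	errc := make(chan error, 1)
	go func() {
		defer close(errc)
		defer close(out)
		for m := range in {
			mapped, err := fn(m)
			if err != nil {
				errc <- err
				for range in {
				}
				return
			}
			out <- mapped
		}
	}()
	return out, errc
}

// run drops empty words and upper-cases the rest.
func run(words ...string) ([]string, error) {
	in := make(chan Message[int, string])
	go func() {
		defer close(in)
		for i, w := range words {
			in <- Message[int, string]{Key: i, Value: w}
		}
	}()
	nonEmpty := filter[int, string](in, func(m Message[int, string]) bool {
		return m.Value != ""
	})
	upper, errc := mapStage[int, string](nonEmpty, func(m Message[int, string]) (Message[int, string], error) {
		m.Value = strings.ToUpper(m.Value)
		return m, nil
	})
	var got []string
	for m := range upper {
		got = append(got, m.Value)
	}
	return got, <-errc
}

func main() {
	fmt.Println(run("a", "", "b")) // [A B] <nil>
}
```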
func (*StreamImpl[K, V]) Mark ¶
func (s *StreamImpl[K, V]) Mark(name string) *StreamImpl[K, V]
Mark marks a message.
func (*StreamImpl[K, V]) Merge ¶
func (s *StreamImpl[K, V]) Merge(name string, streams ...StreamImpl[K, V]) *StreamImpl[K, V]
Merge merges multiple streams into one.
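Merging is commonly built as a fan-in: one goroutine per input forwards to a shared output, which is closed once all inputs are exhausted. The sketch below shows that general technique on plain channels; it is not taken from the package, and merge and emit are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// emit sends the given values on a new channel and closes it.
func emit(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// merge forwards values from all inputs to one output channel.
// The relative order of values from different inputs is not defined.
func merge[T any](ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close the output only after every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum, count := 0, 0
	for v := range merge(emit(1, 2), emit(3, 4)) {
		sum += v
		count++
	}
	fmt.Println(sum, count) // 10 4
}
```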
func (*StreamImpl[K, V]) Sink ¶
func (s *StreamImpl[K, V]) Sink(name string, sink Sink[K, V])
Sink wires up a stream to a sink.
type Table ¶
type Table interface {
	// Set is setting a key/value pair.
	Set(key string, value []byte) error

	// Delete is deleting a key/value pair.
	Delete(key string) error

	// Setup is setting up the table.
	Setup() error

	// Error is returning the error.
	Error() error

	// Sink is the interface that wraps the basic Sink method.
	Sink[string, []byte]

	Iterator
}
Table is the interface that wraps the basic Set, Delete, Setup, Error and Sink methods.
type Topology ¶
type Topology interface {
	// Root returns the root node of a topology.
	Root() Node
}
Topology is a graph of nodes.
type Unimplemented ¶
Unimplemented provides placeholder implementations of the Stream methods.
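A common use for a type like this is embedding: a custom stream embeds the placeholder and overrides only the methods it needs. The sketch below shows that pattern with a deliberately tiny interface. Everything in it is a stand-in (stream, unimplemented, recorder), and it assumes that the placeholder's Error method returns the "not implemented" sentinel, which this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented is a local stand-in for streams.ErrNotImplemented.
var errNotImplemented = errors.New("not implemented")

// stream is a hypothetical, much smaller stand-in for Stream[K, V].
type stream interface {
	Log(name string) stream
	Error() error
}

// unimplemented is a stand-in for Unimplemented[K, V].
// Returning errNotImplemented from Error is an assumption.
type unimplemented struct{}

func (unimplemented) Log(string) stream { return unimplemented{} }
func (unimplemented) Error() error      { return errNotImplemented }

// recorder embeds unimplemented and overrides only Log.
type recorder struct {
	unimplemented
	names []string
}

// Log records the name and returns the recorder so calls can be chained.
func (r *recorder) Log(name string) stream {
	r.names = append(r.names, name)
	return r
}

// Compile-time check: the embedded type supplies the missing Error method.
var _ stream = (*recorder)(nil)

func main() {
	r := &recorder{}
	s := r.Log("first").Log("second")
	fmt.Println(r.names)                                 // [first second]
	fmt.Println(errors.Is(s.Error(), errNotImplemented)) // true
}
```

With this arrangement, adding a method to the interface only requires updating the placeholder type; types that embed it keep compiling.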
func (*Unimplemented[K, V]) Close ¶
func (u *Unimplemented[K, V]) Close()
Close closes a stream.
func (*Unimplemented[K, V]) Do ¶
func (u *Unimplemented[K, V]) Do(name string, fn func(msg.Message[K, V])) Stream[K, V]
Do executes a function on a stream.
func (*Unimplemented[K, V]) Drain ¶
func (u *Unimplemented[K, V]) Drain()
Drain drains a stream.
func (*Unimplemented[K, V]) Error ¶
func (u *Unimplemented[K, V]) Error() error
Error returns the first error.
func (*Unimplemented[K, V]) FanOut ¶
func (u *Unimplemented[K, V]) FanOut(name string, predicates ...Predicate[K, V]) []Stream[K, V]
FanOut splits a stream into multiple streams.
func (*Unimplemented[K, V]) Filter ¶
func (u *Unimplemented[K, V]) Filter(name string, predicate Predicate[K, V]) Stream[K, V]
Filter filters a stream.
func (*Unimplemented[K, V]) Log ¶
func (u *Unimplemented[K, V]) Log(name string) Stream[K, V]
Log logs a message.
func (*Unimplemented[K, V]) Map ¶
func (u *Unimplemented[K, V]) Map(name string, fn func(msg.Message[K, V]) (msg.Message[K, V], error)) Stream[K, V]
Map maps a stream.
func (*Unimplemented[K, V]) Mark ¶
func (u *Unimplemented[K, V]) Mark() Stream[K, V]
Mark marks a message.
func (*Unimplemented[K, V]) Sink ¶
func (u *Unimplemented[K, V]) Sink(name string, sink Sink[K, V])
Sink sends messages to a sink.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
examples | |
kafka | |
reader/mock | Package mock is a generated GoMock package. |
writer/mocks | Package mocks is a generated GoMock package. |
nats | |