streams

package module v0.12.0

Published: Mar 1, 2025 · License: MIT · Imports: 0 · Imported by: 18

README

go-streams


go-streams provides a lightweight and efficient stream processing framework for Go. Its concise DSL allows for easy definition of declarative data pipelines using composable sources, flows, and sinks.

[Pipeline architecture example diagram]

From Wikipedia: "In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements."

Overview

The core module has no external dependencies and provides three key components for constructing stream processing pipelines (combined in the sketch that follows this list):

  • Source: The entry point of a pipeline, emitting data into the stream. (One open output)
  • Flow: A processing unit, transforming data as it moves through the pipeline. (One open input, one open output)
  • Sink: The termination point of a pipeline, consuming processed data and often acting as a subscriber. (One open input)
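
As a first orientation, here is a minimal sketch that wires the three components together using the ChanSource, Map, and StdoutSink building blocks described below; the exact constructor signatures are assumptions to verify against the extension and flow package documentation.

package main

import (
	"strings"

	ext "github.com/reugn/go-streams/extension"
	"github.com/reugn/go-streams/flow"
)

func main() {
	// Source: emit a few words through the Go channel connector.
	in := make(chan any)
	go func() {
		defer close(in)
		for _, word := range []string{"go", "streams", "pipeline"} {
			in <- word
		}
	}()
	source := ext.NewChanSource(in)

	// Flow: transform each element; a parallelism of 1 preserves order.
	toUpper := flow.NewMap(strings.ToUpper, 1)

	// Sink: print each element to stdout; To blocks until completion.
	source.Via(toUpper).To(ext.NewStdoutSink())
}
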
Flows

The flow package provides a collection of Flow implementations for common stream processing operations. These building blocks can be used to transform and manipulate data within pipelines; a short composition sketch follows the list.

  • Map: Transforms each element in the stream.
  • FlatMap: Transforms each element into a slice of zero or more elements and emits the resulting elements individually.
  • Filter: Selects elements from the stream based on a condition.
  • Reduce: Combines elements of the stream with the last reduced value and emits the new value.
  • PassThrough: Passes elements through unchanged.
  • Split¹: Divides the stream into two streams based on a boolean predicate.
  • FanOut¹: Duplicates the stream to multiple outputs for parallel processing.
  • RoundRobin¹: Distributes elements evenly across multiple outputs.
  • Merge¹: Combines multiple streams into a single stream.
  • ZipWith¹: Combines elements from multiple streams using a function.
  • Flatten¹: Flattens a stream of slices of elements into a stream of elements.
  • Batch: Breaks a stream of elements into batches based on size or timing.
  • Throttler: Limits the rate at which elements are processed.
  • SlidingWindow: Creates overlapping windows of elements.
  • TumblingWindow: Creates non-overlapping, fixed-size windows of elements.
  • SessionWindow: Creates windows based on periods of activity and inactivity.
  • Keyed: Groups elements by key for parallel processing of related data.

¹ Utility Flows
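
Flows compose with Via to form multi-stage transformations. The following sketch, assuming NewFilter and NewMap constructors that take a function and a parallelism level, keeps the even numbers in a stream of ints and squares them:

// Keep even numbers, then square them (source and sink defined elsewhere).
evens := flow.NewFilter(func(n int) bool { return n%2 == 0 }, 1)
squares := flow.NewMap(func(n int) int { return n * n }, 1)
source.Via(evens).Via(squares).To(sink)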

Connectors

Standard Source and Sink implementations are located in the extension package.

  • Go channel inbound and outbound connector
  • File inbound and outbound connector
  • Standard Go io.Reader Source and io.Writer Sink connectors
  • os.Stdout and Discard Sink connectors (useful for development and debugging)
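
For example, the file connectors can be combined with a flow to rewrite a file. This sketch assumes NewFileSource and NewFileSink take a file path and stream the file contents as strings:

// Uppercase every element read from input.txt and write it to output.txt.
source := ext.NewFileSource("input.txt")
sink := ext.NewFileSink("output.txt")
source.Via(flow.NewMap(strings.ToUpper, 1)).To(sink)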

The remaining connectors (aerospike, aws, azure, gcp, kafka, nats, pulsar, redis, websocket, and ws; see the Directories section below) are available as separate modules and have their own dependencies.

Usage Examples

See the examples directory for practical code samples demonstrating how to build complete stream processing pipelines, covering various use cases and integration scenarios.

License

Licensed under the MIT License.

Documentation

Overview

Package streams specifies interfaces to be implemented by the streaming connectors and operators.

Types

type Flow

type Flow interface {
	Inlet
	Outlet
	// Via asynchronously streams data from the Flow's Outlet to the given Flow.
	// It should return a new Flow that represents the combined pipeline.
	Via(Flow) Flow
	// To streams data from the Flow's Outlet to the given Sink, and should block
	// until the Sink has completed processing all data, which can be verified
	// via the Sink's AwaitCompletion method.
	To(Sink)
}

Flow represents a set of stream processing steps that has one open input and one open output. Programs can combine multiple Flows into sophisticated dataflow topologies. Implement this interface to create a custom stream transformation operator.
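
For illustration, the following is a minimal sketch of a custom Flow that doubles int elements. The channel wiring and completion handling show one plausible approach, not the library's canonical implementation, and DoubleFlow is a hypothetical name.

package custom

import streams "github.com/reugn/go-streams"

// DoubleFlow is a hypothetical Flow that multiplies each int element by two.
type DoubleFlow struct {
	in  chan any
	out chan any
}

func NewDoubleFlow() *DoubleFlow {
	f := &DoubleFlow{in: make(chan any), out: make(chan any)}
	go f.process()
	return f
}

func (f *DoubleFlow) In() chan<- any  { return f.in }
func (f *DoubleFlow) Out() <-chan any { return f.out }

// process doubles each element and closes the output when the input is drained.
func (f *DoubleFlow) process() {
	defer close(f.out)
	for elem := range f.in {
		f.out <- elem.(int) * 2
	}
}

// Via asynchronously forwards elements to the next Flow and returns it.
func (f *DoubleFlow) Via(next streams.Flow) streams.Flow {
	go f.transmit(next)
	return next
}

// To forwards elements to the Sink and blocks until it completes.
func (f *DoubleFlow) To(sink streams.Sink) {
	f.transmit(sink)
	sink.AwaitCompletion()
}

func (f *DoubleFlow) transmit(inlet streams.Inlet) {
	defer close(inlet.In())
	for elem := range f.out {
		inlet.In() <- elem
	}
}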

type Inlet

type Inlet interface {
	// In returns the input channel for the Inlet.
	// Data sent to this channel will be consumed by the component that implements
	// this interface. This channel should be closed by the upstream component
	// when no more input is expected.
	In() chan<- any
}

Inlet represents a type that exposes one open input.

type Outlet

type Outlet interface {
	// Out returns the output channel for the Outlet.
	// Data sent to this channel can be consumed by another component further
	// in the processing pipeline. This channel should be closed by the implementing
	// component when upstream processing has been completed.
	Out() <-chan any
}

Outlet represents a type that exposes one open output.

type Sink

type Sink interface {
	Inlet
	// AwaitCompletion should block until the Sink has completed processing
	// all data received through its Inlet and has finished any necessary
	// finalization or cleanup tasks.
	// This method is intended for internal use by the pipeline when the
	// input stream is closed by the upstream.
	AwaitCompletion()
}

Sink represents a set of stream processing steps that has one open input. A Sink will usually connect to a database or streaming platform to flush data from a pipeline. Implement this interface to create a custom sink connector.
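
A minimal custom Sink can satisfy AwaitCompletion with a done channel that is closed once the input has been drained. The following sketch logs every element it receives using the standard library logger; LogSink is a hypothetical name.

package custom

import "log"

// LogSink is a hypothetical Sink that logs each element it receives.
type LogSink struct {
	in   chan any
	done chan struct{}
}

func NewLogSink() *LogSink {
	s := &LogSink{in: make(chan any), done: make(chan struct{})}
	go s.process()
	return s
}

func (s *LogSink) In() chan<- any { return s.in }

// process drains the input channel and signals completion once it is closed.
func (s *LogSink) process() {
	defer close(s.done)
	for elem := range s.in {
		log.Println(elem)
	}
}

// AwaitCompletion blocks until all received elements have been processed.
func (s *LogSink) AwaitCompletion() {
	<-s.done
}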

type Source

type Source interface {
	Outlet
	// Via asynchronously streams data from the Source's Outlet to the given Flow.
	// It should return a new Flow that represents the combined pipeline.
	Via(Flow) Flow
}

Source represents a set of stream processing steps that has one open output. A Source will usually connect to a database or streaming platform to produce a stream of events/records. Implement this interface to create a custom source connector.
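
As a sketch, a custom Source only needs to expose an output channel, close it when the data is exhausted, and forward elements downstream in Via. SliceSource below (a hypothetical name) emits the elements of a fixed slice:

package custom

import streams "github.com/reugn/go-streams"

// SliceSource is a hypothetical Source that emits the elements of a slice.
type SliceSource struct {
	out chan any
}

func NewSliceSource(items []any) *SliceSource {
	src := &SliceSource{out: make(chan any)}
	go func() {
		defer close(src.out)
		for _, item := range items {
			src.out <- item
		}
	}()
	return src
}

func (s *SliceSource) Out() <-chan any { return s.out }

// Via asynchronously forwards elements to the given Flow and returns it.
func (s *SliceSource) Via(operator streams.Flow) streams.Flow {
	go func() {
		defer close(operator.In())
		for elem := range s.out {
			operator.In() <- elem
		}
	}()
	return operator
}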

Directories

  • aerospike (module)
  • aws (module)
  • azure (module)
  • examples (module)
  • extension: Package extension provides basic connector implementations.
  • flow: Package flow provides streams.Flow implementations.
  • gcp (module)
  • internal/ospkg: Package ospkg provides platform-specific utility functions and constants.
  • kafka (module)
  • nats (module)
  • pulsar (module)
  • redis (module)
  • websocket (module)
  • ws (module)
