beam

Published: Dec 16, 2024 License: Apache-2.0 Imports: 35 Imported by: 1

README

beam-go


An Unofficial Alternative Apache Beam Go SDK

Status: Experimental

Not currently fit for production use, but ready for experimentation and feedback. Breaking changes are currently possible and likely.

However, pipelines should execute correctly on any runner that has a Beam JobManagement service endpoint.

Compatibility and Differences

Notably, this SDK uses a completely different approach to building and executing pipelines, in order to let Go typecheck the pipeline instead of relying on reflection-heavy SDK-side code. As a result, it's not currently possible to mix and match this SDK with the original in the same binary. In particular, this is actively prevented at program initialization time via a protocol buffer collision.

Things that are different from the original Apache Beam Go SDK.

  • Coders
  • DoFns
  • Use of Beam features
  • Pipeline Construction
    • No registration required.
    • Re-builds pipelines on worker initialization
    • Avoids package level variables
    • Tighter user surface

TODOs

  • Fix Pipeline handle ergonomics regarding extracting/querying metrics and status.
  • WindowInto, Triggers and Panes
    • PAssert
  • UnboundedSDFs and ProcessContinuations
  • State and Timers
  • User Defined Coders
    • Simple Compression Coders?
  • Additional examples and Documentation
  • Submitting jobs to Google Cloud Dataflow

Things to Consider

  • Split into multiple purposeful packages: One for Pipeline Construction, and one for DoFn definition?
    • Would that make implementation too difficult? Would we just be wrapping a single "internal" package with two different user packages? Would that make the GoDoc easier to parse for users?
    • Would "job launching" be a 3rd separate package for operational clarity?
    • Would just having robustly documented sections, and unambiguous documentation in the main beam package be sufficient?
    • Would that break pipeline construction with beam.PCol doing double duty?

Documentation

Overview

Package beam is a version of an Apache Beam SDK for Go that leverages generics. It's currently aimed at exploring this approach for the purposes of data processing.

Status: Prototype

This is published to receive feedback and similar on the approach, and design.

This package serves three purposes:

  • implementing DoFns
  • Constructing Pipelines.
  • Launching and managing Jobs.

Implementing DoFns

DoFns define execution-time behavior for the pipeline, processing and transforming elements.

DoFns are primarily intended to be pointers to a struct type that implements the generic Transform interface, using the DFC type.

Further, the ElmC type is used within Process functions for per-element use.

TODO(lostluck): Elaborate on how to build a DoFn.
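
Until then, here is a minimal sketch of a DoFn, based on the Transform, DFC, ElmC, and PCol types documented below. The import path and all names (extractWords, Output) are illustrative assumptions, not part of this package:

import (
	"strings"

	beam "example.dev/beam" // assumed import path; substitute this module's path
)

// extractWords is a hypothetical DoFn that splits lines into words.
// The pointer type *extractWords implements Transform[string].
type extractWords struct {
	Output beam.PCol[string] // exported PCol fields become the DoFn's outputs
}

func (fn *extractWords) ProcessBundle(dfc *beam.DFC[string]) error {
	// Process registers the per-element function for this bundle.
	return dfc.Process(func(ec beam.ElmC, line string) error {
		for _, word := range strings.Split(line, " ") {
			fn.Output.Emit(ec, word) // emit in the current element's context
		}
		return nil
	})
}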

Constructing Pipelines

Pipelines are constructed and launched by loading construction functions into a Configuration, calling Configuration.Ready to receive a Launcher, and then Launcher.Run to start pipeline execution. This returns a Pipeline handle that can be used to block until the pipeline is complete, or query active pipeline status and metrics.

Simple single pipelines can use the Launch or LaunchAndWait convenience functions.

TODO(lostluck): Elaborate on how to build a pipeline.
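
In the meantime, a sketch of that flow, reusing the hypothetical extractWords DoFn from above (assumes the standard context and log imports):

func main() {
	ctx := context.Background()
	// Construct and run the pipeline, blocking until it terminates.
	pipe, err := beam.LaunchAndWait(ctx, func(s *beam.Scope) error {
		lines := beam.Create(s, "hello world", "beam go")
		words := beam.ParDo(s, lines, &extractWords{})
		_ = words.Output // a PCol[string], usable as input to further transforms
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = pipe // inspect Counters and Distributions on the handle if desired
}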

Running Jobs

  • TODO Cover launching a binary
  • TODO Cover testing and Metrics
  • TODO Cover flags

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Expand

func Expand[I Composite[O], O any](parent *Scope, name string, comp I) O

func MakeCoder

func MakeCoder[E any]() coders.Coder[E]

func ParDo

func ParDo[E Element, DF Transform[E]](s *Scope, input PCol[E], dofn DF, opts ...Options) DF

ParDo takes the user's DoFn and returns the same type for downstream pipeline construction.

The returned DoFn's emitter (PCol) fields can then be used as inputs into other DoFns, as sketched below.
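
A hypothetical chaining sketch inside a construction function, where wordLengths stands in for another Transform[string] implementation:

// The applied DoFn's exported PCol field feeds the next ParDo.
words := beam.ParDo(s, lines, &extractWords{})
lengths := beam.ParDo(s, words.Output, &wordLengths{})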

Types

type AccumulatorCreator

type AccumulatorCreator[A Element] interface {
	CreateAccumulator() A
	AccumulatorMerger[A]
}

AccumulatorCreator is an interface to allow combiners to produce a more sophisticated accumulator type, when the zero value is inappropriate for accumulation.

type AccumulatorMerger

type AccumulatorMerger[A Element] interface {
	MergeAccumulators(A, A) A
}

AccumulatorMerger is an interface for combiners that only need a binary merge, and the input, output, and accumulator types are all the same.

type AfterBundle

type AfterBundle struct {
	// contains filtered or unexported fields
}

AfterBundle allows a DoFn to register a function that runs after the bundle has been durably committed. Emitting elements here will fail.

TODO consider moving this to a simple interface function. Upside: not likely to incorrectly emit in the closure. Downside: the caching for anything to finalize needs to be stored in the DoFn struct, which violates the potential of a ConfigOnly DoFn.

func (*AfterBundle) Do

func (*AfterBundle) Do(dfc bundleFinalizer, finalizeBundle func() error)

type BoundedSDF

type BoundedSDF[FAC RestrictionFactory[E, R, P], E any, T Tracker[R, P], R Restriction[P], P, WES any] struct{}

BoundedSDF indicates this DoFn is able to split elements into independently processable sub-parts, called Restrictions.

Due to the handling required, call the BoundedSDF Process method, instead of the one on DFC.

func (BoundedSDF[FAC, E, T, R, P, WES]) Process

func (sdf BoundedSDF[FAC, E, T, R, P, WES]) Process(dfc *DFC[E],
	makeTracker func(R) T,
	proc ProcessRestriction[E, R, P]) error

Process is called during ProcessBundle setup to define the processing that happens per element.

type Combiner

type Combiner[A, I, O Element, AM AccumulatorMerger[A]] struct {
	// contains filtered or unexported fields
}

Combiner represents an optimizable approach to aggregating, by breaking down the aggregation into 3 component operations: adding input, merging accumulators, and extracting output.

func AddMerge

func AddMerge[A, I Element, IA InputAdder[A, I]](c IA) Combiner[A, I, A, IA]

AddMerge produces a Combiner from an InputAdder.

func FullCombine

func FullCombine[A, I, O Element, C FullCombiner[A, I, O]](c C) Combiner[A, I, O, C]

FullCombine produces a Combiner from a FullCombiner.

func MergeExtract

func MergeExtract[A, O Element, OE OutputExtractor[A, O]](c OE) Combiner[A, A, O, OE]

MergeExtract produces a Combiner from an OutputExtractor.

func SimpleMerge

func SimpleMerge[A Element, AM AccumulatorMerger[A]](c AM) Combiner[A, A, A, AM]

SimpleMerge produces a Combiner from an AccumulatorMerger.
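
As a minimal sketch under these interfaces, a hypothetical merging combiner (the accumulator type parameter is written explicitly, since it cannot be inferred from the method set alone):

// sumInts implements AccumulatorMerger[int] with a binary merge.
type sumInts struct{}

func (sumInts) MergeAccumulators(a, b int) int { return a + b }

// In a construction function, with kvs a PCol[KV[string, int]]:
//
//	sums := beam.CombinePerKey(s, kvs, beam.SimpleMerge[int](sumInts{}))
//
// sums then has type PCol[KV[string, int]].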

type Composite

type Composite[O any] interface {
	Expand(s *Scope) O
}

Composite transforms allow structural re-use of sub pipelines.
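
A hypothetical composite, reusing the sketched extractWords and sumInts types from the examples above:

// CountWords bundles several transforms behind one Expand method,
// implementing Composite[PCol[KV[string, int]]].
type CountWords struct {
	Lines beam.PCol[string]
}

func (cw CountWords) Expand(s *beam.Scope) beam.PCol[beam.KV[string, int]] {
	words := beam.ParDo(s, cw.Lines, &extractWords{})
	pairs := beam.Map(s, words.Output, func(w string) beam.KV[string, int] {
		return beam.Pair(w, 1)
	})
	return beam.CombinePerKey(s, pairs, beam.SimpleMerge[int](sumInts{}))
}

// Applied in a construction function via the Expand function; the output
// type argument may need to be spelled out if inference cannot determine
// it from the Composite constraint:
//
//	counts := beam.Expand(s, "CountWords", CountWords{Lines: lines})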

type Configuration

type Configuration struct {
	// contains filtered or unexported fields
}

Configuration represents static configuration for pipelines.

The same Configuration may be used for multiple pipeline launches; however, that requires registration of DoFns in advance.

Serializable construction-time options may be registered with the TODO method, and referred to in calls to Launch or [Prepare].

You may [Prepare] multiple pipelines in advance, in order to ensure their configuration is registered with an ID. However, these pipelines will not be built until Launch is called with the provided ID.

func New

func New() *Configuration

New produces a new Beam framework configuration to statically configure pipelines for execution.

func (*Configuration) Flags

func (cfg *Configuration) Flags(fs *flag.FlagSet)

Flags initializes this Config from the provided command line flag set.

func (*Configuration) FromCommandLine

func (cfg *Configuration) FromCommandLine()

FromCommandLine registers this Config to be initialized from the command line flags of the binary.

func (*Configuration) Load

func (cfg *Configuration) Load(pid string, expand func(*Scope) error)

Load registers a pipeline construction function to be invoked later. The provided PID must remain stable over multiple runs of the binary.

Use Load when there are multiple possible pipelines that may be invoked from this binary.

This can be useful for a server binary that is also used as a processing pipeline, or when it's desirable to run tests for pipelines using a container execution mode, instead of loopback mode.

Load panics if the expand function is nil, or if the PID is already in use.

func (*Configuration) Ready

func (cfg *Configuration) Ready(ctx context.Context) Launcher

Ready is called once the framework has been configured, and flags have been parsed. Ready serves as an execution split point.

When Ready detects the binary is being executed as a worker, Ready will not return, calling os.Exit when execution is complete. Otherwise Ready will return and allow the remainder of execution to take place.
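
A hedged sketch of that lifecycle for a multi-pipeline binary, combining New, FromCommandLine, Load, Ready, and Launcher.Run:

func main() {
	cfg := beam.New()
	cfg.FromCommandLine() // initialize from the binary's command line flags
	cfg.Load("wordcount", func(s *beam.Scope) error {
		// ... construct the pipeline here ...
		return nil
	})

	ctx := context.Background()
	// Ready does not return if this binary is executing as a worker.
	launcher := cfg.Ready(ctx)

	pipe, err := launcher.Run(ctx, "wordcount")
	if err != nil {
		log.Fatal(err)
	}
	if err := pipe.Wait(ctx); err != nil {
		log.Fatal(err)
	}
}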

type CounterInt64

type CounterInt64 struct {
	// contains filtered or unexported fields
}

CounterInt64 represents a int64 counter metric.

func (*CounterInt64) Inc

func (c *CounterInt64) Inc(dfc Metrics, diff int64)

type DFC

type DFC[E Element] struct {
	// contains filtered or unexported fields
}

DFC or the DoFn Context is the local registry and router for Beam features used by the DoFn, in conjunction with reflection analysis of the DoFn's fields.

The generic type `E` represents the type of elements the DoFn processes.

Most ProcessBundle method implementations must call DFC.Process with an element processing func (a Process). Failing to do so is a fatal error, caught at runtime.

Within a ProcessBundle method, the DFC may be passed to other Beam feature fields, such as OnBundleFinish and AfterBundle, and to metrics like CounterInt64 and DistributionInt64.

Advanced DoFns may use a BoundedSDF, which enables sub-element work stealing at execution time. Such a DoFn is considered a SplittableDoFn.

func (*DFC[E]) Context

func (c *DFC[E]) Context() context.Context

Context returns the context for this bundle.

func (*DFC[E]) Logger

func (c *DFC[E]) Logger() *slog.Logger

Logger returns the *slog.Logger for the current bundle and transform.

func (*DFC[E]) Process

func (c *DFC[E]) Process(perElm Process[E]) error

Process is where you set the per-element processing function that accepts elements. Process returns an error to allow inlining with the error return from a Transform's ProcessBundle method.

func (*DFC[E]) ToElmC

func (c *DFC[E]) ToElmC(eventTime time.Time) ElmC

ToElmC gets an appropriate element context for elements not derived directly from a specific input element.

This derives the element windows, and sets a no-firing pane.

type DistributionInt64

type DistributionInt64 struct {
	// contains filtered or unexported fields
}

func (*DistributionInt64) Update

func (c *DistributionInt64) Update(dfc Metrics, val int64)

type Element

type Element interface {
	any // Sadly, can't really restrict this without breaking iterators in GBK results.
}

Element represents any user type. Beam processes arbitrary user types, but requires them to be encodable.

type ElmC

type ElmC struct {
	// contains filtered or unexported fields
}

ElmC is the per element context.

This includes

  • Key (for state and timer using functions)
  • Windows
  • Timestamp
  • Pane

ElmC provides the downstream emission context, so it's what actually sends data to the next DoFn.

Use an ObserveWindow feature to extract the Window and Pane.

func (*ElmC) EventTime

func (e *ElmC) EventTime() time.Time

EventTime returns the element's event time.

type FullCombiner

type FullCombiner[A, I, O Element] interface {
	InputAdder[A, I]
	AccumulatorMerger[A]
	OutputExtractor[A, O]
}

type InputAdder

type InputAdder[A, I Element] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}

InputAdder is an interface to allow combiners to incorporate an input type.

type Iter

type Iter[V Element] struct {
	// contains filtered or unexported fields
}

func (*Iter[V]) All

func (it *Iter[V]) All() func(perElm func(elm V) bool)

All allows a single iteration of its stream of values.

type KV

type KV[K, V Element] struct {
	// TODO consider making these methods instead
	// and decode them on demand?
	Key   K
	Value V
}

KV represents key-value pairs. These are useful in Beam pipelines, enabling GroupByKey and stateful transforms.

func Pair

func Pair[K, V Element](k K, v V) KV[K, V]

Pair is a convenience function to build generic KVs.

type Keys

type Keys interface {
	comparable
}

type Launcher

type Launcher struct {
	// contains filtered or unexported fields
}

Launcher is able to Run pipelines [Load]ed from a [Ready] Configuration.

func (Launcher) Run

func (l Launcher) Run(ctx context.Context, pid string, opts ...Options) (Pipeline, error)

Run begins execution of the pipeline represented by the given PID, and then returns a handle to the pipeline.

Run returns an error if no pipeline has been registered with that ID.

type Metrics

type Metrics interface {
	// contains filtered or unexported methods
}

Metrics implementations enable users to update beam metrics for a job, to be aggregated by the runner.

Metrics are recorded as a tuple of the transform and the metric in question, so the same metric name used in different transforms is considered distinct.

The DFC type implements Metrics and can be used for standard DoFns. Other contexts where user metrics are appropriate may also have a parameter that implements Metrics.
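
For example, a hypothetical DoFn can bump a counter per element, passing its DFC as the Metrics parameter:

// countingFn counts elements as they pass through.
type countingFn struct {
	Seen   beam.CounterInt64
	Output beam.PCol[string]
}

func (fn *countingFn) ProcessBundle(dfc *beam.DFC[string]) error {
	return dfc.Process(func(ec beam.ElmC, e string) error {
		fn.Seen.Inc(dfc, 1) // the DFC satisfies Metrics
		fn.Output.Emit(ec, e)
		return nil
	})
}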

type ORTracker

type ORTracker struct {
	Rest OffsetRange
	// contains filtered or unexported fields
}

ORTracker is a tracker for an offset range restriction.

func (*ORTracker) GetError

func (tracker *ORTracker) GetError() error

GetError returns the error that caused the tracker to stop, if there is one.

func (*ORTracker) GetProgress

func (tracker *ORTracker) GetProgress() (done, remaining float64)

GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.

func (*ORTracker) GetRestriction

func (tracker *ORTracker) GetRestriction() OffsetRange

GetRestriction returns the restriction.

func (*ORTracker) IsDone

func (tracker *ORTracker) IsDone() bool

IsDone returns true if the most recently claimed element is at or past the end of the restriction.

func (*ORTracker) Size

func (t *ORTracker) Size(rest OffsetRange) float64

Size returns an estimate of the amount of work in this restriction.

func (*ORTracker) TryClaim

func (tracker *ORTracker) TryClaim(pos int64) bool

TryClaim validates that the position is within the restriction and has not yet been claimed.

func (*ORTracker) TrySplit

func (tracker *ORTracker) TrySplit(fraction float64) (primary, residual OffsetRange, err error)

TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.

type ObserveWindow

type ObserveWindow struct{}

ObserveWindow indicates this DoFn needs to be aware of windows explicitly. It must be used as a field, but may be embedded for legibility.

DoFns that observe windows must process the element for each window individually. If ObserveWindow isn't being used, remove it to possibly improve performance.

func (*ObserveWindow) Of

func (*ObserveWindow) Of(ec ElmC) TODO

Of returns the window for this element.

func (*ObserveWindow) PaneOf

func (*ObserveWindow) PaneOf(ec ElmC) TODO

PaneOf returns the pane for this element.

type OffsetRange

type OffsetRange struct {
	Min, Max int64
}

OffsetRange is an offset range restriction.

func (OffsetRange) Bounded

func (r OffsetRange) Bounded() bool

func (OffsetRange) End

func (r OffsetRange) End() int64

func (OffsetRange) Start

func (r OffsetRange) Start() int64

type OnBundleFinish

type OnBundleFinish struct{}

OnBundleFinish allows a DoFn to register a function that runs just before a bundle finishes. Elements may be emitted downstream, if an ElmC is retrieved from the DFC.

func (*OnBundleFinish) Do

func (*OnBundleFinish) Do(dfc bundleFinisher, finishBundle func() error)

Do registers a callback to execute after all bundle elements have been processed. Any resources that a DoFn needs cleaned up explicitly, rather than implicitly via garbage collection, should be released here.

Only a single callback may be registered, and it will be the last one passed to Do.
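
A sketch of the intended use, with a hypothetical buffered sink flushed just before the bundle finishes:

// batchingFn buffers elements and flushes them when the bundle ends.
type batchingFn struct {
	OnFinish beam.OnBundleFinish
}

func (fn *batchingFn) ProcessBundle(dfc *beam.DFC[string]) error {
	var batch []string
	// Register the callback before processing begins.
	fn.OnFinish.Do(dfc, func() error {
		return flush(batch)
	})
	return dfc.Process(func(ec beam.ElmC, e string) error {
		batch = append(batch, e)
		return nil
	})
}

func flush(batch []string) error { return nil } // stand-in for a real sink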

type Options

type Options = beamopts.Options

Options configure Run, ParDo, and Combine with specific features. Each function takes a variadic list of options, where properties set in later options override the value of previously set properties.

func Endpoint

func Endpoint(endpoint string) Options

Endpoint sets the url when applicable, such as the JobManagement endpoint for submitting jobs or for configuring a target for expansion services.

func Name

func Name(name string) Options

Name sets the name of the pipeline or transform in question, typically to make it easier to refer to.

type OutputExtractor

type OutputExtractor[A, O Element] interface {
	AccumulatorMerger[A]
	ExtractOutput(A) O
}

type PCol

type PCol[E Element] struct {
	// contains filtered or unexported fields
}

PCol, or PCollection, represents a logical collection of elements produced, or consumed by, a DoFn.

At pipeline execution time, they are used in a ProcessBundle method to emit elements and pass along per element context, such as the EventTime and Window.

Used as an exported value field of a DoFn struct, they represent the outputs from the DoFn. After the DoFn is added to the graph, the processed DoFn's PCol fields are initialized and can be passed around by value to further build the pipeline graph.

func CombinePerKey

func CombinePerKey[K Keys, A, I, O Element, AM AccumulatorMerger[A]](s *Scope, input PCol[KV[K, I]], comb Combiner[A, I, O, AM]) PCol[KV[K, O]]

func Create

func Create[E Element](s *Scope, values ...E) PCol[E]

Create provides static output for a transform, for simple initialization and testing.

Values in Create are serialized as part of the Beam Pipeline graph, so it is not suitable for large elements, or large numbers of elements.

func Flatten

func Flatten[E Element](s *Scope, inputs ...PCol[E]) PCol[E]

Flatten joins together multiple PCols of the same type into a single PCol for downstream consumption.

func GBK

func GBK[K Keys, V Element](s *Scope, input PCol[KV[K, V]], opts ...Options) PCol[KV[K, Iter[V]]]

GBK produces an output PCollection of grouped values.
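
A hypothetical consumer of GBK output, iterating each key's values with Iter.All (All's signature is compatible with Go 1.23 range-over-func):

// sumPerKey sums the grouped values for each key.
type sumPerKey struct {
	Output beam.PCol[beam.KV[string, int]]
}

func (fn *sumPerKey) ProcessBundle(dfc *beam.DFC[beam.KV[string, beam.Iter[int]]]) error {
	return dfc.Process(func(ec beam.ElmC, kv beam.KV[string, beam.Iter[int]]) error {
		var sum int
		for v := range kv.Value.All() {
			sum += v
		}
		fn.Output.Emit(ec, beam.Pair(kv.Key, sum))
		return nil
	})
}

// In a construction function:
//
//	grouped := beam.GBK(s, pairs) // PCol[KV[string, Iter[int]]]
//	sums := beam.ParDo(s, grouped, &sumPerKey{})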

func Impulse

func Impulse(s *Scope) PCol[[]byte]

Impulse adds an impulse transform to the graph, which emits a single element to downstream transforms, allowing processing to begin.

The element is a single byte slice in the global window, with an event timestamp at the start of the global window.

func Map

func Map[I, O Element](s *Scope, input PCol[I], lambda func(I) O, opts ...beamopts.Options) PCol[O]
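
Map is currently undocumented; judging from its signature, it wraps a plain per-element function as a transform. A hypothetical use inside a construction function:

// Derive word lengths without defining a full DoFn struct.
lengths := beam.Map(s, words, func(w string) int { return len(w) })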

func Reshuffle

func Reshuffle[E Element](s *Scope, input PCol[E], opts ...Options) PCol[E]

Reshuffle inserts a fusion break in the pipeline, preventing a producer transform from being fused with the consuming transform.

func (*PCol[E]) Emit

func (emt *PCol[E]) Emit(ec ElmC, elm E)

Emit the element within the current element's context.

The ElmC value is sourced from the DFC.Process method.

type Pipeline

type Pipeline struct {

	// TODO make these methods instead & support cancellation.
	Counters      map[string]int64
	Distributions map[string]struct{ Count, Sum, Min, Max int64 }
	// contains filtered or unexported fields
}

Pipeline is a handle to a running or terminated pipeline for programmatic access to the given runner.

func Launch

func Launch(ctx context.Context, expand func(*Scope) error, opts ...Options) (Pipeline, error)

Launch begins to execute the pipeline built in the construction function, returning a pipeline handle.

Launch is non-blocking. Call Pipeline.Wait on the returned handle to block until the job is finished, or use LaunchAndWait instead.

func LaunchAndWait

func LaunchAndWait(ctx context.Context, expand func(*Scope) error, opts ...Options) (Pipeline, error)

LaunchAndWait calls Launch and then Pipeline.Wait, blocking until the pipeline terminates.

func (*Pipeline) Wait

func (pr *Pipeline) Wait(ctx context.Context) error

type Process

type Process[E Element] func(ElmC, E) error

Process is the function type for handling a single element in a bundle.

Typically a closure returned from a Transform's ProcessBundle method.

Errors returned from Process functions abort bundle processing, and may cause pipeline termination. A runner may retry a bundle that has failed.

type ProcessRestriction

type ProcessRestriction[E any, R Restriction[P], P any] func(ElmC, E, R, TryClaim[P]) error

ProcessRestriction defines processing the given element with respect to the provided restriction.

type Restriction

type Restriction[P any] interface {
	// Start is the earliest position in this restriction.
	Start() P
	// End is the last position that must be processed with this restriction.
	End() P
	// Bounded reports whether this restriction is bounded or not.
	Bounded() bool
}

Restriction is a range of logical positions to be processed for this element. Restriction implementations must be serializable.

type RestrictionFactory

type RestrictionFactory[O Element, R Restriction[P], P any] interface {
	// Setup takes in the DoFn for configuration?
	Setup() error
	// Produce returns a restriction that processes the entire element.
	Produce(O) R

	// InitialSplit returns an iterator of non-overlapping sub-restrictions and their relative sizes.
	InitialSplit(O, R) iter.Seq2[R, float64]
}

RestrictionFactory must have a valid zero value. Whether it will be serialized is still an open question; it might need to support receiving the user DoFn as configuration.

type Scope

type Scope struct {
	// contains filtered or unexported fields
}

Scope is used for building pipeline graphs.

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure. For pipeline updates, the scope chain forms a unique name. The scope chain can also be used for monitoring and visualization purposes.

func (*Scope) String

func (s *Scope) String() string

type SideInputIter

type SideInputIter[E Element] struct {
	// contains filtered or unexported fields
}

func AsSideIter

func AsSideIter[E Element](emt PCol[E]) SideInputIter[E]

AsSideIter initializes a SideInputIter from a valid upstream PCol. It allows access to the data of that PCol's PCollection.

func (*SideInputIter[E]) All

func (si *SideInputIter[E]) All(ec ElmC) iter.Seq[E]
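
A hypothetical DoFn reading an iterator side input in full for each main-input element; the construction-time wiring via AsSideIter is sketched in the trailing comment:

// enrichFn prefixes each element with every value in the side input.
type enrichFn struct {
	Prefixes beam.SideInputIter[string]
	Output   beam.PCol[string]
}

func (fn *enrichFn) ProcessBundle(dfc *beam.DFC[string]) error {
	return dfc.Process(func(ec beam.ElmC, e string) error {
		for p := range fn.Prefixes.All(ec) {
			fn.Output.Emit(ec, p+e)
		}
		return nil
	})
}

// In a construction function (hypothetical wiring):
//
//	fn := &enrichFn{Prefixes: beam.AsSideIter(prefixes)}
//	out := beam.ParDo(s, mainInput, fn)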

type SideInputMap

type SideInputMap[K, V Element] struct {
	// contains filtered or unexported fields
}

SideInputMap allows a side input to be accessed via multi-map key lookups.

func AsSideMap

func AsSideMap[K, V Element](emt PCol[KV[K, V]]) SideInputMap[K, V]

AsSideMap initializes a SideInputMap from a valid upstream PCol.

func (*SideInputMap[K, V]) Get

func (si *SideInputMap[K, V]) Get(ec ElmC, k K) iter.Seq[V]

Get looks up an iterator of values associated with the key.

func (*SideInputMap[K, V]) Keys

func (si *SideInputMap[K, V]) Keys(ec ElmC) iter.Seq[K]

Keys returns an iterator over the keys in the side input map.

type StateBag

type StateBag[E Element] struct {
	// contains filtered or unexported fields
}

type StateCombining

type StateCombining[E Element] struct {
	// contains filtered or unexported fields
}

type StateMap

type StateMap[K, V Element] struct {
	// contains filtered or unexported fields
}

type StateSet

type StateSet[E Element] struct {
	// contains filtered or unexported fields
}

type StateValue

type StateValue[E Element] struct {
	// contains filtered or unexported fields
}

type TODO

type TODO any

TODO is a marker indicating that a better type should go here at some point but it's not yet implemented.

type TimerEvent

type TimerEvent struct {
	// contains filtered or unexported fields
}

type TimerProcessing

type TimerProcessing struct {
	// contains filtered or unexported fields
}

type Tracker

type Tracker[R Restriction[P], P any] interface {
	// Size returns an estimate of the amount of work in this restriction.
	// A zero size restriction isn't permitted.
	Size(R) float64
	// TryClaim attempts to claim the given position within the restriction.
	// Claiming a position at or beyond the end of the restriction signals that the
	// entire restriction has been processed and is now done, at which point this
	// method signals to end processing.
	TryClaim(P) bool

	// TrySplit splits at the nearest position greater than the given fraction of the remainder. If the
	// fraction given is outside of the position's range, it is clamped to Min or Max.
	TrySplit(fraction float64) (primary, residual R, err error)
	IsDone() bool
	GetError() error
	GetProgress() (done, remaining float64)
	GetRestriction() R
}

Tracker manages state around splitting an element.

Tracker implementations are not serialized.

type Transform

type Transform[E Element] interface {
	ProcessBundle(dfc *DFC[E]) error
}

Transform is the only interface that needs to be implemented by most DoFns.

type TryClaim

type TryClaim[P any] func(func(P) (P, error)) error

TryClaim processes a DoFn-provided closure, passing in a claimed position. The closure returns the next position to claim, or an error.
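
As a heavily hedged sketch, a ProcessRestriction body over an OffsetRange (so P is int64) might drive TryClaim like this, returning the next position after processing each claimed one:

proc := func(ec beam.ElmC, elem string, rest beam.OffsetRange, tc beam.TryClaim[int64]) error {
	return tc(func(pos int64) (int64, error) {
		// Process the record at pos (hypothetical work), then
		// return the next position to claim.
		return pos + 1, nil
	})
}

Such a func would be passed as the proc argument to BoundedSDF.Process.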

Directories

Path Synopsis
cmd
beamgo
beamgo is a convenience builder and launcher for Beam Go WASI SDK pipeline jobs.
coders
Package coders is a pair of convenience handles for encoding and decoding values to [][]byte.
examples
wordcount
wordcount is an example pipeline that demonstrates the basics of Apache Beam.
extworker
Package extworker provides an external worker service and related utilities.
pipelinex
Package pipelinex contains utilities for manipulating Beam proto pipelines.
runner/prism
Package prism downloads, unzips, and boots up a prism binary to run a pipeline against.
io
blobio
Package blobio abstracts the scalable reading and writing of file blobs against conformant services.
textio
Package textio contains transforms for reading and writing text blobs.
transforms
io/synthetic
Package synthetic produces elements and load.
