engine

package
v3.0.0-...-16f56ce
Warning

This package is not in the latest version of its module.

Published: Jun 3, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 20 Imported by: 0

Documentation

Overview

Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Constants

This section is empty.

Variables

var (
	OneKeyPerBundle  bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
	OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)

TODO: Move to better place for configuration

Functions

This section is empty.

Types

type Block

type Block struct {
	Kind              BlockKind
	Bytes             [][]byte
	Transform, Family string
}

Block represents a contiguous set of data or timers for the same destination.

type BlockKind

type BlockKind int32

BlockKind indicates how the block is to be handled.

const (
	BlockData  BlockKind // BlockData represents data for the bundle.
	BlockTimer           // BlockTimer represents timers for the bundle.
)

type Config

type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
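
As a rough illustration of the cap described above, here is a minimal sketch of how a MaxBundleSize-style limit could be applied to a queue of pending elements. The helper takeBundle and the use of a string slice are hypothetical and are not part of this package; only the "0 or less means ignored" rule comes from the documentation.

```go
package main

import "fmt"

// takeBundle is a hypothetical helper: it splits pending into the elements
// for the next bundle and the remainder. A maxBundleSize of 0 or less means
// the cap is ignored, mirroring the documented Config.MaxBundleSize rule.
func takeBundle(pending []string, maxBundleSize int) (bundle, rest []string) {
	if maxBundleSize <= 0 || len(pending) <= maxBundleSize {
		return pending, nil
	}
	return pending[:maxBundleSize], pending[maxBundleSize:]
}

func main() {
	pending := []string{"a", "b", "c", "d", "e"}

	bundle, rest := takeBundle(pending, 2)
	fmt.Println(len(bundle), len(rest)) // 2 3

	bundle, rest = takeBundle(pending, 0)
	fmt.Println(len(bundle), len(rest)) // 5 0
}
```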

type ElementManager

type ElementManager struct {
	// contains filtered or unexported fields
}

ElementManager handles elements, watermarks, and related errata to determine if a stage is able to be executed. It is the core execution engine of Prism.

Essentially, it needs to track the current watermarks for each PCollection and transform/stage. But it's tricky, since the watermarks for the PCollections are always relative to transforms/stages.

Key parts:

  • The parallel input's PCollection's watermark is relative to committed consumed elements. That is, the input elements consumed by the transform after a successful bundle can advance the watermark, based on the minimum of their elements.
  • An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.

This means that a PCollection's watermark is the minimum across all of its consuming transforms.

So, the watermark manager needs to track:

  • Pending elements for each stage, along with their windows and timestamps.
  • Each transform's view of the watermarks for the PCollections.

Watermarks are advanced based on consumed input, except if the stage produces residuals.
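
The rule that a PCollection's watermark is the minimum over its consuming transforms can be sketched as follows. This is an illustration only: the watermark type is a stand-in for mtime.Time, and pcollectionWatermark, maxWatermark, and the choice to return the maximum value when there are no consumers are assumptions, not the engine's actual code.

```go
package main

import (
	"fmt"
	"math"
)

// watermark is a stand-in for mtime.Time (an assumption for this sketch).
type watermark int64

// maxWatermark is the value used here when nothing holds the watermark back.
const maxWatermark = watermark(math.MaxInt64)

// pcollectionWatermark returns the minimum of each consuming transform's
// view of the PCollection. With no consumers it returns maxWatermark,
// which is a choice made for this sketch.
func pcollectionWatermark(consumerViews map[string]watermark) watermark {
	lowest := maxWatermark
	for _, w := range consumerViews {
		if w < lowest {
			lowest = w
		}
	}
	return lowest
}

func main() {
	views := map[string]watermark{"stageA": 100, "stageB": 40}
	fmt.Println(pcollectionWatermark(views)) // 40
}
```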

func NewElementManager

func NewElementManager(config Config) *ElementManager

func (*ElementManager) AddStage

func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)

AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.

func (*ElementManager) AddTestStream

func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder

AddTestStream provides a builder interface for the execution layer to build the test stream from the protos.

func (*ElementManager) Bundles

func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle

Bundles is the core execution loop. It produces a sequence of bundles able to be executed. The returned channel is closed when the context is canceled, or there are no pending elements remaining.

func (*ElementManager) DataAndTimerInputForBundle

func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)

DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements. Elements are encoded with the PCollection's coders.

func (*ElementManager) FailBundle

func (em *ElementManager) FailBundle(rb RunBundle)

FailBundle clears the extant data allowing the execution to shut down.

func (*ElementManager) GetSideData

func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte

GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.

func (*ElementManager) Impulse

func (em *ElementManager) Impulse(stageID string)

Impulse marks and initializes the given stage as an impulse which is a root transform that starts processing.

func (*ElementManager) InputForBundle

func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte

InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.

func (*ElementManager) PersistBundle

func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals)

PersistBundle uses the tentative bundle output to update the watermarks for the stage. Each stage has two monotonically increasing watermarks, the input watermark, and the output watermark.

InputWatermark = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))

PersistBundle takes in the stage ID, ID of the bundle associated with the pending input elements, and the committed output elements.
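
The two watermark formulas above can be worked through in a small sketch. The stageWatermarks type, the refresh method, and the integer watermark type are hypothetical stand-ins; the inputs are assumed to be already reduced to a single minimum each.

```go
package main

import "fmt"

// watermark is a stand-in for mtime.Time (an assumption for this sketch).
type watermark int64

func minW(a, b watermark) watermark {
	if a < b {
		return a
	}
	return b
}

func maxW(a, b watermark) watermark {
	if a > b {
		return a
	}
	return b
}

// stageWatermarks holds the two monotonically increasing watermarks.
type stageWatermarks struct {
	input, output watermark
}

// refresh applies the documented formulas. The outer MAX keeps both
// watermarks from ever moving backwards.
func (s *stageWatermarks) refresh(minPending, upstream, minHold watermark) {
	s.input = maxW(s.input, minW(minPending, upstream))
	s.output = maxW(s.output, minW(s.input, minHold))
}

func main() {
	s := stageWatermarks{input: 10, output: 5}

	s.refresh(30, 20, 15)
	fmt.Println(s.input, s.output) // 20 15

	// Lower inputs do not regress either watermark.
	s.refresh(8, 12, 9)
	fmt.Println(s.input, s.output) // 20 15
}
```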

func (*ElementManager) ProcessingTimeNow

func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time)

ProcessingTimeNow gives the current processing time for the runner.

func (*ElementManager) ReturnResiduals

func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals)

ReturnResiduals is called after a successful split, so the remaining work can be re-assigned to a new bundle.

func (*ElementManager) StageAggregates

func (em *ElementManager) StageAggregates(ID string)

StageAggregates marks the given stage as an aggregation, which means elements will only be processed based on windowing strategies.

func (*ElementManager) StageProcessingTimeTimers

func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool)

StageProcessingTimeTimers indicates which timers are processingTime domain timers.

func (*ElementManager) StageStateful

func (em *ElementManager) StageStateful(ID string)

StageStateful marks the given stage as stateful, which means elements are processed by key.

func (*ElementManager) StateForBundle

func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData

StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.

TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.

type LinkID

type LinkID struct {
	Transform, Local, Global string
}

LinkID represents a fully qualified input or output.

type PColInfo

type PColInfo struct {
	GlobalID string
	WDec     exec.WindowDecoder
	WEnc     exec.WindowEncoder
	EDec     func(io.Reader) []byte
	KeyDec   func(io.Reader) []byte
}

type Residual

type Residual struct {
	Element []byte
	Delay   time.Duration // The relative time delay.
	Bounded bool          // Whether this element is finite or not.
}

Residual represents the unprocessed portion of a single element to be rescheduled for processing later.

type Residuals

type Residuals struct {
	Data                 []Residual
	TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
	MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)
}

Residuals is used to specify process continuations within a bundle.

type RunBundle

type RunBundle struct {
	StageID   string
	BundleID  string
	Watermark mtime.Time
}

func (RunBundle) LogValue

func (rb RunBundle) LogValue() slog.Value

type StateData

type StateData struct {
	Bag      [][]byte
	Multimap map[string][][]byte
}

StateData is a "union" between Bag state and MultiMap state to increase common code.

type TentativeData

type TentativeData struct {
	Raw map[string][][]byte
	// contains filtered or unexported fields
}

TentativeData is where data for in progress bundles is put until the bundle executes successfully.
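
The idea of holding output tentatively until a bundle succeeds can be sketched as below. The tentative and store types and their methods are hypothetical simplifications; only the Raw-style map of collection ID to encoded blocks is taken from the struct above.

```go
package main

import "fmt"

// tentative buffers output for an in-progress bundle, keyed by collection ID.
type tentative struct {
	raw map[string][][]byte
}

// writeData appends data under the given collection ID, creating the map lazily.
func (d *tentative) writeData(colID string, data []byte) {
	if d.raw == nil {
		d.raw = map[string][][]byte{}
	}
	d.raw[colID] = append(d.raw[colID], data)
}

// store holds committed data that downstream stages are allowed to see.
type store struct {
	committed map[string][][]byte
}

func newStore() *store {
	return &store{committed: map[string][][]byte{}}
}

// persist moves a successful bundle's tentative output into the store.
// A failed bundle's tentative value is simply dropped instead.
func (s *store) persist(d tentative) {
	for col, blocks := range d.raw {
		s.committed[col] = append(s.committed[col], blocks...)
	}
}

func main() {
	s := newStore()

	var d tentative
	d.writeData("pcol1", []byte("x"))
	fmt.Println(len(s.committed["pcol1"])) // 0: nothing is visible yet

	s.persist(d)
	fmt.Println(len(s.committed["pcol1"])) // 1: visible after success
}
```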

func (*TentativeData) AppendBagState

func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)

AppendBagState appends the incoming data to the existing tentative data bundle.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) AppendMultimapState

func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)

AppendMultimapState appends the incoming data to the existing tentative data bundle.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) ClearBagState

func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)

ClearBagState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so the cleared value can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
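
The "clear without creating" behavior can be illustrated with a flat map. This is a simplified sketch: bagState, appendBag, and clearBag are hypothetical names, and the single string key stands in for the state ID, window key, and user key used by the real methods.

```go
package main

import "fmt"

// bagState maps a key to its bag of encoded values.
type bagState map[string][][]byte

// appendBag adds data to the bag for key, creating the entry if needed.
func appendBag(s bagState, key string, data []byte) {
	s[key] = append(s[key], data)
}

// clearBag zeroes an existing bag but never creates a missing one.
// Keeping the zeroed entry lets the clear itself be committed later.
func clearBag(s bagState, key string) {
	if _, ok := s[key]; !ok {
		return // no state exists, so there is nothing to clear
	}
	s[key] = nil
}

func main() {
	s := bagState{}

	clearBag(s, "k1")
	fmt.Println(len(s)) // 0: clearing did not create an entry

	appendBag(s, "k1", []byte("v"))
	clearBag(s, "k1")
	_, exists := s["k1"]
	fmt.Println(exists, len(s["k1"])) // true 0: the entry remains, but empty
}
```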

func (*TentativeData) ClearMultimapKeysState

func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)

ClearMultimapKeysState clears tentative data for all user map keys. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so the cleared value can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) ClearMultimapState

func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)

ClearMultimapState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so the cleared value can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetBagState

func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte

GetBagState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetMultimapKeysState

func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte

GetMultimapKeysState retrieves all available user map keys.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetMultimapState

func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte

GetMultimapState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) WriteData

func (d *TentativeData) WriteData(colID string, data []byte)

WriteData adds data to a given global collectionID.

func (*TentativeData) WriteTimers

func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)

WriteTimers adds timers to the associated transform handler.

type TestStreamBuilder

type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark mtime.Time)
	AddProcessingTimeEvent(d time.Duration)
}

TestStreamBuilder builds a synthetic sequence of events for the engine to execute. A pipeline may only have a single TestStream; the engine may panic otherwise.

type TestStreamElement

type TestStreamElement struct {
	Encoded   []byte
	EventTime mtime.Time
}

TestStreamElement wraps the provided bytes and timestamp for ingestion and use.

type TimerKey

type TimerKey struct {
	Transform, Family string
}

TimerKey is for use as a key for timers.
