worker

package
v3.0.0-...-16f56ce Latest

Warning: This package is not in the latest version of its module.

Published: Jun 3, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 25 Imported by: 0

Documentation

Overview

Package worker handles interactions with SDK-side workers. It represents the worker services and SDK environments, and communicates with those services.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type B

type B struct {
	InstID string // ID for the instruction processing this bundle.
	PBDID  string // ID for the ProcessBundleDescriptor

	// InputTransformID is where data is being sent to in the SDK.
	InputTransformID       string
	Input                  []*engine.Block // Data and Timers for this bundle.
	EstimatedInputElements int
	HasTimers              []string

	// IterableSideInputData is a map from transformID + inputID, to window, to data.
	IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
	// MultiMapSideInputData is a map from transformID + inputID, to window, to data key, to data values.
	MultiMapSideInputData map[SideInputKey]map[typex.Window]map[string][][]byte

	// OutputCount is the number of data or timer outputs this bundle has.
	// We need to see this many closed data channels before the bundle is complete.
	OutputCount int
	// DataWait is how we determine if a bundle is finished, by waiting for each of
	// a Bundle's DataSinks to produce their last output.
	// After this point we can "commit" the bundle's output for downstream use.
	DataWait chan struct{}

	OutputData engine.TentativeData

	Resp      chan *fnpb.ProcessBundleResponse
	BundleErr error

	SinkToPCollection map[string]string
	// contains filtered or unexported fields
}

B represents an extant ProcessBundle instruction sent to an SDK worker. It is generally manipulated by another package to interact with a worker.

func (*B) Cleanup

func (b *B) Cleanup(wk *W)

Cleanup unregisters the bundle from the worker.

func (*B) DataOrTimerDone

func (b *B) DataOrTimerDone()

DataOrTimerDone indicates a final element has been received from a Data or Timer output.

func (*B) Init

func (b *B) Init()

Init initializes the bundle's internal state for waiting on all data and for relaying a response back.
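
The OutputCount and DataWait fields, together with Init and DataOrTimerDone, describe a countdown: the bundle is complete once every data or timer output has reported its final element. The following is a minimal sketch of that pattern only, not the package's implementation. The bundle type, its remaining counter, and the isDone helper are hypothetical stand-ins; the real B keeps its bookkeeping in unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// bundle is a hypothetical stand-in for worker.B, keeping only the
// fields needed to show the completion countdown.
type bundle struct {
	OutputCount int
	DataWait    chan struct{}

	mu        sync.Mutex
	remaining int // assumed private counter, not the real field
}

// Init prepares the countdown. A bundle with no outputs is complete
// immediately, so DataWait is closed right away in that case.
func (b *bundle) Init() {
	b.DataWait = make(chan struct{})
	b.remaining = b.OutputCount
	if b.remaining == 0 {
		close(b.DataWait)
	}
}

// DataOrTimerDone records one finished output and closes DataWait
// when the last expected output has finished.
func (b *bundle) DataOrTimerDone() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.remaining--
	if b.remaining == 0 {
		close(b.DataWait)
	}
}

// isDone reports whether DataWait has been closed, without blocking.
func isDone(b *bundle) bool {
	select {
	case <-b.DataWait:
		return true
	default:
		return false
	}
}

func main() {
	b := &bundle{OutputCount: 2}
	b.Init()
	b.DataOrTimerDone()
	fmt.Println("done after first output:", isDone(b))
	b.DataOrTimerDone()
	fmt.Println("done after second output:", isDone(b))
}
```

Closing a channel, rather than sending on it, lets any number of goroutines observe completion without coordinating who receives the signal.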

func (*B) LogValue

func (b *B) LogValue() slog.Value

func (*B) ProcessOn

func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{}

ProcessOn executes the given bundle on the given W. The returned channel is closed once all expected data is returned.

Assumes the bundle is initialized (all maps are non-nil, the data wait group is set, and the response channel is initialized). Assumes the bundle descriptor is already registered with the W.

While this method mostly manipulates a W, putting it on a B avoids mixing the worker's public gRPC APIs up with local calls.
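
Because ProcessOn returns a channel that is closed on completion, a caller would typically wait on it alongside context cancellation. The sketch below shows that waiting pattern in isolation, assuming only a `<-chan struct{}` like the one ProcessOn returns. The awaitBundle and awaitWithin helpers are hypothetical and not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// awaitBundle blocks until done is closed or ctx is cancelled.
// It returns nil on completion and ctx.Err() on cancellation.
func awaitBundle(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// awaitWithin is a convenience wrapper that bounds the wait to the
// given number of milliseconds.
func awaitWithin(done <-chan struct{}, ms int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return awaitBundle(ctx, done)
}

func main() {
	finished := make(chan struct{})
	close(finished) // simulates a bundle whose data has all arrived
	fmt.Println("finished bundle:", awaitWithin(finished, 10))

	stuck := make(chan struct{}) // never closed
	fmt.Println("stuck bundle:", awaitWithin(stuck, 10))
}
```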

func (*B) Progress

func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error)

Progress sends a progress request for the given bundle to the passed-in worker, blocking on the response.

func (*B) Respond

func (b *B) Respond(resp *fnpb.InstructionResponse)

func (*B) Split

func (b *B) Split(ctx context.Context, wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error)

Split sends a split request for the given bundle to the passed-in worker, blocking on the response.

type SideInputKey

type SideInputKey struct {
	TransformID, Local string
}

SideInputKey is for data lookups for a given bundle.
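
As an illustration of how a key of this shape indexes the nested side input maps on B, here is a small self-contained sketch. The sideInputKey type mirrors the struct above; window is a hypothetical string stand-in for typex.Window, and lookupIterable is an illustrative helper, not a function in this package.

```go
package main

import "fmt"

// sideInputKey mirrors the shape of worker.SideInputKey.
type sideInputKey struct {
	TransformID, Local string
}

// window is a hypothetical stand-in for typex.Window.
type window string

// lookupIterable returns the encoded elements for one side input in
// one window. Unknown keys or windows yield nil, because indexing a
// nil or missing inner map is safe in Go.
func lookupIterable(data map[sideInputKey]map[window][][]byte, k sideInputKey, w window) [][]byte {
	return data[k][w]
}

func main() {
	data := map[sideInputKey]map[window][][]byte{
		{TransformID: "t1", Local: "i0"}: {
			"global": {[]byte("a"), []byte("b")},
		},
	}
	hit := lookupIterable(data, sideInputKey{TransformID: "t1", Local: "i0"}, "global")
	miss := lookupIterable(data, sideInputKey{TransformID: "t2", Local: "i0"}, "global")
	fmt.Println(len(hit), len(miss))
}
```

A struct of two strings is comparable, so it can be used directly as a map key without concatenating the IDs into a single string.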

type W

type W struct {
	fnpb.UnimplementedBeamFnControlServer
	fnpb.UnimplementedBeamFnDataServer
	fnpb.UnimplementedBeamFnStateServer
	fnpb.UnimplementedBeamFnLoggingServer
	fnpb.UnimplementedProvisionServiceServer

	ID, Env string

	JobKey, ArtifactEndpoint string
	EnvPb                    *pipepb.Environment
	PipelineOptions          *structpb.Struct

	InstReqs chan *fnpb.InstructionRequest
	DataReqs chan *fnpb.Elements

	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
	// contains filtered or unexported fields
}

A W manages worker environments, sending them work that they're able to execute, and manages the server-side handlers for FnAPI RPCs.

func New

func New(id, env string) *W

New starts the worker server components of FnAPI Execution.

func (*W) Connected

func (wk *W) Connected() bool

Connected indicates whether the worker has connected to the control RPC.

func (*W) Control

func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error

Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.

Requests come from the runner, and are sent to the client in the SDK.
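
Coordinating requests and responses by unique instruction ID is commonly done with a registry of pending responders. The sketch below illustrates that general technique under stated assumptions; it is not the code inside Control. The response and registry types, and the register and deliver methods, are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for fnpb.InstructionResponse.
type response struct {
	InstructionID string
	Payload       string
}

// registry tracks in-flight instructions that await a response.
type registry struct {
	mu      sync.Mutex
	waiting map[string]chan response
}

func newRegistry() *registry {
	return &registry{waiting: map[string]chan response{}}
}

// register reserves a buffered channel for the response to id.
func (r *registry) register(id string) <-chan response {
	ch := make(chan response, 1)
	r.mu.Lock()
	r.waiting[id] = ch
	r.mu.Unlock()
	return ch
}

// deliver routes resp to whoever registered its instruction ID.
// It returns false when no one is waiting for that ID.
func (r *registry) deliver(resp response) bool {
	r.mu.Lock()
	ch, ok := r.waiting[resp.InstructionID]
	delete(r.waiting, resp.InstructionID)
	r.mu.Unlock()
	if !ok {
		return false
	}
	ch <- resp // never blocks: buffer of 1, one delivery per ID
	return true
}

func main() {
	reg := newRegistry()
	first := reg.register("inst001")
	second := reg.register("inst002")

	// Responses may arrive in any order; the ID decides the route.
	reg.deliver(response{InstructionID: "inst002", Payload: "second done"})
	reg.deliver(response{InstructionID: "inst001", Payload: "first done"})

	fmt.Println((<-first).Payload)
	fmt.Println((<-second).Payload)
}
```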

func (*W) Data

func (wk *W) Data(data fnpb.BeamFnData_DataServer) error

Data relays elements and timer bytes to SDKs and back again, coordinated via ProcessBundle instructionIDs, and receiving input transforms.

Data is multiplexed on a single stream for all active bundles on a worker.
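
Since one stream carries elements for every active bundle, the receiving side has to separate them by instruction ID and notice each bundle's final element. The following sketch shows that demultiplexing idea on an in-memory slice. The element type is a hypothetical simplification of the entries in fnpb.Elements, and demux is an illustrative helper rather than part of this package.

```go
package main

import "fmt"

// element is a hypothetical, simplified stand-in for one data entry
// on the shared stream.
type element struct {
	InstructionID string
	Data          []byte
	IsLast        bool
}

// demux splits one interleaved stream into per-bundle payloads and
// records which bundles have seen their final element.
func demux(stream []element) (data map[string][][]byte, finished map[string]bool) {
	data = map[string][][]byte{}
	finished = map[string]bool{}
	for _, e := range stream {
		if len(e.Data) > 0 {
			data[e.InstructionID] = append(data[e.InstructionID], e.Data)
		}
		if e.IsLast {
			finished[e.InstructionID] = true
		}
	}
	return data, finished
}

func main() {
	stream := []element{
		{InstructionID: "inst001", Data: []byte("a")},
		{InstructionID: "inst002", Data: []byte("x")},
		{InstructionID: "inst001", Data: []byte("b"), IsLast: true},
	}
	data, finished := demux(stream)
	fmt.Println(len(data["inst001"]), finished["inst001"], finished["inst002"])
}
```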

func (*W) Endpoint

func (wk *W) Endpoint() string

func (*W) LogValue

func (wk *W) LogValue() slog.Value

func (*W) Logging

func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error

Logging relates SDK worker messages back to the job that spawned them. Messages are received from the SDK.

func (*W) MonitoringMetadata

func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse

MonitoringMetadata is a convenience method to request the metadata for monitoring shortIDs.

func (*W) NextInst

func (wk *W) NextInst() string

func (*W) Serve

func (wk *W) Serve()

Serve serves on the started listener. Blocks.

func (*W) State

func (wk *W) State(state fnpb.BeamFnState_StateServer) error

State relays elements and timer bytes to SDKs and back again, coordinated via ProcessBundle instructionIDs, and receiving input transforms.

State requests come from SDKs, and the runner responds.

func (*W) Stop

func (wk *W) Stop()

Stop stops the gRPC server.

func (*W) Stopped

func (wk *W) Stopped() bool

Stopped indicates that the worker has stopped.

func (*W) String

func (wk *W) String() string
