worker

package
v0.0.0-...-498d591
Published: Nov 11, 2023 License: Unlicense Imports: 18 Imported by: 0

Documentation

Overview

Package worker handles interactions with SDK-side workers and SDK environments: it represents the worker services and communicates with those services.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type B

type B struct {
	InstID string // ID for the instruction processing this bundle.
	PBDID  string // ID for the ProcessBundleDescriptor

	// InputTransformID is the ID of the transform that receives data sent to the SDK.
	InputTransformID string
	InputData        [][]byte // Data specifically for this bundle.

	// TODO change to a single map[tid] -> map[input] -> map[window] -> struct { Iter data, MultiMap data } instead of all maps.
	// IterableSideInputData is a map from transformID, to inputID, to window, to data.
	IterableSideInputData map[string]map[string]map[typex.Window][][]byte
	// MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values.
	MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte

	// OutputCount is the number of data outputs this bundle has.
	// We need to see this many closed data channels before the bundle is complete.
	OutputCount int

	OutputData engine.TentativeData
	Resp       chan *fnpb.ProcessBundleResponse

	SinkToPCollection map[string]string
	// contains filtered or unexported fields
}

B represents an extant ProcessBundle instruction sent to an SDK worker. It is generally manipulated by another package to interact with a worker.
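The nested side input maps on B can be awkward to populate by hand. The following is a minimal sketch of the IterableSideInputData shape (transformID, to inputID, to window, to data), assuming a hypothetical `window` string type in place of typex.Window and hypothetical `add` and `get` helpers that are not part of this package.

```go
package main

import "fmt"

// window is a hypothetical stand-in for typex.Window so the sketch is self-contained.
type window string

// iterableSideInputs mirrors the shape of B.IterableSideInputData:
// transformID -> inputID -> window -> encoded elements.
type iterableSideInputs map[string]map[string]map[window][][]byte

// add appends one encoded element, allocating intermediate maps as needed.
func (s iterableSideInputs) add(tid, input string, w window, elm []byte) {
	if s[tid] == nil {
		s[tid] = map[string]map[window][][]byte{}
	}
	if s[tid][input] == nil {
		s[tid][input] = map[window][][]byte{}
	}
	s[tid][input][w] = append(s[tid][input][w], elm)
}

// get returns the elements for a window. Reading through missing (nil)
// inner maps is safe in Go and yields a nil slice.
func (s iterableSideInputs) get(tid, input string, w window) [][]byte {
	return s[tid][input][w]
}

func main() {
	data := iterableSideInputs{}
	data.add("transform1", "side0", "global", []byte("a"))
	data.add("transform1", "side0", "global", []byte("b"))
	fmt.Println(len(data.get("transform1", "side0", "global"))) // prints 2
	fmt.Println(len(data.get("missing", "side0", "global")))    // prints 0
}
```

Only writes need the intermediate maps to exist; this is why ProcessOn can rely on "all maps are non-nil" for the top level while lookups stay simple.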

func (*B) Init

func (b *B) Init()

Init initializes the bundle's internal state before it is processed.

func (*B) LogValue

func (b *B) LogValue() slog.Value
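A LogValue method with this signature satisfies the standard library's slog.LogValuer interface, which lets a value control how it appears in structured logs. The sketch below shows the general pattern on a hypothetical `bundle` stand-in; the attribute names are assumptions, not the ones B actually emits.

```go
package main

import (
	"log/slog"
	"os"
)

// bundle is a hypothetical stand-in carrying two of B's exported ID fields.
type bundle struct {
	InstID string
	PBDID  string
}

// LogValue implements slog.LogValuer, so passing a *bundle as a log
// attribute value emits a structured group rather than a raw struct dump.
// The keys "ID" and "stage" are illustrative assumptions.
func (b *bundle) LogValue() slog.Value {
	return slog.GroupValue(
		slog.String("ID", b.InstID),
		slog.String("stage", b.PBDID),
	)
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	b := &bundle{InstID: "inst001", PBDID: "stage001"}
	// The handler resolves the LogValuer and logs the group under "bundle".
	logger.Info("processing bundle", "bundle", b)
}
```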

func (*B) ProcessOn

func (b *B) ProcessOn(wk *W)

ProcessOn executes the given bundle on the given worker.

ProcessOn assumes the bundle is initialized (all maps are non-nil and the data waitgroup is set) and that the bundle descriptor is already registered.

While this method mostly manipulates a W, putting it on a B avoids mixing the worker's public gRPC APIs up with local calls.
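The completion rule described by OutputCount (the bundle is done only after that many data outputs have closed) can be sketched with a sync.WaitGroup. Everything here is a hypothetical stand-in: the `bundle` type, the `dataWait` field name, the helper methods, and the string response used in place of *fnpb.ProcessBundleResponse. It is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// bundle is a hypothetical stand-in for B, reduced to output completion tracking.
type bundle struct {
	OutputCount int
	dataWait    sync.WaitGroup // assumed name for the unexported data waitgroup
	Resp        chan string    // stand-in for chan *fnpb.ProcessBundleResponse
}

// prepare reserves one waitgroup slot per expected output and creates a
// buffered response channel so the sender never blocks.
func (b *bundle) prepare() {
	b.dataWait.Add(b.OutputCount)
	b.Resp = make(chan string, 1)
}

// outputClosed records that one data output stream has finished.
func (b *bundle) outputClosed() { b.dataWait.Done() }

// wait blocks until the response has arrived and every output has closed.
func (b *bundle) wait() string {
	resp := <-b.Resp
	b.dataWait.Wait()
	return resp
}

func main() {
	b := &bundle{OutputCount: 2}
	b.prepare()
	for i := 0; i < b.OutputCount; i++ {
		go b.outputClosed() // simulate each data channel closing
	}
	b.Resp <- "done" // buffered, so this does not block
	fmt.Println(b.wait()) // prints done
}
```

Waiting on both the response and the outputs matters because, in this model, the response and the final data signals travel on separate streams and can arrive in either order.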

type DataService

type DataService struct {
	// contains filtered or unexported fields
}

DataService is slated to be deleted in favour of stage based state management for side inputs.

func (*DataService) Commit

func (d *DataService) Commit(tent engine.TentativeData)

Commit commits tentative data to the datastore.

func (*DataService) GetAllData

func (d *DataService) GetAllData(colID string) [][]byte

GetAllData returns all data for the given collection ID. It is a hack for side inputs until watermarks are sorted out.
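The commit-then-read flow can be illustrated with a small mutex-guarded store. This is only a sketch under stated assumptions: `tentativeData` is a hypothetical stand-in for engine.TentativeData (whose real layout is not shown on this page), and `dataStore` with its lowercase methods is a hypothetical stand-in for DataService.

```go
package main

import (
	"fmt"
	"sync"
)

// tentativeData is a hypothetical stand-in for engine.TentativeData:
// encoded elements keyed by collection ID.
type tentativeData map[string][][]byte

// dataStore is a hypothetical stand-in for DataService.
type dataStore struct {
	mu  sync.Mutex
	raw map[string][][]byte
}

// commit merges a bundle's tentative outputs into the store.
func (d *dataStore) commit(tent tentativeData) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.raw == nil {
		d.raw = map[string][][]byte{}
	}
	for colID, elms := range tent {
		d.raw[colID] = append(d.raw[colID], elms...)
	}
}

// getAllData returns every committed element for a collection,
// with no windowing or watermark filtering.
func (d *dataStore) getAllData(colID string) [][]byte {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.raw[colID]
}

func main() {
	d := &dataStore{}
	d.commit(tentativeData{"pcol1": {[]byte("a")}})
	d.commit(tentativeData{"pcol1": {[]byte("b")}, "pcol2": {[]byte("c")}})
	fmt.Println(len(d.getAllData("pcol1"))) // prints 2
	fmt.Println(len(d.getAllData("pcol2"))) // prints 1
}
```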

type W

type W struct {
	fnpb.UnimplementedBeamFnControlServer
	fnpb.UnimplementedBeamFnDataServer
	fnpb.UnimplementedBeamFnStateServer
	fnpb.UnimplementedBeamFnLoggingServer

	ID string

	InstReqs chan *fnpb.InstructionRequest
	DataReqs chan *fnpb.Elements

	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID

	D *DataService
	// contains filtered or unexported fields
}

A W manages worker environments, sending them work that they're able to execute, and manages the server-side handlers for FnAPI RPCs.

func New

func New(id string) *W

New starts the worker server components of FnAPI Execution.

func (*W) Control

func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error

func (*W) Data

func (wk *W) Data(data fnpb.BeamFnData_DataServer) error

func (*W) Endpoint

func (wk *W) Endpoint() string

func (*W) LogValue

func (wk *W) LogValue() slog.Value

func (*W) Logging

func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error

func (*W) NextInst

func (wk *W) NextInst() string

func (*W) NextStage

func (wk *W) NextStage() string

func (*W) Serve

func (wk *W) Serve()

Serve serves on the started listener. Blocks.

func (*W) State

func (wk *W) State(state fnpb.BeamFnState_StateServer) error

func (*W) Stop

func (wk *W) Stop()

Stop stops the gRPC server.
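The New, Endpoint, Serve, Stop sequence suggests a listener that is opened up front, served on a separate goroutine, and closed to shut down. The sketch below models that lifecycle with the standard library's net package only. The `server` type and its methods are hypothetical stand-ins, and a plain TCP accept loop takes the place of the gRPC server, so this shows the calling pattern rather than how W is implemented.

```go
package main

import (
	"fmt"
	"net"
)

// server is a hypothetical stand-in for W, reduced to its listener lifecycle.
type server struct {
	lis  net.Listener
	done chan struct{} // closed when serve returns
}

// newServer opens a listener on an OS-assigned loopback port, so the
// endpoint is known before serving begins.
func newServer() (*server, error) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, err
	}
	return &server{lis: lis, done: make(chan struct{})}, nil
}

// endpoint reports the address a worker would be told to dial.
func (s *server) endpoint() string { return s.lis.Addr().String() }

// serve accepts connections until the listener is closed. Blocks.
func (s *server) serve() {
	defer close(s.done)
	for {
		conn, err := s.lis.Accept()
		if err != nil {
			return // listener closed by stop
		}
		conn.Close() // a real server would hand the connection to gRPC
	}
}

// stop closes the listener, which unblocks serve.
func (s *server) stop() { s.lis.Close() }

func main() {
	s, err := newServer()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	go s.serve() // serve blocks, so run it on its own goroutine
	fmt.Println("listening on", s.endpoint())
	s.stop()
	<-s.done // wait for serve to return
	fmt.Println("stopped")
}
```

Because Serve blocks, callers of the real W would presumably also run it on its own goroutine and call Stop from elsewhere.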

func (*W) String

func (wk *W) String() string
