Documentation ¶
Overview ¶
Package worker handles interactions with SDK side workers, representing the worker services, communicating with those services, and SDK environments.
Index ¶
- type B
- type DataService
- type W
- func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error
- func (wk *W) Data(data fnpb.BeamFnData_DataServer) error
- func (wk *W) Endpoint() string
- func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)
- func (wk *W) LogValue() slog.Value
- func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error
- func (wk *W) NextInst() string
- func (wk *W) NextStage() string
- func (wk *W) Serve()
- func (wk *W) State(state fnpb.BeamFnState_StateServer) error
- func (wk *W) Stop()
- func (wk *W) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type B ¶
type B struct { InstID string // ID for the instruction processing this bundle. PBDID string // ID for the ProcessBundleDescriptor // InputTransformID is data being sent to the SDK. InputTransformID string InputData [][]byte // Data specifically for this bundle. // TODO change to a single map[tid] -> map[input] -> map[window] -> struct { Iter data, MultiMap data } instead of all maps. // IterableSideInputData is a map from transformID, to inputID, to window, to data. IterableSideInputData map[string]map[string]map[typex.Window][][]byte // MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values. MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte // OutputCount is the number of data outputs this bundle has. // We need to see this many closed data channels before the bundle is complete. OutputCount int OutputData engine.TentativeData Resp chan *fnpb.ProcessBundleResponse SinkToPCollection map[string]string // contains filtered or unexported fields }
B represents an extant ProcessBundle instruction sent to an SDK worker. Generally manipulated by another package to interact with a worker.
func (*B) ProcessOn ¶
ProcessOn executes the given bundle on the given worker.
Assumes the bundle is initialized (all maps are non-nil, and data waitgroup is set.) Assumes the bundle descriptor is already registered.
While this method mostly manipulates a W, putting it on a B avoids mixing the workers public GRPC APIs up with local calls.
type DataService ¶
type DataService struct {
// contains filtered or unexported fields
}
DataService is slated to be deleted in favour of stage based state management for side inputs.
func (*DataService) Commit ¶
func (d *DataService) Commit(tent engine.TentativeData)
Commit tentative data to the datastore.
func (*DataService) GetAllData ¶
func (d *DataService) GetAllData(colID string) [][]byte
Hack for Side Inputs until watermarks are sorted out.
type W ¶
type W struct { fnpb.UnimplementedBeamFnControlServer fnpb.UnimplementedBeamFnDataServer fnpb.UnimplementedBeamFnStateServer fnpb.UnimplementedBeamFnLoggingServer ID string InstReqs chan *fnpb.InstructionRequest DataReqs chan *fnpb.Elements Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID D *DataService // contains filtered or unexported fields }
A W manages worker environments, sending them work that they're able to execute, and manages the server side handlers for FnAPI RPCs.
func (*W) GetProcessBundleDescriptor ¶
func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)