Documentation ¶
Overview ¶
Package forward creates the data forwarder for a UDF (map) vertex.
The forwarder runs a loop of four stages: Read (fromBufferPartition) -> Process (map UDF) -> Forward (toBuffers) -> Ack (fromBufferPartition).
Index ¶
- func DefaultOptions() *options
- type InterStepDataForward
- type Option
- func WithCallbackUploader(cp *callback.Uploader) Option
- func WithLogger(l *zap.SugaredLogger) Option
- func WithReadBatchSize(f int64) Option
- func WithRetryInterval(f time.Duration) Option
- func WithUDFBatchMap(f applier.BatchMapApplier) Option
- func WithUDFConcurrency(f int) Option
- func WithUDFStreamingMap(f applier.MapStreamApplier) Option
- func WithUDFUnaryMap(f applier.MapApplier) Option
- type Shutdown
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultOptions ¶
func DefaultOptions() *options
Types ¶
type InterStepDataForward ¶
type InterStepDataForward struct {
	FSD forwarder.ToWhichStepDecider
	Shutdown
	// contains filtered or unexported fields
}
InterStepDataForward forwards data from the previous step to the current step via the inter-step buffer.
func NewInterStepDataForward ¶
func NewInterStepDataForward(vertexInstance *dfv1.VertexInstance, fromStep isb.BufferReader, toSteps map[string][]isb.BufferWriter, fsd forwarder.ToWhichStepDecider, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, idleManager wmb.IdleManager, opts ...Option) (*InterStepDataForward, error)
NewInterStepDataForward creates an inter-step forwarder.
func (*InterStepDataForward) ForceStop ¶
func (isdf *InterStepDataForward) ForceStop()
ForceStop sets up the force shutdown flag.
func (*InterStepDataForward) IsShuttingDown ¶
func (isdf *InterStepDataForward) IsShuttingDown() (bool, error)
IsShuttingDown reports whether processing can stop.
func (*InterStepDataForward) Start ¶
func (isdf *InterStepDataForward) Start() <-chan struct{}
Start starts reading the buffer and forwards to the next buffers. Call `Stop` to stop.
func (*InterStepDataForward) Stop ¶
func (isdf *InterStepDataForward) Stop()
Stop stops the processing.
type Option ¶
type Option func(*options) error
func WithCallbackUploader ¶ added in v1.3.0
func WithCallbackUploader(cp *callback.Uploader) Option
WithCallbackUploader sets the callback uploader for the vertex.
func WithLogger ¶
func WithLogger(l *zap.SugaredLogger) Option
WithLogger sets the logger used by the forwarder.
func WithReadBatchSize ¶
func WithReadBatchSize(f int64) Option
WithReadBatchSize sets the read batch size.
func WithRetryInterval ¶
func WithRetryInterval(f time.Duration) Option
WithRetryInterval sets the retry interval.
func WithUDFBatchMap ¶ added in v1.3.0
func WithUDFBatchMap(f applier.BatchMapApplier) Option
WithUDFBatchMap enables the batch map for UDF if provided with a non-nil applier
func WithUDFConcurrency ¶
func WithUDFConcurrency(f int) Option
WithUDFConcurrency sets the concurrency for map UDF processing.
func WithUDFStreamingMap ¶ added in v1.3.0
func WithUDFStreamingMap(f applier.MapStreamApplier) Option
WithUDFStreamingMap sets streaming for map UDF processing if provided with a non-nil applier
func WithUDFUnaryMap ¶ added in v1.3.0
func WithUDFUnaryMap(f applier.MapApplier) Option
WithUDFUnaryMap enables the unary map for UDF if provided with a non-nil applier
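The three map options (unary, batch, streaming) are each described as taking effect only when given a non-nil applier. One way such nil-guarded options could behave is sketched below with two of the three modes. Everything here is hypothetical: `mapFn` and `batchFn` stand in for the applier interfaces, and the precedence used in `mode` when several appliers are set is an assumption that this page does not document.

```go
package main

import "fmt"

// mapFn and batchFn are hypothetical stand-ins for the applier types.
type mapFn func(string) []string
type batchFn func([]string) []string

// options is a hypothetical stand-in for the unexported options struct.
type options struct {
	unary mapFn
	batch batchFn
}

// Option matches the documented signature.
type Option func(*options) error

// withUnaryMap stores f only when it is non-nil, so passing nil
// leaves the options untouched.
func withUnaryMap(f mapFn) Option {
	return func(o *options) error {
		if f != nil {
			o.unary = f
		}
		return nil
	}
}

// withBatchMap stores f only when it is non-nil.
func withBatchMap(f batchFn) Option {
	return func(o *options) error {
		if f != nil {
			o.batch = f
		}
		return nil
	}
}

// mode applies opts and reports which applier ended up enabled.
// Preferring batch over unary is an assumption for this sketch.
func mode(opts ...Option) (string, error) {
	o := &options{}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return "", err
		}
	}
	switch {
	case o.batch != nil:
		return "batch", nil
	case o.unary != nil:
		return "unary", nil
	default:
		return "none", nil
	}
}

func main() {
	split := func(s string) []string { return []string{s} }
	// The nil batch applier is ignored, so only the unary mode is set.
	m, _ := mode(withUnaryMap(split), withBatchMap(nil))
	fmt.Println(m)
}
```

With nil-guarded options, a caller can pass every applier unconditionally and let the nil ones drop out, instead of building the option list with `if` statements.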