Documentation ¶
Overview ¶
Package forward implements the Read (fromBufferPartition) -> Process (UDF) -> Forward (toBuffers) -> Ack (fromBufferPartition) loop.
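The four stages of the loop can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: `message`, `memBuffer`, `forwardOnce`, and `upper` are hypothetical names introduced here, and an in-memory slice stands in for the inter-step buffer.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for an inter-step buffer message.
type message struct {
	offset  int
	payload string
}

// memBuffer is a hypothetical in-memory buffer used only for this sketch.
type memBuffer struct {
	pending []message // messages waiting to be read
	acked   []int     // offsets that have been acknowledged
	written []string  // payloads forwarded into this buffer
}

// read removes and returns up to n pending messages.
func (b *memBuffer) read(n int) []message {
	if n > len(b.pending) {
		n = len(b.pending)
	}
	batch := b.pending[:n]
	b.pending = b.pending[n:]
	return batch
}

// write appends a payload to the buffer.
func (b *memBuffer) write(p string) { b.written = append(b.written, p) }

// ack records the offsets as acknowledged.
func (b *memBuffer) ack(offsets []int) { b.acked = append(b.acked, offsets...) }

// upper is a toy UDF that upper-cases the payload.
func upper(s string) (string, error) { return strings.ToUpper(s), nil }

// forwardOnce runs one Read -> Process -> Forward -> Ack cycle and returns
// the number of messages acknowledged. Nothing is acked if the UDF fails.
func forwardOnce(from, to *memBuffer, udf func(string) (string, error), batchSize int) (int, error) {
	batch := from.read(batchSize) // Read

	results := make([]string, 0, len(batch))
	offsets := make([]int, 0, len(batch))
	for _, m := range batch { // Process
		out, err := udf(m.payload)
		if err != nil {
			return 0, fmt.Errorf("process offset %d: %w", m.offset, err)
		}
		results = append(results, out)
		offsets = append(offsets, m.offset)
	}

	for _, r := range results { // Forward
		to.write(r)
	}

	from.ack(offsets) // Ack
	return len(offsets), nil
}

func main() {
	from := &memBuffer{pending: []message{
		{offset: 0, payload: "a"},
		{offset: 1, payload: "b"},
		{offset: 2, payload: "c"},
	}}
	to := &memBuffer{}
	n, err := forwardOnce(from, to, upper, 2)
	fmt.Println(n, err, to.written, from.acked)
}
```

Acknowledging only after the forward step succeeds is the point of the ordering: a message that fails in the process or forward stage is never acked, so a real buffer could redeliver it.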
Index ¶
- func DefaultOptions() *options
- type GoWhere
- type InterStepDataForward
- type Option
- func WithLogger(l *zap.SugaredLogger) Option
- func WithReadBatchSize(f int64) Option
- func WithRetryInterval(f time.Duration) Option
- func WithSourceWatermarkPublisher(p isb.SourceWatermarkPublisher) Option
- func WithUDFConcurrency(f int) Option
- func WithUDFStreaming(f bool) Option
- func WithVertexType(t dfv1.VertexType) Option
- type Shutdown
- type StarterStopper
- type ToWhichStepDecider
- type VertexBuffer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultOptions ¶ added in v0.7.3
func DefaultOptions() *options
Types ¶
type GoWhere ¶
type GoWhere func([]string, []string) ([]VertexBuffer, error)
GoWhere is a function type that acts as the step decider: it returns the vertex buffers a result should be forwarded to.
type InterStepDataForward ¶
type InterStepDataForward struct {
	FSD ToWhichStepDecider
	UDF applier.MapApplier
	Shutdown
	// contains filtered or unexported fields
}
InterStepDataForward forwards the data from previous step to the current step via inter-step buffer.
func NewInterStepDataForward ¶
func NewInterStepDataForward(vertex *dfv1.Vertex, fromStep isb.BufferReader, toSteps map[string][]isb.BufferWriter, fsd ToWhichStepDecider, applyUDF applier.MapApplier, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*InterStepDataForward, error)
NewInterStepDataForward creates an inter-step forwarder.
func (*InterStepDataForward) ForceStop ¶
func (isdf *InterStepDataForward) ForceStop()
ForceStop sets up the force shutdown flag.
func (*InterStepDataForward) IsShuttingDown ¶
func (isdf *InterStepDataForward) IsShuttingDown() (bool, error)
IsShuttingDown returns whether we can stop processing.
func (*InterStepDataForward) Start ¶
func (isdf *InterStepDataForward) Start() <-chan struct{}
Start starts reading the buffer and forwards to the next buffers. Call `Stop` to stop.
func (*InterStepDataForward) Stop ¶
func (isdf *InterStepDataForward) Stop()
Stop stops the processing.
type Option ¶
type Option func(*options) error
func WithLogger ¶
func WithLogger(l *zap.SugaredLogger) Option
WithLogger sets the logger used by the forwarder.
func WithReadBatchSize ¶
func WithReadBatchSize(f int64) Option
WithReadBatchSize sets the read batch size.
func WithRetryInterval ¶
func WithRetryInterval(f time.Duration) Option
WithRetryInterval sets the retry interval.
func WithSourceWatermarkPublisher ¶ added in v0.7.2
func WithSourceWatermarkPublisher(p isb.SourceWatermarkPublisher) Option
WithSourceWatermarkPublisher sets the source watermark publisher
func WithUDFConcurrency ¶
func WithUDFConcurrency(f int) Option
WithUDFConcurrency sets the concurrency for UDF processing.
func WithUDFStreaming ¶ added in v0.8.1
func WithUDFStreaming(f bool) Option
WithUDFStreaming enables or disables streaming for UDF processing.
func WithVertexType ¶
func WithVertexType(t dfv1.VertexType) Option
WithVertexType sets the type of the vertex
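The `Option` type follows Go's functional options pattern: each `With...` function returns a closure that mutates the options struct and may return an error. The sketch below illustrates the pattern in isolation. Because the package's `options` type is unexported and its fields are not shown on this page, `forwarderOptions`, its fields, the default values, and the validation rule are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// forwarderOptions is a hypothetical options struct; the real one is
// unexported and its fields are not documented here.
type forwarderOptions struct {
	readBatchSize  int64
	retryInterval  time.Duration
	udfConcurrency int
}

// option mirrors the shape of `type Option func(*options) error`.
type option func(*forwarderOptions) error

// withReadBatchSize sets the read batch size. The positivity check is an
// assumption added for this sketch.
func withReadBatchSize(n int64) option {
	return func(o *forwarderOptions) error {
		if n <= 0 {
			return errors.New("read batch size must be positive")
		}
		o.readBatchSize = n
		return nil
	}
}

// withRetryInterval sets the retry interval.
func withRetryInterval(d time.Duration) option {
	return func(o *forwarderOptions) error {
		o.retryInterval = d
		return nil
	}
}

// newOptions starts from illustrative defaults and applies each option in
// order, stopping at the first error.
func newOptions(opts ...option) (*forwarderOptions, error) {
	o := &forwarderOptions{
		readBatchSize:  1,
		retryInterval:  time.Millisecond,
		udfConcurrency: 1,
	}
	for _, apply := range opts {
		if err := apply(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := newOptions(withReadBatchSize(100), withRetryInterval(5*time.Millisecond))
	fmt.Println(o, err)

	_, err = newOptions(withReadBatchSize(0))
	fmt.Println(err)
}
```

Returning an error from each option lets a constructor such as `NewInterStepDataForward` reject a bad configuration before any processing starts.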
type Shutdown ¶
type Shutdown struct {
// contains filtered or unexported fields
}
Shutdown tracks and enforces the shutdown activity.
type StarterStopper ¶
type StarterStopper interface {
	Start() <-chan struct{}
	Stop()
	ForceStop()
}
StarterStopper starts/stops the forwarding.
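A toy type with the same method set shows how a caller might drive such a lifecycle. This is a sketch under stated assumptions, not the behaviour of `InterStepDataForward`: it assumes the channel returned by `Start` is closed once the loop has exited, and `loopRunner`, `newLoopRunner`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// loopRunner is a hypothetical type with the Start/Stop/ForceStop method set.
type loopRunner struct {
	stop   chan struct{} // closed by Stop to request a graceful exit
	once   sync.Once     // makes Stop safe to call more than once
	forced atomic.Bool   // set by ForceStop
	work   func()        // one iteration of the loop
}

func newLoopRunner(work func()) *loopRunner {
	return &loopRunner{stop: make(chan struct{}), work: work}
}

// Start runs the loop in a goroutine and returns a channel that is closed
// when the loop has exited (an assumption of this sketch).
func (r *loopRunner) Start() <-chan struct{} {
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		for {
			select {
			case <-r.stop:
				return
			default:
				r.work()
			}
		}
	}()
	return stopped
}

// Stop requests a graceful stop.
func (r *loopRunner) Stop() { r.once.Do(func() { close(r.stop) }) }

// ForceStop records the force flag and then stops. In this sketch the flag
// is only recorded; nothing else consults it.
func (r *loopRunner) ForceStop() {
	r.forced.Store(true)
	r.Stop()
}

// demo starts the runner, waits for at least one iteration, stops it, and
// reports the iteration count and whether the loop exited within a second.
func demo() (int64, bool) {
	var n atomic.Int64
	r := newLoopRunner(func() {
		n.Add(1)
		time.Sleep(time.Millisecond)
	})
	stopped := r.Start()
	for n.Load() == 0 {
		time.Sleep(time.Millisecond)
	}
	r.Stop()
	select {
	case <-stopped:
		return n.Load(), true
	case <-time.After(time.Second):
		return n.Load(), false
	}
}

func main() {
	iterations, clean := demo()
	fmt.Println(iterations > 0, clean)
}
```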
type ToWhichStepDecider ¶
type ToWhichStepDecider interface {
	// WhereTo decides where to forward the result to based on the name of the step it returns.
	// It supports 2 additional keywords which need not be a step name. They are "ALL" and "DROP",
	// where the former means forward to all the neighbouring steps and the latter means do not forward anywhere.
	WhereTo([]string, []string) ([]VertexBuffer, error)
}
ToWhichStepDecider decides which step or steps to forward to by applying the WhereTo function.
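One way to picture the "ALL" and "DROP" keywords is a decider that resolves step names against a list of neighbouring steps. The sketch below is hypothetical throughout: `vertexBuffer` and its fields are local stand-ins (the fields of `VertexBuffer` are not shown on this page), the parameter names `keys` and `tags` are assumptions, and treating the second argument as the list of step names or keywords is an interpretation made for illustration.

```go
package main

import "fmt"

// vertexBuffer is a hypothetical local stand-in for VertexBuffer.
type vertexBuffer struct {
	toVertexName string
	partitionIdx int32
}

// decider mirrors the shape of ToWhichStepDecider using the local type.
type decider interface {
	WhereTo([]string, []string) ([]vertexBuffer, error)
}

// whereToFunc has the same shape as GoWhere; the WhereTo method lets a
// plain function satisfy the decider interface.
type whereToFunc func(keys, tags []string) ([]vertexBuffer, error)

func (f whereToFunc) WhereTo(keys, tags []string) ([]vertexBuffer, error) {
	return f(keys, tags)
}

const (
	all  = "ALL"  // forward to every neighbouring step
	drop = "DROP" // do not forward anywhere
)

// newDecider returns a decider that resolves each tag to a neighbouring
// step, with special handling for the two keywords.
func newDecider(neighbours []string) decider {
	known := make(map[string]bool, len(neighbours))
	for _, n := range neighbours {
		known[n] = true
	}
	return whereToFunc(func(_ []string, tags []string) ([]vertexBuffer, error) {
		var out []vertexBuffer
		for _, tag := range tags {
			switch {
			case tag == drop:
				return nil, nil
			case tag == all:
				out = out[:0]
				for _, n := range neighbours {
					out = append(out, vertexBuffer{toVertexName: n})
				}
				return out, nil
			case known[tag]:
				out = append(out, vertexBuffer{toVertexName: tag})
			default:
				return nil, fmt.Errorf("unknown step %q", tag)
			}
		}
		return out, nil
	})
}

func main() {
	d := newDecider([]string{"even", "odd"})
	for _, tags := range [][]string{{"ALL"}, {"DROP"}, {"odd"}, {"missing"}} {
		bufs, err := d.WhereTo(nil, tags)
		fmt.Println(tags, bufs, err)
	}
}
```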
type VertexBuffer ¶ added in v0.9.0
VertexBuffer points to the partition of a buffer owned by the vertex.