reduce

package
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package reduce reads messages from isb and is responsible for the first part of reduce subsystem. It is responsible for feeding in the data to the second part of the reduce subsystem called processAndForward and processes the read data from the ISB, writes to the outbound channel called PBQ so the message can be asynchronously processed by `processAndForward`, and then closes the partition if possible based on watermark progression. To write to the outbound channel (PBQ), it has to partition the message first. Hence, also contains partitioning logic. Partitioner identifies a set of elements with a common key and time, and buckets them in to a common window. A partition is uniquely identified using a tuple {window, key}. Type of window does not matter. Partitioner is responsible for managing the persistence and processing of each partition. It uses PBQ for durable persistence of elements that belong to a partition and orchestrates the processing of elements using processAndForward function. Partitioner tracks active partitions, closes the partitions based on watermark progression and co-ordinates the materialization and forwarding the results to the next vertex in the pipeline.

Index

Constants

View Source
const (
	LabelReason = "reason"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DataForward

type DataForward struct {
	// contains filtered or unexported fields
}

DataForward is responsible for reading and forwarding the message from ISB to PBQ.

func NewDataForward

func NewDataForward(ctx context.Context,
	vertexInstance *dfv1.VertexInstance,
	fromBuffer isb.BufferReader,
	toBuffers map[string][]isb.BufferWriter,
	pbqManager *pbq.Manager,
	whereToDecider forward.ToWhichStepDecider,
	fw fetch.Fetcher,
	watermarkPublishers map[string]publish.Publisher,
	windowingStrategy window.Windower,
	idleManager wmb.IdleManager,
	of *pnf.OrderedProcessor,
	opts ...Option) (*DataForward, error)

NewDataForward creates a new DataForward

func (*DataForward) ClosePartitions added in v0.8.1

func (df *DataForward) ClosePartitions(partitions []partition.ID)

ClosePartitions closes the partitions by invoking close-of-book (COB).

func (*DataForward) Process added in v0.8.1

func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)

Process is one iteration of the read loop which writes the messages to the PBQs followed by acking the messages, and then closing the windows that can closed.

func (*DataForward) ReplayPersistedMessages added in v0.8.1

func (df *DataForward) ReplayPersistedMessages(ctx context.Context) error

ReplayPersistedMessages replays persisted messages, because during boot up, it has to replay the data from the persistent store of PBQ before it can start reading from ISB. ReplayPersistedMessages will return only after the replay has been completed.

func (*DataForward) ShutDown added in v0.8.1

func (df *DataForward) ShutDown(ctx context.Context)

ShutDown shutdowns the read-loop.

func (*DataForward) Start

func (df *DataForward) Start()

Start starts reading messages from ISG

type Option

type Option func(*Options) error

func WithAllowedLateness added in v0.8.0

func WithAllowedLateness(t time.Duration) Option

WithAllowedLateness sets allowedLateness

func WithReadBatchSize

func WithReadBatchSize(f int64) Option

WithReadBatchSize sets the read batch size

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options for forwarding the message

func DefaultOptions

func DefaultOptions() *Options

Directories

Path Synopsis
pbq
store/wal
Package wal implements write-ahead-log.
Package wal implements write-ahead-log.
Package pnf processes and then forwards messages belonging to a window.
Package pnf processes and then forwards messages belonging to a window.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL