package flow

Documentation

Overview

Package flow contains the data structures for computation, mostly Dataset operations such as Map/Reduce/Join/Sort, etc.
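
A minimal end-to-end sketch of the API, assuming this package lives at github.com/chrislusf/gleam/flow alongside the companion gio package for registering pure Go mappers (the input strings and step names are illustrative only):

package main

import (
	"strings"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
)

// Mappers must be registered before gio.Init() so that executor
// processes can look them up by id.
var tokenize = gio.RegisterMapper(func(row []interface{}) error {
	for _, word := range strings.Split(gio.ToString(row[0]), " ") {
		gio.Emit(word)
	}
	return nil
})

func main() {
	gio.Init() // if invoked as a worker process, run the registered mapper and exit

	flow.New("word count").
		Strings([]string{"hello gleam", "hello flow"}).
		Map("tokenize", tokenize).
		Printlnf("%s").
		Run()
}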

Constants

This section is empty.

Variables

var (
	Local *localDriver
)

Functions

This section is empty.

Types

type DasetsetHint

type DasetsetHint func(d *Dataset)

func PartitionSize

func PartitionSize(n int64) DasetsetHint

PartitionSize hints the partition size in MB. This is usually used when sorting is needed.

func TotalSize

func TotalSize(n int64) DasetsetHint

TotalSize hints the total size in MB for all the partitions. This is usually used when sorting is needed.
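A sketch of attaching both hints before a sort, using imports as in the overview example (the sizes in MB are illustrative):

flow.New("hinted sort").
	Strings([]string{"b", "c", "a"}).
	Hint(flow.TotalSize(1024), flow.PartitionSize(64)). // hypothetical sizes in MB
	Sort("sort", flow.Field(1)).
	Printlnf("%s").
	Run()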

type DasetsetMetadata

type DasetsetMetadata struct {
	TotalSize int64
	OnDisk    ModeIO
}

type DasetsetShardMetadata

type DasetsetShardMetadata struct {
	TotalSize int64
	Timestamp time.Time
	URI       string
	Status    DatasetShardStatus
	Error     error
}

type Dataset

type Dataset struct {
	Flow            *Flow
	Id              int
	Shards          []*DatasetShard
	Step            *Step
	ReadingSteps    []*Step
	IsPartitionedBy []int
	IsLocalSorted   []instruction.OrderBy
	Meta            *DasetsetMetadata
	RunLocked
}

func (*Dataset) Broadcast

func (d *Dataset) Broadcast(name string, shardCount int) *Dataset

Broadcast replicates the dataset to each of the shardCount shards of the returned dataset.

func (*Dataset) CoGroup

func (d *Dataset) CoGroup(name string, other *Dataset, sortOption *SortOption) *Dataset

CoGroup joins two datasets by the key. Each result row has this format:

(key, []left_rows, []right_rows)
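
A sketch using Slices (documented below) to build two small keyed datasets; the %v verbs assume the grouped rows print as Go values:

f := flow.New("cogroup")
left := f.Slices([][]interface{}{{"k1", 1}, {"k2", 2}})
right := f.Slices([][]interface{}{{"k1", 10}, {"k3", 30}})
left.CoGroup("byKey", right, flow.Field(1)).
	Printlnf("%v %v %v"). // key, []left_rows, []right_rows
	Run()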

func (*Dataset) CoGroupPartitionedSorted

func (this *Dataset) CoGroupPartitionedSorted(name string, that *Dataset, indexes []int) (ret *Dataset)

CoGroupPartitionedSorted joins 2 datasets that are sharded by the same key and already locally sorted within each shard.

func (*Dataset) Distinct

func (d *Dataset) Distinct(name string, sortOption *SortOption) *Dataset

Distinct sorts on specific fields and picks the unique ones. Required memory: about the same size as each partition. Example usage: Distinct(Field(1,2)) means distinct on fields 1 and 2. TODO: optimize for the low-cardinality case.

func (*Dataset) Do

func (d *Dataset) Do(fn func(*Dataset) *Dataset) *Dataset

Do accepts a function to transform a dataset into a new dataset. This allows custom complicated pre-built logic.
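
For example, a reusable transform applied via Do (the names are illustrative):

sortAndTake3 := func(d *flow.Dataset) *flow.Dataset {
	return d.LocalSort("sort", flow.Field(1)).LocalLimit("take3", 3, 0)
}

flow.New("do").
	Strings([]string{"c", "a", "d", "b"}).
	Do(sortAndTake3).
	Printlnf("%s").
	Run()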

func (*Dataset) DoJoin

func (d *Dataset) DoJoin(name string, other *Dataset, leftOuter, rightOuter bool, sortOption *SortOption) *Dataset

func (*Dataset) Fprintf

func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset

Fprintf formats each row using the format string and writes the result to writer.

func (*Dataset) Fprintlnf

func (d *Dataset) Fprintlnf(writer io.Writer, format string) *Dataset

Fprintlnf is the same as Fprintf but adds "\n" at the end of each format.

func (*Dataset) GetIsOnDiskIO

func (d *Dataset) GetIsOnDiskIO() bool

GetIsOnDiskIO returns true if the dataset is persisted to disk in distributed mode.

func (*Dataset) GetPartitionSize

func (d *Dataset) GetPartitionSize() int64

GetPartitionSize returns the size in MB for each partition of the dataset. This is based on the hinted total size divided by the number of partitions.

func (*Dataset) GetShards

func (d *Dataset) GetShards() []*DatasetShard

func (*Dataset) GetTotalSize

func (d *Dataset) GetTotalSize() int64

GetTotalSize returns the total size in MB for the dataset. This is based on the given hint.

func (*Dataset) GroupBy

func (d *Dataset) GroupBy(name string, sortOption *SortOption) *Dataset

GroupBy groups data by the given fields, e.g. GroupBy("", Field(1,2,3)) groups data by fields 1, 2, and 3.

func (*Dataset) HashJoin

func (bigger *Dataset) HashJoin(name string, smaller *Dataset, sortOption *SortOption) *Dataset

HashJoin joins two datasets by loading the smaller dataset into memory on all executors and streaming the bigger dataset through it.

func (*Dataset) Hint

func (d *Dataset) Hint(options ...DasetsetHint) *Dataset

Hint adds options to the previous dataset.

func (*Dataset) Join

func (d *Dataset) Join(name string, other *Dataset, sortOption *SortOption) *Dataset

Join joins two datasets by the key.
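
A sketch joining two small datasets on their first field (field indexes start from 1):

f := flow.New("join")
users := f.Slices([][]interface{}{{1, "alice"}, {2, "bob"}})
ages := f.Slices([][]interface{}{{1, 30}, {2, 25}})
users.Join("byId", ages, flow.Field(1)).
	Printlnf("%d %s %d"). // key, left value, right value
	Run()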

func (*Dataset) JoinByKey

func (d *Dataset) JoinByKey(name string, other *Dataset) *Dataset

func (*Dataset) JoinPartitionedSorted

func (this *Dataset) JoinPartitionedSorted(name string, that *Dataset, sortOption *SortOption,
	isLeftOuterJoin, isRightOuterJoin bool) *Dataset

JoinPartitionedSorted joins multiple datasets that are sharded by the same key and locally sorted within each shard.

func (*Dataset) LeftOuterJoin

func (d *Dataset) LeftOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset

func (*Dataset) LeftOuterJoinByKey

func (d *Dataset) LeftOuterJoinByKey(name string, other *Dataset) *Dataset

func (*Dataset) LocalDistinct

func (d *Dataset) LocalDistinct(name string, sortOption *SortOption) *Dataset

func (*Dataset) LocalGroupBy

func (d *Dataset) LocalGroupBy(name string, sortOption *SortOption) *Dataset

func (*Dataset) LocalHashAndJoinWith

func (this *Dataset) LocalHashAndJoinWith(name string, that *Dataset, sortOption *SortOption) *Dataset

func (*Dataset) LocalLimit

func (d *Dataset) LocalLimit(name string, n int, offset int) *Dataset

LocalLimit takes the first n rows (after the given offset) in each shard and skips all other rows.

func (*Dataset) LocalReduceBy

func (d *Dataset) LocalReduceBy(name string, reducerId gio.ReducerId, sortOption *SortOption) *Dataset

func (*Dataset) LocalSort

func (d *Dataset) LocalSort(name string, sortOption *SortOption) *Dataset

func (*Dataset) LocalTop

func (d *Dataset) LocalTop(name string, n int, sortOption *SortOption) *Dataset

func (*Dataset) Map

func (d *Dataset) Map(name string, mapperId gio.MapperId) *Dataset

Map runs the mapper registered to the mapperId. This is used to execute pure Go code.

func (*Dataset) MergeSortedTo

func (d *Dataset) MergeSortedTo(name string, partitionCount int) (ret *Dataset)

func (*Dataset) MergeTo

func (d *Dataset) MergeTo(name string, partitionCount int) (ret *Dataset)

func (*Dataset) OnDisk

func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset

OnDisk ensures the intermediate datasets are persisted to disk. This allows executors to run sequentially when the number of executors is limited.

func (*Dataset) Output

func (d *Dataset) Output(f func(io.Reader) error) *Dataset

Output concurrently collects outputs from the previous step to the driver.

func (*Dataset) OutputRow

func (d *Dataset) OutputRow(f func(*util.Row) error) *Dataset
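
A sketch collecting rows at the driver; it assumes util.Row (from gleam's util package) exposes its key fields as K:

var total int64
flow.New("collect").
	Ints([]int{1, 2, 3}).
	OutputRow(func(row *util.Row) error {
		total += gio.ToInt64(row.K[0]) // assumes the key decodes to an integer
		return nil
	}).
	Run()
// total now holds 1+2+3 observed at the driver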

func (*Dataset) Partition

func (d *Dataset) Partition(name string, shard int, sortOption *SortOption) *Dataset

Partition partitions the data by hash or by the data key, returning a new dataset. This is divided into 2 steps:

1. Each record is sharded to a local shard.
2. The destination shard collects its child shards and merges them into one.

func (*Dataset) PartitionByKey

func (d *Dataset) PartitionByKey(name string, shard int) *Dataset

func (*Dataset) Pipe

func (d *Dataset) Pipe(name, code string) *Dataset

Pipe runs the code as an external program, which processes tab-separated input from the program's stdin and outputs to stdout, also in tab-separated lines.
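
A sketch piping each shard through a shell command (any external filter that reads stdin and writes stdout works the same way):

flow.New("pipe").
	Strings([]string{"flow", "gleam", "other"}).
	Pipe("grep", "grep flow").
	Printlnf("%s").
	Run()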

func (*Dataset) PipeAsArgs

func (d *Dataset) PipeAsArgs(name, code string) *Dataset

PipeAsArgs takes each row of input and binds it to variables in the code parameter. The variables are specified via $1, $2, etc. The code is run as the command of an external program for each row of input.

Watch for performance impact, since this starts one OS process for each line of input.

func (*Dataset) Printf

func (d *Dataset) Printf(format string) *Dataset

Printf prints to os.Stdout in the specified format

func (*Dataset) Printlnf

func (d *Dataset) Printlnf(format string) *Dataset

Printlnf prints to os.Stdout in the specified format, adding "\n" at the end of each format.

func (*Dataset) Reduce

func (d *Dataset) Reduce(name string, reducerId gio.ReducerId) (ret *Dataset)

Reduce runs the reducer registered to the reducerId, combining all rows into one row

func (*Dataset) ReduceBy

func (d *Dataset) ReduceBy(name string, reducerId gio.ReducerId, keyFields *SortOption) (ret *Dataset)

func (*Dataset) ReduceByKey

func (d *Dataset) ReduceByKey(name string, reducerId gio.ReducerId) (ret *Dataset)

ReduceByKey runs the reducer registered to the reducerId, combining rows with the same key fields into one row
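
A word-count sketch combining Map and ReduceByKey; as in the overview example, registration must happen before gio.Init() is called in main:

var wordPairs = gio.RegisterMapper(func(row []interface{}) error {
	for _, w := range strings.Split(gio.ToString(row[0]), " ") {
		gio.Emit(w, 1)
	}
	return nil
})

var sum = gio.RegisterReducer(func(x, y interface{}) (interface{}, error) {
	return gio.ToInt64(x) + gio.ToInt64(y), nil
})

flow.New("word count").
	Strings([]string{"a b", "b c"}).
	Map("wordPairs", wordPairs).
	ReduceByKey("sum", sum).
	Printlnf("%s %d").
	Run()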

func (*Dataset) RightOuterJoin

func (d *Dataset) RightOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset

func (*Dataset) RightOuterJoinByKey

func (d *Dataset) RightOuterJoinByKey(name string, other *Dataset) *Dataset

func (*Dataset) RoundRobin

func (d *Dataset) RoundRobin(name string, n int) *Dataset

func (*Dataset) Run

func (d *Dataset) Run(option ...FlowOption)

Run starts the whole flow. It is a convenience method, the same as *Flow.Run().

func (*Dataset) RunContext

func (d *Dataset) RunContext(ctx context.Context, option ...FlowOption)

RunContext starts the whole flow. It is a convenience method, the same as *Flow.RunContext().

func (*Dataset) SaveFirstRowTo

func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset

SaveFirstRowTo saves the first row's values into the operands.
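
A sketch pulling a single value back into driver memory:

var first int
flow.New("first row").
	Ints([]int{7, 8, 9}).
	SaveFirstRowTo(&first).
	Run()
// first now holds the first row's value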

func (*Dataset) Select

func (d *Dataset) Select(name string, sortOption *SortOption) *Dataset

Select selects multiple fields into the next dataset. The index starts from 1. The first selected field becomes the key.

func (*Dataset) SelectKV

func (d *Dataset) SelectKV(name string, keys, values *SortOption) *Dataset

SelectKV selects multiple fields into the next dataset as keys and values. The index starts from 1.

func (*Dataset) Sort

func (d *Dataset) Sort(name string, sortOption *SortOption) *Dataset

Sort sorts on specific fields, defaulting to the first field. Required memory: about the same size as each partition. Example usage: Sort(Field(1,2)) means sorting on fields 1 and 2.

func (*Dataset) SortByKey

func (d *Dataset) SortByKey(name string) *Dataset

func (*Dataset) Top

func (d *Dataset) Top(name string, k int, sortOption *SortOption) *Dataset

Top streams through all n items, picking the reverse-ordered k items with O(n*log(k)) complexity. Required memory: about the same size as k items in memory.
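
A sketch keeping the top 3 values by field 1 (the exact ordering follows the sortOption):

flow.New("top").
	Ints([]int{5, 1, 9, 3, 7}).
	Top("top3", 3, flow.OrderBy(1, true)).
	Printlnf("%d").
	Run()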

func (*Dataset) TreeMergeSortedTo

func (d *Dataset) TreeMergeSortedTo(name string, partitionCount int, factor int) (ret *Dataset)

func (*Dataset) Union

func (this *Dataset) Union(name string, others []*Dataset, isParallel bool) *Dataset

Union unions multiple Datasets into one Dataset.

type DatasetShard

type DatasetShard struct {
	Id            int
	Dataset       *Dataset
	ReadingTasks  []*Task
	IncomingChan  *util.Piper
	OutgoingChans []*util.Piper
	Counter       int64
	ReadyTime     time.Time
	CloseTime     time.Time
	Meta          *DasetsetShardMetadata
}

func (*DatasetShard) Closed

func (s *DatasetShard) Closed() bool

func (*DatasetShard) Name

func (s *DatasetShard) Name() string

func (*DatasetShard) TimeTaken

func (s *DatasetShard) TimeTaken() time.Duration

type DatasetShardStatus

type DatasetShardStatus int

const (
	Untouched DatasetShardStatus = iota
	LocationAssigned
	InProgress
	InRetry
	Failed
	Successful
)

type Flow

type Flow struct {
	Name     string
	Steps    []*Step
	Datasets []*Dataset
	HashCode uint32
}

func New

func New(name string) (fc *Flow)

func (*Flow) AddAllToAllStep

func (f *Flow) AddAllToAllStep(input *Dataset, output *Dataset) (step *Step)

func (*Flow) AddAllToOneStep

func (f *Flow) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)

The task should run on the destination dataset shard.

func (*Flow) AddLinkedNToOneStep

func (f *Flow) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)

func (*Flow) AddOneToAllStep

func (f *Flow) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)

The task should run on the source dataset shard. The input is nil for the initial source dataset.

func (*Flow) AddOneToEveryNStep

func (f *Flow) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)

func (*Flow) AddOneToOneStep

func (f *Flow) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)

The tasks should run on the source dataset shard.

func (*Flow) Bytes

func (fc *Flow) Bytes(slice [][]byte) (ret *Dataset)

Bytes begins a flow with an [][]byte

func (*Flow) Channel

func (fc *Flow) Channel(ch chan interface{}) (ret *Dataset)

Channel accepts a channel to feed into the flow.
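
A sketch feeding a Go channel into a flow; close the channel to end the input:

ch := make(chan interface{})
go func() {
	defer close(ch)
	for _, s := range []string{"hello", "world"} {
		ch <- s
	}
}()

flow.New("channel").
	Channel(ch).
	Printlnf("%s").
	Run()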

func (*Flow) Hint

func (d *Flow) Hint(options ...FlowHintOption)

Hint adds hints to the flow.

func (*Flow) Ints

func (fc *Flow) Ints(numbers []int) (ret *Dataset)

Ints begins a flow with an []int

func (*Flow) Listen

func (fc *Flow) Listen(network, address string) (ret *Dataset)

Listen receives textual inputs via a socket. Multiple parameters are separated via tab.

func (*Flow) MergeDatasets1ShardTo1Step

func (f *Flow) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)

All datasets should have the same number of shards.

func (*Flow) NewNextDataset

func (fc *Flow) NewNextDataset(shardSize int) (ret *Dataset)

func (*Flow) NewStep

func (fc *Flow) NewStep() (step *Step)

func (*Flow) OnInterrupt

func (fc *Flow) OnInterrupt()

func (*Flow) Read

func (fc *Flow) Read(s Sourcer) (ret *Dataset)

Read accepts a function to read data into the flow, creating a new dataset. This allows custom complicated pre-built logic for new data sources.

func (*Flow) Run

func (fc *Flow) Run(options ...FlowOption)

func (*Flow) RunContext

func (fc *Flow) RunContext(ctx context.Context, options ...FlowOption)

func (*Flow) Slices

func (fc *Flow) Slices(slices [][]interface{}) (ret *Dataset)

Slices begins a flow with an [][]interface{}

func (*Flow) Source

func (fc *Flow) Source(name string, f func(io.Writer, *pb.InstructionStat) error) (ret *Dataset)

Source produces data feeding into the flow. Function f writes to the given writer. The written bytes should be MsgPack-encoded rows; use util.EncodeRow(...) to encode the data before writing.
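
A sketch of a custom source; the exact util.EncodeRow signature (writer, timestamp, values...), the util.Now() timestamp helper, and the pb.InstructionStat counter field are assumptions based on the note above:

flow.New("generated").
	Source("numbers", func(w io.Writer, stat *pb.InstructionStat) error {
		for i := 0; i < 5; i++ {
			if err := util.EncodeRow(w, util.Now(), i); err != nil {
				return err
			}
			stat.OutputCounter++ // assumed field name
		}
		return nil
	}).
	Printlnf("%d").
	Run()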

func (*Flow) Strings

func (fc *Flow) Strings(lines []string) (ret *Dataset)

Strings begins a flow with an []string

type FlowConfig

type FlowConfig struct {
	OnDisk bool
}

type FlowHintOption

type FlowHintOption func(c *FlowConfig)

type FlowOption

type FlowOption interface {
	GetFlowRunner() FlowRunner
}

type FlowRunner

type FlowRunner interface {
	RunFlowContext(context.Context, *Flow)
}

type ModeIO

type ModeIO int

const (
	ModeInMemory ModeIO = iota
	ModeOnDisk
)

type NetworkType

type NetworkType int

const (
	OneShardToOneShard NetworkType = iota
	OneShardToAllShard
	AllShardToOneShard
	OneShardToEveryNShard
	LinkedNShardToOneShard
	MergeTwoShardToOneShard
	AllShardTOAllShard
)

type RunLocked

type RunLocked struct {
	sync.Mutex
	StartTime time.Time
}

type SortOption

type SortOption struct {
	// contains filtered or unexported fields
}

func Field

func Field(indexes ...int) *SortOption

func OrderBy

func OrderBy(index int, ascending bool) *SortOption

func (*SortOption) By

func (o *SortOption) By(index int, ascending bool) *SortOption

By chains an additional sort order onto the SortOption.
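
A sketch composing sort options: ascending on field 1, then descending on field 2:

flow.New("sorted").
	Slices([][]interface{}{{"b", 2}, {"a", 1}, {"a", 3}}).
	Sort("byKeyThenValue", flow.OrderBy(1, true).By(2, false)).
	Printlnf("%s %d").
	Run()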

func (*SortOption) Indexes

func (o *SortOption) Indexes() []int

Indexes returns the list of sort field indexes.

func (*SortOption) String

func (o *SortOption) String() string

type Sourcer

type Sourcer interface {
	Generate(*Flow) *Dataset
}

type Step

type Step struct {
	Id             int
	Flow           *Flow
	InputDatasets  []*Dataset
	OutputDataset  *Dataset
	Function       func([]io.Reader, []io.Writer, *pb.InstructionStat) error
	Instruction    instruction.Instruction
	Tasks          []*Task
	Name           string
	Description    string
	NetworkType    NetworkType
	IsOnDriverSide bool
	IsPipe         bool
	IsGoCode       bool
	Script         script.Script
	Command        *script.Command // used in Pipe()
	Meta           *StepMetadata
	Params         map[string]interface{}
	RunLocked
}

func (*Step) GetScriptCommand

func (step *Step) GetScriptCommand() *script.Command

func (*Step) NewTask

func (step *Step) NewTask() (task *Task)

func (*Step) RunFunction

func (step *Step) RunFunction(task *Task) error

func (*Step) SetInstruction

func (step *Step) SetInstruction(prefix string, ins instruction.Instruction)

type StepMetadata

type StepMetadata struct {
	IsRestartable bool
	IsIdempotent  bool
}

type Task

type Task struct {
	Id           int
	Step         *Step
	InputShards  []*DatasetShard
	InputChans   []*util.Piper // task specific input chans. InputShard may have multiple reading tasks
	OutputShards []*DatasetShard
	Stat         *pb.InstructionStat
}
