Documentation ¶
Overview ¶
Package flow contains data structures for computation, mostly Dataset operations such as Map/Reduce/Join/Sort etc.
Index ¶
- Variables
- type DasetsetHint
- type DasetsetMetadata
- type DasetsetShardMetadata
- type Dataset
- func (d *Dataset) Broadcast(name string, shardCount int) *Dataset
- func (d *Dataset) CoGroup(name string, other *Dataset, sortOption *SortOption) *Dataset
- func (this *Dataset) CoGroupPartitionedSorted(name string, that *Dataset, indexes []int) (ret *Dataset)
- func (d *Dataset) Distinct(name string, sortOption *SortOption) *Dataset
- func (d *Dataset) Do(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) DoJoin(name string, other *Dataset, leftOuter, rightOuter bool, ...) *Dataset
- func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset
- func (d *Dataset) Fprintlnf(writer io.Writer, format string) *Dataset
- func (d *Dataset) GetIsOnDiskIO() bool
- func (d *Dataset) GetPartitionSize() int64
- func (d *Dataset) GetShards() []*DatasetShard
- func (d *Dataset) GetTotalSize() int64
- func (d *Dataset) GroupBy(name string, sortOption *SortOption) *Dataset
- func (bigger *Dataset) HashJoin(name string, smaller *Dataset, sortOption *SortOption) *Dataset
- func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
- func (d *Dataset) Join(name string, other *Dataset, sortOption *SortOption) *Dataset
- func (d *Dataset) JoinByKey(name string, other *Dataset) *Dataset
- func (this *Dataset) JoinPartitionedSorted(name string, that *Dataset, sortOption *SortOption, ...) *Dataset
- func (d *Dataset) LeftOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset
- func (d *Dataset) LeftOuterJoinByKey(name string, other *Dataset) *Dataset
- func (d *Dataset) LocalDistinct(name string, sortOption *SortOption) *Dataset
- func (d *Dataset) LocalGroupBy(name string, sortOption *SortOption) *Dataset
- func (this *Dataset) LocalHashAndJoinWith(name string, that *Dataset, sortOption *SortOption) *Dataset
- func (d *Dataset) LocalLimit(name string, n int, offset int) *Dataset
- func (d *Dataset) LocalReduceBy(name string, reducerId gio.ReducerId, sortOption *SortOption) *Dataset
- func (d *Dataset) LocalSort(name string, sortOption *SortOption) *Dataset
- func (d *Dataset) LocalTop(name string, n int, sortOption *SortOption) *Dataset
- func (d *Dataset) Map(name string, mapperId gio.MapperId) *Dataset
- func (d *Dataset) MergeSortedTo(name string, partitionCount int) (ret *Dataset)
- func (d *Dataset) MergeTo(name string, partitionCount int) (ret *Dataset)
- func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) Output(f func(io.Reader) error) *Dataset
- func (d *Dataset) OutputRow(f func(*util.Row) error) *Dataset
- func (d *Dataset) Partition(name string, shard int, sortOption *SortOption) *Dataset
- func (d *Dataset) PartitionByKey(name string, shard int) *Dataset
- func (d *Dataset) Pipe(name, code string) *Dataset
- func (d *Dataset) PipeAsArgs(name, code string) *Dataset
- func (d *Dataset) Printf(format string) *Dataset
- func (d *Dataset) Printlnf(format string) *Dataset
- func (d *Dataset) Reduce(name string, reducerId gio.ReducerId) (ret *Dataset)
- func (d *Dataset) ReduceBy(name string, reducerId gio.ReducerId, keyFields *SortOption) (ret *Dataset)
- func (d *Dataset) ReduceByKey(name string, reducerId gio.ReducerId) (ret *Dataset)
- func (d *Dataset) RightOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset
- func (d *Dataset) RightOuterJoinByKey(name string, other *Dataset) *Dataset
- func (d *Dataset) RoundRobin(name string, n int) *Dataset
- func (d *Dataset) Run(option ...FlowOption)
- func (d *Dataset) RunContext(ctx context.Context, option ...FlowOption)
- func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset
- func (d *Dataset) Select(name string, sortOption *SortOption) *Dataset
- func (d *Dataset) SelectKV(name string, keys, values *SortOption) *Dataset
- func (d *Dataset) Sort(name string, sortOption *SortOption) *Dataset
- func (d *Dataset) SortByKey(name string) *Dataset
- func (d *Dataset) Top(name string, k int, sortOption *SortOption) *Dataset
- func (d *Dataset) TreeMergeSortedTo(name string, partitionCount int, factor int) (ret *Dataset)
- func (this *Dataset) Union(name string, others []*Dataset, isParallel bool) *Dataset
- type DatasetShard
- type DatasetShardStatus
- type Flow
- func (f *Flow) AddAllToAllStep(input *Dataset, output *Dataset) (step *Step)
- func (f *Flow) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (f *Flow) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
- func (f *Flow) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
- func (f *Flow) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
- func (f *Flow) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (fc *Flow) Bytes(slice [][]byte) (ret *Dataset)
- func (fc *Flow) Channel(ch chan interface{}) (ret *Dataset)
- func (d *Flow) Hint(options ...FlowHintOption)
- func (fc *Flow) Ints(numbers []int) (ret *Dataset)
- func (fc *Flow) Listen(network, address string) (ret *Dataset)
- func (f *Flow) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
- func (fc *Flow) NewNextDataset(shardSize int) (ret *Dataset)
- func (fc *Flow) NewStep() (step *Step)
- func (fc *Flow) OnInterrupt()
- func (fc *Flow) Read(s Sourcer) (ret *Dataset)
- func (fc *Flow) Run(options ...FlowOption)
- func (fc *Flow) RunContext(ctx context.Context, options ...FlowOption)
- func (fc *Flow) Slices(slices [][]interface{}) (ret *Dataset)
- func (fc *Flow) Source(name string, f func(io.Writer, *pb.InstructionStat) error) (ret *Dataset)
- func (fc *Flow) Strings(lines []string) (ret *Dataset)
- type FlowConfig
- type FlowHintOption
- type FlowOption
- type FlowRunner
- type ModeIO
- type NetworkType
- type RunLocked
- type SortOption
- type Sourcer
- type Step
- type StepMetadata
- type Task
Constants ¶
This section is empty.
Variables ¶
var (
Local *localDriver
)
Functions ¶
This section is empty.
Types ¶
type DasetsetHint ¶
type DasetsetHint func(d *Dataset)
func PartitionSize ¶
func PartitionSize(n int64) DasetsetHint
PartitionSize hints the partition size in MB. This is usually used when sorting is needed.
func TotalSize ¶
func TotalSize(n int64) DasetsetHint
TotalSize hints the total size in MB for all the partitions. This is usually used when sorting is needed.
type DasetsetMetadata ¶
type DasetsetShardMetadata ¶
type Dataset ¶
type Dataset struct { Flow *Flow Id int Shards []*DatasetShard Step *Step ReadingSteps []*Step IsPartitionedBy []int IsLocalSorted []instruction.OrderBy Meta *DasetsetMetadata RunLocked }
func (*Dataset) CoGroup ¶
func (d *Dataset) CoGroup(name string, other *Dataset, sortOption *SortOption) *Dataset
CoGroup joins two datasets by the key. Each result row has this format:
(key, []left_rows, []right_rows)
func (*Dataset) CoGroupPartitionedSorted ¶
func (this *Dataset) CoGroupPartitionedSorted(name string, that *Dataset, indexes []int) (ret *Dataset)
CoGroupPartitionedSorted joins 2 datasets that are sharded by the same key and already locally sorted within each shard.
func (*Dataset) Distinct ¶
func (d *Dataset) Distinct(name string, sortOption *SortOption) *Dataset
Distinct sorts on specific fields and picks the unique ones. Required Memory: about same size as each partition. example usage: Distinct(Field(1,2)) means distinct on field 1 and 2. TODO: optimize for low cardinality case.
func (*Dataset) Do ¶
Do accepts a function to transform a dataset into a new dataset. This allows custom complicated pre-built logic.
func (*Dataset) GetIsOnDiskIO ¶
GetIsOnDiskIO returns true if the dataset is persisted to disk in distributed mode.
func (*Dataset) GetPartitionSize ¶
GetPartitionSize returns the size in MB for each partition of the dataset. This is based on the hinted total size divided by the number of partitions.
func (*Dataset) GetShards ¶
func (d *Dataset) GetShards() []*DatasetShard
func (*Dataset) GetTotalSize ¶
GetTotalSize returns the total size in MB for the dataset. This is based on the given hint.
func (*Dataset) GroupBy ¶
func (d *Dataset) GroupBy(name string, sortOption *SortOption) *Dataset
GroupBy groups data by the given fields, e.g. GroupBy("", Field(1,2,3)) groups data by fields 1, 2 and 3.
func (*Dataset) HashJoin ¶
func (bigger *Dataset) HashJoin(name string, smaller *Dataset, sortOption *SortOption) *Dataset
HashJoin joins two datasets by putting the smaller dataset in memory on all executors and streams through the bigger dataset.
func (*Dataset) Hint ¶
func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
Hint adds options for previous dataset.
func (*Dataset) Join ¶
func (d *Dataset) Join(name string, other *Dataset, sortOption *SortOption) *Dataset
Join joins two datasets by the key.
func (*Dataset) JoinPartitionedSorted ¶
func (this *Dataset) JoinPartitionedSorted(name string, that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
JoinPartitionedSorted joins datasets that are sharded by the same key and locally sorted within each shard.
func (*Dataset) LeftOuterJoin ¶
func (d *Dataset) LeftOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset
func (*Dataset) LeftOuterJoinByKey ¶
func (*Dataset) LocalDistinct ¶
func (d *Dataset) LocalDistinct(name string, sortOption *SortOption) *Dataset
func (*Dataset) LocalGroupBy ¶
func (d *Dataset) LocalGroupBy(name string, sortOption *SortOption) *Dataset
func (*Dataset) LocalHashAndJoinWith ¶
func (this *Dataset) LocalHashAndJoinWith(name string, that *Dataset, sortOption *SortOption) *Dataset
func (*Dataset) LocalLimit ¶
LocalLimit takes the local first n rows and skips all other rows.
func (*Dataset) LocalReduceBy ¶
func (*Dataset) LocalSort ¶
func (d *Dataset) LocalSort(name string, sortOption *SortOption) *Dataset
func (*Dataset) LocalTop ¶
func (d *Dataset) LocalTop(name string, n int, sortOption *SortOption) *Dataset
func (*Dataset) Map ¶
Map runs the mapper registered to the mapperId. This is used to execute pure Go code.
func (*Dataset) MergeSortedTo ¶
func (*Dataset) OnDisk ¶
OnDisk ensures the intermediate datasets are persisted to disk. This allows executors to run not in parallel if executors are limited.
func (*Dataset) Partition ¶
func (d *Dataset) Partition(name string, shard int, sortOption *SortOption) *Dataset
Partition shards data by hash or by data key, returning a new dataset. This is divided into 2 steps: 1. Each record is sharded to a local shard. 2. The destination shard collects its child shards and merges them into one.
func (*Dataset) Pipe ¶
Pipe runs the code as an external program, which processes the tab-separated input from the program's stdin, and outputs to stdout, also in tab-separated lines.
func (*Dataset) PipeAsArgs ¶
PipeAsArgs takes each row of input and binds it to variables in the code parameter. The variables are specified via $1, $2, etc. The code is run as the command for an external program for each row of input.
Watch for performance impact since it starts one OS process for each line of input.
func (*Dataset) Printlnf ¶
Printlnf prints to os.Stdout in the specified format, adding an "\n" at the end of each format
func (*Dataset) Reduce ¶
Reduce runs the reducer registered to the reducerId, combining all rows into one row
func (*Dataset) ReduceByKey ¶
ReduceByKey runs the reducer registered to the reducerId, combining rows with the same key fields into one row
func (*Dataset) RightOuterJoin ¶
func (d *Dataset) RightOuterJoin(name string, other *Dataset, sortOption *SortOption) *Dataset
func (*Dataset) RightOuterJoinByKey ¶
func (*Dataset) Run ¶
func (d *Dataset) Run(option ...FlowOption)
Run starts the whole flow. This is a convenience method, same as *Flow.Run().
func (*Dataset) RunContext ¶
func (d *Dataset) RunContext(ctx context.Context, option ...FlowOption)
RunContext starts the whole flow. This is a convenience method, same as *Flow.RunContext().
func (*Dataset) SaveFirstRowTo ¶
SaveFirstRowTo saves the first row's values into the operands.
func (*Dataset) Select ¶
func (d *Dataset) Select(name string, sortOption *SortOption) *Dataset
Select selects multiple fields into the next dataset. The index starts from 1. The first one is the key
func (*Dataset) SelectKV ¶
func (d *Dataset) SelectKV(name string, keys, values *SortOption) *Dataset
SelectKV selects multiple fields into the next dataset. The index starts from 1.
func (*Dataset) Sort ¶
func (d *Dataset) Sort(name string, sortOption *SortOption) *Dataset
Sort sorts on specific fields, defaulting to the first field. Required Memory: about same size as each partition. example usage: Sort(Field(1,2)) means sorting on field 1 and 2.
func (*Dataset) Top ¶
func (d *Dataset) Top(name string, k int, sortOption *SortOption) *Dataset
Top streams through total n items, picking reverse ordered k items with O(n*log(k)) complexity. Required Memory: about same size as n items in memory
func (*Dataset) TreeMergeSortedTo ¶
type DatasetShard ¶
type DatasetShard struct { Id int Dataset *Dataset ReadingTasks []*Task IncomingChan *util.Piper OutgoingChans []*util.Piper Counter int64 ReadyTime time.Time CloseTime time.Time Meta *DasetsetShardMetadata }
func (*DatasetShard) Closed ¶
func (s *DatasetShard) Closed() bool
func (*DatasetShard) Name ¶
func (s *DatasetShard) Name() string
func (*DatasetShard) TimeTaken ¶
func (s *DatasetShard) TimeTaken() time.Duration
type DatasetShardStatus ¶
type DatasetShardStatus int
const ( Untouched DatasetShardStatus = iota LocationAssigned InProgress InRetry Failed Successful )
type Flow ¶
func (*Flow) AddAllToAllStep ¶
func (*Flow) AddAllToOneStep ¶
the task should run on the destination dataset shard
func (*Flow) AddLinkedNToOneStep ¶
func (*Flow) AddOneToAllStep ¶
The task should run on the source dataset shard. Input is nil for the initial source dataset.
func (*Flow) AddOneToEveryNStep ¶
func (*Flow) AddOneToOneStep ¶
the tasks should run on the source dataset shard
func (*Flow) Listen ¶
Listen receives textual inputs via a socket. Multiple parameters are separated via tab.
func (*Flow) MergeDatasets1ShardTo1Step ¶
All datasets should have the same number of shards.
func (*Flow) NewNextDataset ¶
func (*Flow) OnInterrupt ¶
func (fc *Flow) OnInterrupt()
func (*Flow) Read ¶
Read accepts a function to read data into the flow, creating a new dataset. This allows custom complicated pre-built logic for new data sources.
func (*Flow) Run ¶
func (fc *Flow) Run(options ...FlowOption)
func (*Flow) RunContext ¶
func (fc *Flow) RunContext(ctx context.Context, options ...FlowOption)
type FlowConfig ¶
type FlowConfig struct {
OnDisk bool
}
type FlowHintOption ¶
type FlowHintOption func(c *FlowConfig)
type FlowOption ¶
type FlowOption interface {
GetFlowRunner() FlowRunner
}
type FlowRunner ¶
type NetworkType ¶
type NetworkType int
const ( OneShardToOneShard NetworkType = iota OneShardToAllShard AllShardToOneShard OneShardToEveryNShard LinkedNShardToOneShard MergeTwoShardToOneShard AllShardTOAllShard )
type SortOption ¶
type SortOption struct {
// contains filtered or unexported fields
}
func Field ¶
func Field(indexes ...int) *SortOption
func OrderBy ¶
func OrderBy(index int, ascending bool) *SortOption
func (*SortOption) By ¶
func (o *SortOption) By(index int, ascending bool) *SortOption
By chains another sort order onto the list of sort orders.
func (*SortOption) String ¶
func (o *SortOption) String() string
type Step ¶
type Step struct { Id int Flow *Flow InputDatasets []*Dataset OutputDataset *Dataset Function func([]io.Reader, []io.Writer, *pb.InstructionStat) error Instruction instruction.Instruction Tasks []*Task Name string Description string NetworkType NetworkType IsOnDriverSide bool IsPipe bool IsGoCode bool Script script.Script Command *script.Command // used in Pipe() Meta *StepMetadata Params map[string]interface{} RunLocked }
func (*Step) GetScriptCommand ¶
func (*Step) RunFunction ¶
func (*Step) SetInstruction ¶
func (step *Step) SetInstruction(prefix string, ins instruction.Instruction)
type StepMetadata ¶
type Task ¶
type Task struct { Id int Step *Step InputShards []*DatasetShard InputChans []*util.Piper // task specific input chans. InputShard may have multiple reading tasks OutputShards []*DatasetShard Stat *pb.InstructionStat }
Source Files ¶
- context.go
- context_hint.go
- dataset.go
- dataset_cogroup.go
- dataset_do.go
- dataset_group.go
- dataset_hint.go
- dataset_join.go
- dataset_join_hash.go
- dataset_map.go
- dataset_merge.go
- dataset_output.go
- dataset_partition.go
- dataset_pipe.go
- dataset_reduce.go
- dataset_sort.go
- dataset_sort_option.go
- dataset_source.go
- dataset_union.go
- runner.go
- runner_on_interrupt.go
- step.go
- structure.go