Documentation
¶
Overview ¶
Package flow contains data structures for computation, mostly Dataset operations such as Map/Reduce/Join/Sort etc.
Index ¶
- func FromDatasetShardToTask(shard *DatasetShard, task *Task)
- func FromDatasetToStep(input *Dataset, step *Step)
- func FromStepToDataset(step *Step, output *Dataset)
- func FromTaskToDatasetShard(task *Task, shard *DatasetShard)
- type DasetsetHint
- type DasetsetMetadata
- type DasetsetShardMetadata
- type Dataset
- func (d *Dataset) Broadcast(shardCount int) *Dataset
- func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) CoGroupPartitionedSorted(that *Dataset, indexes []int) (ret *Dataset)
- func (d *Dataset) Do(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Filter(code string) *Dataset
- func (d *Dataset) FlatMap(code string) *Dataset
- func (d *Dataset) ForEach(code string) *Dataset
- func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset
- func (d *Dataset) GetIsOnDiskIO() bool
- func (d *Dataset) GetPartitionSize() int64
- func (d *Dataset) GetShards() []*DatasetShard
- func (d *Dataset) GetTotalSize() int64
- func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
- func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
- func (d *Dataset) Init(scriptPart string) *Dataset
- func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
- func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
- func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalLimit(n int) *Dataset
- func (d *Dataset) LocalReduce(code string) *Dataset
- func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalTop(n int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Map(code string) *Dataset
- func (d *Dataset) Mapper(mapperId gio.MapperId) *Dataset
- func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) Output(f func(io.Reader) error) *Dataset
- func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Pipe(code string) *Dataset
- func (d *Dataset) PipeAsArgs(code string) *Dataset
- func (d *Dataset) PipeOut(writer io.Writer) *Dataset
- func (d *Dataset) Reduce(code string) (ret *Dataset)
- func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) ReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) RoundRobin(shard int) *Dataset
- func (d *Dataset) Run(option ...FlowOption)
- func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset
- func (d *Dataset) Script(scriptType string) *Dataset
- func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
- type DatasetShard
- type DatasetShardStatus
- type FlowContext
- func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (f *FlowContext) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (fc *FlowContext) Bytes(slice [][]byte) (ret *Dataset)
- func (fc *FlowContext) Channel(ch chan interface{}) (ret *Dataset)
- func (d *FlowContext) Hint(options ...FlowContextOption)
- func (fc *FlowContext) Init(scriptPart string) *FlowContext
- func (fc *FlowContext) Ints(numbers []int) (ret *Dataset)
- func (fc *FlowContext) Listen(network, address string) (ret *Dataset)
- func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
- func (fc *FlowContext) NewStep() (step *Step)
- func (fc *FlowContext) OnInterrupt()
- func (fc *FlowContext) Query(connectionId string, query adapter.AdapterQuery) (ret *Dataset)
- func (fc *FlowContext) ReadFile(source adapter.AdapterFileSource) (ret *Dataset)
- func (fc *FlowContext) ReadTsv(reader io.Reader) (ret *Dataset)
- func (fc *FlowContext) Run(options ...FlowOption)
- func (fc *FlowContext) Script(scriptType string) *FlowContext
- func (fc *FlowContext) Source(f func(io.Writer) error) (ret *Dataset)
- func (fc *FlowContext) Strings(lines []string) (ret *Dataset)
- func (fc *FlowContext) TextFile(fname string) (ret *Dataset)
- type FlowContextConfig
- type FlowContextOption
- type FlowOption
- type FlowRunner
- type ModeIO
- type NetworkType
- type RunLocked
- type SortOption
- type Step
- type StepMetadata
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FromDatasetShardToTask ¶
func FromDatasetShardToTask(shard *DatasetShard, task *Task)
func FromDatasetToStep ¶
func FromStepToDataset ¶
func FromTaskToDatasetShard ¶
func FromTaskToDatasetShard(task *Task, shard *DatasetShard)
Types ¶
type DasetsetHint ¶
type DasetsetHint func(d *Dataset)
func PartitionSize ¶
func PartitionSize(n int64) DasetsetHint
PartitionSize hints the partition size in MB. This is usually used when sorting is needed.
func TotalSize ¶
func TotalSize(n int64) DasetsetHint
TotalSize hints the total size in MB for all the partitions. This is usually used when sorting is needed.
type DasetsetMetadata ¶
type DasetsetShardMetadata ¶
type Dataset ¶
type Dataset struct { FlowContext *FlowContext Id int Shards []*DatasetShard Step *Step ReadingSteps []*Step IsPartitionedBy []int IsLocalSorted []instruction.OrderBy Meta *DasetsetMetadata RunLocked }
func (*Dataset) CoGroup ¶
func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
CoGroup joins two datasets by the key. Each result row has this format:
(key, []left_rows, []right_rows)
func (*Dataset) CoGroupPartitionedSorted ¶
CoGroupPartitionedSorted joins 2 datasets that are sharded by the same key and already locally sorted within each shard.
func (*Dataset) Do ¶
Do accepts a function to transform a dataset into a new dataset. This allows custom complicated pre-built logic.
func (*Dataset) DoJoin ¶
func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
func (*Dataset) Filter ¶
Filter conditionally passes rows into the next dataset. The code should be a function that just returns a boolean result.
func (*Dataset) ForEach ¶
ForEach operates on each row, but the results are not collected. This is used to create some side effects.
func (*Dataset) GetIsOnDiskIO ¶
GetIsOnDiskIO returns true if the dataset is persisted to disk in distributed mode.
func (*Dataset) GetPartitionSize ¶
GetPartitionSize returns the size in MB for each partition of the dataset. This is based on the hinted total size divided by the number of partitions.
func (*Dataset) GetShards ¶
func (d *Dataset) GetShards() []*DatasetShard
func (*Dataset) GetTotalSize ¶
GetTotalSize returns the total size in MB for the dataset. This is based on the given hint.
func (*Dataset) GroupBy ¶
func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
GroupBy groups data by the given fields, e.g. GroupBy(Field(1,2,3)) groups data by fields 1, 2, and 3.
func (*Dataset) HashJoin ¶
func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
HashJoin joins two datasets by putting the smaller dataset in memory on all executors and streams through the bigger dataset.
func (*Dataset) Hint ¶
func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
Hint adds options for the previous dataset.
func (*Dataset) Join ¶
func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
Join joins two datasets by the key.
func (*Dataset) JoinPartitionedSorted ¶
func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
JoinPartitionedSorted joins two datasets that are sharded by the same key and locally sorted within each shard.
func (*Dataset) LeftOuterJoin ¶
func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalGroupBy ¶
func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalHashAndJoinWith ¶
func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalLimit ¶
LocalLimit takes the first n local rows and skips all other rows.
func (*Dataset) LocalReduce ¶
func (*Dataset) LocalReduceBy ¶
func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalReducerBy ¶
func (d *Dataset) LocalReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalSort ¶
func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
func (*Dataset) Map ¶
Map operates on each row, and the returned results are passed to next dataset.
func (*Dataset) Mapper ¶
Mapper runs the mapper registered to the mapperId. This is used to execute pure Go code.
func (*Dataset) MergeSortedTo ¶
func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) OnDisk ¶
OnDisk ensures the intermediate datasets are persisted to disk. This allows executors to run one after another instead of in parallel when executors are limited.
func (*Dataset) Partition ¶
func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
Partition shards data by hash or by data key and returns a new dataset. This is divided into 2 steps: 1. Each record is sharded to a local shard. 2. The destination shard collects its child shards and merges them into one.
func (*Dataset) Pipe ¶
Pipe runs the code as an external program, which processes the tab-separated input from the program's stdin and writes its output to stdout, also in tab-separated lines.
func (*Dataset) PipeAsArgs ¶
PipeAsArgs takes each row of input and binds it to variables in the code parameter. The variables are specified via $1, $2, etc. The code is run as the command for an external program for each row of input.
Watch for the performance impact, since it starts one OS process for each line of input.
func (*Dataset) PipeOut ¶
PipeOut writes to writer. If previous step is a Pipe() or PipeAsArgs(), the output is written as is. Otherwise, each row of output is written in tab-separated lines.
func (*Dataset) ReduceBy ¶
func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) ReducerBy ¶
func (d *Dataset) ReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) (ret *Dataset)
ReducerBy runs the reducer registered to the reducerId. This is used to execute pure Go code.
func (*Dataset) RightOuterJoin ¶
func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) RoundRobin ¶
func (*Dataset) Run ¶
func (d *Dataset) Run(option ...FlowOption)
Run starts the whole flow. This is a convenience method, the same as *FlowContext.Run()
func (*Dataset) SaveFirstRowTo ¶
SaveFirstRowTo saves the first row's values into the operands.
func (*Dataset) Select ¶
func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
Select selects multiple fields into the next dataset. The index starts from 1.
func (*Dataset) Sort ¶
func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
Sort sorts on specific fields, defaulting to the first field. Required memory: about the same size as each partition. Example usage: Sort(Field(1,2)) means sorting on fields 1 and 2.
func (*Dataset) Top ¶
func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
Top streams through total n items, picking reverse ordered k items with O(n*log(k)) complexity. Required Memory: about same size as n items in memory
func (*Dataset) TreeMergeSortedTo ¶
func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
type DatasetShard ¶
type DatasetShard struct { Id int Dataset *Dataset ReadingTasks []*Task IncomingChan *util.Piper OutgoingChans []*util.Piper Counter int64 ReadyTime time.Time CloseTime time.Time Meta *DasetsetShardMetadata }
func (*DatasetShard) Closed ¶
func (s *DatasetShard) Closed() bool
func (*DatasetShard) Name ¶
func (s *DatasetShard) Name() string
func (*DatasetShard) TimeTaken ¶
func (s *DatasetShard) TimeTaken() time.Duration
type DatasetShardStatus ¶
type DatasetShardStatus int
const ( Untouched DatasetShardStatus = iota LocationAssigned InProgress InRetry Failed Successful )
type FlowContext ¶
type FlowContext struct { PrevScriptType string PrevScriptPart string Scripts map[string]func() script.Script Steps []*Step Datasets []*Dataset HashCode uint32 }
func New ¶
func New() (fc *FlowContext)
func (*FlowContext) AddAllToOneStep ¶
func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
the task should run on the destination dataset shard
func (*FlowContext) AddLinkedNToOneStep ¶
func (f *FlowContext) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
func (*FlowContext) AddOneToAllStep ¶
func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
The task should run on the source dataset shard. The input is nil for the initial source dataset.
func (*FlowContext) AddOneToEveryNStep ¶
func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
func (*FlowContext) AddOneToOneStep ¶
func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
the tasks should run on the source dataset shard
func (*FlowContext) Bytes ¶
func (fc *FlowContext) Bytes(slice [][]byte) (ret *Dataset)
Bytes begins a flow with an [][]byte
func (*FlowContext) Channel ¶
func (fc *FlowContext) Channel(ch chan interface{}) (ret *Dataset)
Channel accepts a channel to feed into the flow.
func (*FlowContext) Hint ¶
func (d *FlowContext) Hint(options ...FlowContextOption)
Hint adds hints to the flow.
func (*FlowContext) Init ¶
func (fc *FlowContext) Init(scriptPart string) *FlowContext
Init defines or declares variables or functions for the script. This piece of code is executed first, before each function that invokes a script.
func (*FlowContext) Ints ¶
func (fc *FlowContext) Ints(numbers []int) (ret *Dataset)
Ints begins a flow with an []int
func (*FlowContext) Listen ¶
func (fc *FlowContext) Listen(network, address string) (ret *Dataset)
Listen receives textual inputs via a socket. Multiple parameters are separated via tab.
func (*FlowContext) MergeDatasets1ShardTo1Step ¶
func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
All datasets should have the same number of shards.
func (*FlowContext) NewStep ¶
func (fc *FlowContext) NewStep() (step *Step)
func (*FlowContext) OnInterrupt ¶
func (fc *FlowContext) OnInterrupt()
func (*FlowContext) Query ¶
func (fc *FlowContext) Query(connectionId string, query adapter.AdapterQuery) (ret *Dataset)
Query uses the connection information specified via connectionId and then runs the query to fetch the data as input.
func (*FlowContext) ReadFile ¶
func (fc *FlowContext) ReadFile(source adapter.AdapterFileSource) (ret *Dataset)
ReadFile reads files according to fileType. The file can be local or on HDFS, S3, etc.
func (*FlowContext) ReadTsv ¶
func (fc *FlowContext) ReadTsv(reader io.Reader) (ret *Dataset)
ReadTsv reads tab-separated lines from the reader.
func (*FlowContext) Run ¶
func (fc *FlowContext) Run(options ...FlowOption)
func (*FlowContext) Script ¶
func (fc *FlowContext) Script(scriptType string) *FlowContext
Script defines the code to execute to generate the next dataset.
func (*FlowContext) Source ¶
func (fc *FlowContext) Source(f func(io.Writer) error) (ret *Dataset)
Source produces data to feed into the flow. Function f writes to the provided writer. The written bytes should be MsgPack-encoded []byte. Use util.EncodeRow(...) to encode the data before writing it to the writer.
func (*FlowContext) Strings ¶
func (fc *FlowContext) Strings(lines []string) (ret *Dataset)
Strings begins a flow with an []string
func (*FlowContext) TextFile ¶
func (fc *FlowContext) TextFile(fname string) (ret *Dataset)
TextFile reads the file content as lines and feeds them into the flow. The file can be a local file or hdfs://namenode:port/path/to/hdfs/file
type FlowContextConfig ¶
type FlowContextConfig struct {
OnDisk bool
}
type FlowContextOption ¶
type FlowContextOption func(c *FlowContextConfig)
type FlowOption ¶
type FlowOption interface {
GetFlowRunner() FlowRunner
}
type FlowRunner ¶
type FlowRunner interface {
RunFlowContext(*FlowContext)
}
type NetworkType ¶
type NetworkType int
const ( OneShardToOneShard NetworkType = iota OneShardToAllShard AllShardToOneShard OneShardToEveryNShard LinkedNShardToOneShard MergeTwoShardToOneShard )
type SortOption ¶
type SortOption struct {
// contains filtered or unexported fields
}
func Field ¶
func Field(indexes ...int) *SortOption
Field groups the indexes, which usually start from 1, into a []int.
func OrderBy ¶
func OrderBy(index int, ascending bool) *SortOption
func (*SortOption) By ¶
func (o *SortOption) By(index int, ascending bool) *SortOption
By chains another sort order onto the list of sort orders.
type Step ¶
type Step struct { Id int FlowContext *FlowContext InputDatasets []*Dataset OutputDataset *Dataset Function func([]io.Reader, []io.Writer, *instruction.Stats) error Instruction instruction.Instruction Tasks []*Task Name string NetworkType NetworkType IsOnDriverSide bool IsPipe bool Script script.Script Command *script.Command // used in Pipe() Meta *StepMetadata Params map[string]interface{} RunLocked }
func (*Step) GetScriptCommand ¶
func (*Step) RunFunction ¶
func (*Step) SetInstruction ¶
func (step *Step) SetInstruction(ins instruction.Instruction)
type StepMetadata ¶
type Task ¶
type Task struct { Id int Step *Step InputShards []*DatasetShard InputChans []*util.Piper // task specific input chans. InputShard may have multiple reading tasks OutputShards []*DatasetShard Stats *instruction.Stats }
Source Files
¶
- context.go
- context_hint.go
- context_script.go
- dataset.go
- dataset_cogroup.go
- dataset_do.go
- dataset_group.go
- dataset_hint.go
- dataset_join.go
- dataset_join_hash.go
- dataset_map.go
- dataset_output.go
- dataset_partition.go
- dataset_pipe.go
- dataset_reduce.go
- dataset_sort.go
- dataset_sort_option.go
- dataset_source.go
- dataset_source_query.go
- runner.go
- runner_on_interrupt.go
- step.go
- structure.go