Documentation ¶
Overview ¶
Package flow contains data structures for computation, mostly Dataset operations such as Map/Reduce/Join/Sort, etc.
Index ¶
- func FromDatasetShardToTask(shard *DatasetShard, task *Task)
- func FromDatasetToStep(input *Dataset, step *Step)
- func FromStepToDataset(step *Step, output *Dataset)
- func FromTaskToDatasetShard(task *Task, shard *DatasetShard)
- type DasetsetHint
- type DasetsetMetadata
- type DasetsetShardMetadata
- type Dataset
- func (d *Dataset) Broadcast(shardCount int) *Dataset
- func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) CoGroupPartitionedSorted(that *Dataset, indexes []int) (ret *Dataset)
- func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Filter(code string) *Dataset
- func (d *Dataset) FlatMap(code string) *Dataset
- func (d *Dataset) ForEach(code string) *Dataset
- func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset
- func (d *Dataset) GetIsOnDiskIO() bool
- func (d *Dataset) GetPartitionSize() int64
- func (d *Dataset) GetShards() []*DatasetShard
- func (d *Dataset) GetTotalSize() int64
- func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
- func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
- func (d *Dataset) Init(scriptPart string) *Dataset
- func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
- func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
- func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalLimit(n int) *Dataset
- func (d *Dataset) LocalReduce(code string) *Dataset
- func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalTop(n int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Map(code string) *Dataset
- func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) Output(f func(io.Reader) error) *Dataset
- func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Pipe(code string) *Dataset
- func (d *Dataset) PipeAsArgs(code string) *Dataset
- func (d *Dataset) PipeOut(writer io.Writer) *Dataset
- func (d *Dataset) Reduce(code string) (ret *Dataset)
- func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) RoundRobin(shard int) *Dataset
- func (d *Dataset) Run(option ...FlowOption)
- func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset
- func (d *Dataset) Script(scriptType string) *Dataset
- func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) SetupShard(n int)
- func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
- type DatasetShard
- type DatasetShardStatus
- type FlowContext
- func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (f *FlowContext) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
- func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (fc *FlowContext) Bytes(slice [][]byte) (ret *Dataset)
- func (fc *FlowContext) Channel(ch chan interface{}) (ret *Dataset)
- func (fc *FlowContext) CreateScript() script.Script
- func (d *FlowContext) Hint(options ...FlowContextOption)
- func (fc *FlowContext) Init(scriptPart string) *FlowContext
- func (fc *FlowContext) Ints(numbers []int) (ret *Dataset)
- func (fc *FlowContext) Listen(network, address string) (ret *Dataset)
- func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
- func (fc *FlowContext) NewStep() (step *Step)
- func (fc *FlowContext) OnInterrupt()
- func (fc *FlowContext) Query(connectionId string, query adapter.AdapterQuery) (ret *Dataset)
- func (fc *FlowContext) ReadFile(source adapter.AdapterFileSource) (ret *Dataset)
- func (fc *FlowContext) ReadTsv(reader io.Reader) (ret *Dataset)
- func (fc *FlowContext) Run(options ...FlowOption)
- func (fc *FlowContext) Script(scriptType string) *FlowContext
- func (fc *FlowContext) Source(f func(io.Writer) error) (ret *Dataset)
- func (fc *FlowContext) Strings(lines []string) (ret *Dataset)
- func (fc *FlowContext) TextFile(fname string) (ret *Dataset)
- type FlowContextConfig
- type FlowContextOption
- type FlowOption
- type FlowRunner
- type LocalDriver
- func (r *LocalDriver) RunDataset(wg *sync.WaitGroup, d *Dataset)
- func (r *LocalDriver) RunDatasetShard(wg *sync.WaitGroup, shard *DatasetShard)
- func (r *LocalDriver) RunFlowContext(fc *FlowContext)
- func (r *LocalDriver) RunFlowContextAsync(wg *sync.WaitGroup, fc *FlowContext)
- func (r *LocalDriver) RunStep(wg *sync.WaitGroup, step *Step)
- func (r *LocalDriver) RunTask(wg *sync.WaitGroup, task *Task)
- type ModeIO
- type NetworkType
- type RunLocked
- type SortOption
- type Step
- type StepMetadata
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FromDatasetShardToTask ¶
func FromDatasetShardToTask(shard *DatasetShard, task *Task)
func FromDatasetToStep ¶
func FromDatasetToStep(input *Dataset, step *Step)
func FromStepToDataset ¶
func FromStepToDataset(step *Step, output *Dataset)
func FromTaskToDatasetShard ¶
func FromTaskToDatasetShard(task *Task, shard *DatasetShard)
Types ¶
type DasetsetHint ¶
type DasetsetHint func(d *Dataset)
func PartitionSize ¶
func PartitionSize(n int64) DasetsetHint
PartitionSize hints the partition size in MB. This is usually used when sorting is needed.
func TotalSize ¶
func TotalSize(n int64) DasetsetHint
TotalSize hints the total size in MB for all the partitions. This is usually used when sorting is needed.
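For illustration, a minimal sketch of applying both hints before a sort; the import path is a placeholder and the 10 MB / 1024 MB figures are arbitrary, not recommendations:

package main

import flow "example.com/your/module/flow" // placeholder import path: replace with this package's actual path

func main() {
    f := flow.New()
    f.TextFile("/tmp/input.txt"). // hypothetical input file
        Hint(flow.PartitionSize(10), flow.TotalSize(1024)). // hint 10 MB partitions, 1024 MB total
        Sort().
        Run()
}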
type DasetsetMetadata ¶
type DasetsetShardMetadata ¶
type Dataset ¶
type Dataset struct {
    FlowContext     *FlowContext
    Id              int
    Shards          []*DatasetShard
    Step            *Step
    ReadingSteps    []*Step
    IsPartitionedBy []int
    IsLocalSorted   []instruction.OrderBy
    Meta            *DasetsetMetadata
    RunLocked
}
func NewDataset ¶
func NewDataset(context *FlowContext) *Dataset
func (*Dataset) CoGroup ¶
func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
CoGroup joins two datasets by the key. Each result row has this format:
(key, []left_rows, []right_rows)
func (*Dataset) CoGroupPartitionedSorted ¶
func (this *Dataset) CoGroupPartitionedSorted(that *Dataset, indexes []int) (ret *Dataset)
CoGroupPartitionedSorted joins 2 datasets that are sharded by the same key and already locally sorted within each shard.
func (*Dataset) DoJoin ¶
func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
func (*Dataset) Filter ¶
func (d *Dataset) Filter(code string) *Dataset
Filter conditionally passes rows on to the next dataset. The code should be a function that returns a boolean result.
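A minimal sketch of Filter; the import path is a placeholder and the script string assumes a Lua-style backend, so treat it as illustrative only:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        Strings([]string{"ant", "bee", "camel", "dog"}).
        Filter(`function(line) return string.len(line) > 3 end`). // keep rows longer than 3 chars (illustrative Lua)
        Fprintf(os.Stdout, "%s\n").
        Run()
}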
func (*Dataset) GetIsOnDiskIO ¶
func (d *Dataset) GetIsOnDiskIO() bool
func (*Dataset) GetPartitionSize ¶
func (d *Dataset) GetPartitionSize() int64
func (*Dataset) GetShards ¶
func (d *Dataset) GetShards() []*DatasetShard
func (*Dataset) GetTotalSize ¶
func (d *Dataset) GetTotalSize() int64
func (*Dataset) GroupBy ¶
func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
GroupBy groups data by the given fields, e.g. GroupBy(Field(1,2,3)) groups data by fields 1, 2, and 3.
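A sketch of grouping by the first field; the import path is a placeholder and the Map script is an illustrative Lua-style snippet assumed to emit two-field rows:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        Strings([]string{"a 1", "b 2", "a 3"}).
        Map(`function(line) return line:match("(%S+) (%S+)") end`). // split into two fields (illustrative Lua)
        GroupBy(flow.Field(1)).                                     // group rows by field 1
        PipeOut(os.Stdout).
        Run()
}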
func (*Dataset) HashJoin ¶
func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
HashJoin joins two datasets by putting the smaller dataset in memory on all executors and streaming through the bigger dataset.
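A schematic sketch of HashJoin; the import path is a placeholder and the Map scripts are illustrative Lua-style snippets assumed to emit (key, value) rows:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    f := flow.New()
    big := f.Strings([]string{"a 1", "b 2", "c 3"}).
        Map(`function(line) return line:match("(%S+) (%S+)") end`) // (key, value) rows (illustrative Lua)
    small := f.Strings([]string{"a x", "b y"}).
        Map(`function(line) return line:match("(%S+) (%S+)") end`) // (key, value) rows (illustrative Lua)

    big.HashJoin(small). // small is loaded into memory; big is streamed through
        PipeOut(os.Stdout).
        Run()
}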
func (*Dataset) Hint ¶
func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
Hint adds options for the previous dataset.
func (*Dataset) Join ¶
func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
Join joins two datasets by the key.
func (*Dataset) JoinPartitionedSorted ¶
func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
JoinPartitionedSorted joins multiple datasets that are sharded by the same key and locally sorted within each shard.
func (*Dataset) LeftOuterJoin ¶
func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalGroupBy ¶
func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalHashAndJoinWith ¶
func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalLimit ¶
func (d *Dataset) LocalLimit(n int) *Dataset
LocalLimit takes the first n rows locally and skips all other rows.
func (*Dataset) LocalReduce ¶
func (d *Dataset) LocalReduce(code string) *Dataset
func (*Dataset) LocalReduceBy ¶
func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalSort ¶
func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
func (*Dataset) Map ¶
func (d *Dataset) Map(code string) *Dataset
Map operates on each row, and the returned results are passed to the next dataset.
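A word-count-style sketch combining Map with FlatMap and ReduceBy; the import path is a placeholder, the input file is hypothetical, and the scripts assume a Lua-style backend:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        TextFile("/tmp/words.txt").                              // hypothetical input file
        FlatMap(`function(line) return line:gmatch("%w+") end`). // split lines into words (illustrative Lua)
        Map(`function(word) return word, 1 end`).                // emit (word, 1) pairs (illustrative Lua)
        ReduceBy(`function(x, y) return x + y end`).             // sum counts per word (illustrative Lua)
        Fprintf(os.Stdout, "%s\t%d\n").
        Run()
}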
func (*Dataset) MergeSortedTo ¶
func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) OnDisk ¶
func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset
OnDisk ensures the intermediate datasets are persisted to disk. This allows executors to run sequentially when the number of executors is limited.
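A sketch of wrapping part of a flow in OnDisk; the import path and input file are placeholders and the script is illustrative:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        TextFile("/tmp/large_input.txt"). // hypothetical input file
        OnDisk(func(d *flow.Dataset) *flow.Dataset {
            // intermediate datasets produced inside this function are persisted to disk
            return d.Map(`function(line) return line end`). // illustrative pass-through script
                Sort()
        }).
        PipeOut(os.Stdout).
        Run()
}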
func (*Dataset) Partition ¶
func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
Partition hashes the data, or partitions by the data key, and returns a new dataset. This is divided into 2 steps: 1. Each record is sharded to a local shard. 2. The destination shard collects its child shards and merges them into one.
func (*Dataset) PipeAsArgs ¶
func (d *Dataset) PipeAsArgs(code string) *Dataset
PipeAsArgs is similar to xargs, but simpler.
func (*Dataset) PipeOut ¶
func (d *Dataset) PipeOut(writer io.Writer) *Dataset
PipeOut writes to the writer. If the previous step is a Pipe() or PipeAsArgs(), the output is written as is. Otherwise, each row of output is written as a tab-separated line.
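For example, piping rows through an external command and writing the raw output to stdout; the import path is a placeholder and the shell command is arbitrary:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        Strings([]string{"banana", "apple", "cherry"}).
        Pipe("sort").       // run each partition through the external "sort" command
        PipeOut(os.Stdout). // previous step is Pipe(), so the output is written as is
        Run()
}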
func (*Dataset) ReduceBy ¶
func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) RightOuterJoin ¶
func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) RoundRobin ¶
func (d *Dataset) RoundRobin(shard int) *Dataset
func (*Dataset) Run ¶
func (d *Dataset) Run(option ...FlowOption)
Run starts the whole flow. This is a convenience method, the same as (*FlowContext).Run().
func (*Dataset) SaveFirstRowTo ¶
func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset
SaveFirstRowTo saves the first row's values into the operands.
func (*Dataset) Select ¶
func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
Select selects multiple fields into the next dataset. The index starts from 1.
func (*Dataset) SetupShard ¶
func (d *Dataset) SetupShard(n int)
func (*Dataset) Sort ¶
func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
Sort sorts on specific fields, defaulting to the first field. Required memory: about the same size as each partition. Example usage: Sort(Field(1,2)) sorts on fields 1 and 2.
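A sketch of sorting on the first two fields; the import path is a placeholder and the Map script is an illustrative Lua-style snippet that splits each line into two fields:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        Strings([]string{"b 2", "a 3", "a 1"}).
        Map(`function(line) return line:match("(%S+) (%S+)") end`). // split into 2 fields (illustrative Lua)
        Sort(flow.Field(1, 2)).                                     // sort by field 1, then field 2
        Fprintf(os.Stdout, "%s %s\n").
        Run()
}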
func (*Dataset) Top ¶
func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
Top streams through all n items, picking k items in reverse order with O(n*log(k)) complexity. Required memory: about the same size as k items in memory.
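A sketch picking the top 3 values; the import path is a placeholder, and the meaning of OrderBy's boolean flag is assumed from its signature:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        Ints([]int{5, 9, 1, 7, 3}).
        Top(3, flow.OrderBy(1, true)). // top 3 by field 1; the bool is assumed to be the ascending flag
        Fprintf(os.Stdout, "%d\n").
        Run()
}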
func (*Dataset) TreeMergeSortedTo ¶
func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
type DatasetShard ¶
type DatasetShard struct {
    Id            int
    Dataset       *Dataset
    ReadingTasks  []*Task
    IncomingChan  *util.Piper
    OutgoingChans []*util.Piper
    Counter       int64
    ReadyTime     time.Time
    CloseTime     time.Time
    Meta          *DasetsetShardMetadata
}
func (*DatasetShard) Closed ¶
func (s *DatasetShard) Closed() bool
func (*DatasetShard) Name ¶
func (s *DatasetShard) Name() string
func (*DatasetShard) TimeTaken ¶
func (s *DatasetShard) TimeTaken() time.Duration
type DatasetShardStatus ¶
type DatasetShardStatus int
const (
    Untouched DatasetShardStatus = iota
    LocationAssigned
    InProgress
    InRetry
    Failed
    Successful
)
type FlowContext ¶
type FlowContext struct {
    PrevScriptType string
    PrevScriptPart string
    Scripts        map[string]func() script.Script
    Steps          []*Step
    Datasets       []*Dataset
    HashCode       uint32
}
func New ¶
func New() (fc *FlowContext)
func (*FlowContext) AddAllToOneStep ¶
func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
The task should run on the destination dataset shard.
func (*FlowContext) AddLinkedNToOneStep ¶
func (f *FlowContext) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
func (*FlowContext) AddOneToAllStep ¶
func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
The task should run on the source dataset shard. The input is nil for an initial source dataset.
func (*FlowContext) AddOneToEveryNStep ¶
func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
func (*FlowContext) AddOneToOneStep ¶
func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
The tasks should run on the source dataset shard.
func (*FlowContext) Bytes ¶
func (fc *FlowContext) Bytes(slice [][]byte) (ret *Dataset)
Bytes begins a flow with a [][]byte.
func (*FlowContext) Channel ¶
func (fc *FlowContext) Channel(ch chan interface{}) (ret *Dataset)
Channel accepts a channel to feed into the flow.
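A sketch feeding a Go channel into a flow; the import path is a placeholder, and it is assumed that plain values such as strings can be sent on the channel:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    ch := make(chan interface{})
    go func() {
        defer close(ch) // closing the channel ends the input
        for _, s := range []string{"alpha", "beta", "gamma"} {
            ch <- s
        }
    }()

    flow.New().
        Channel(ch).
        Fprintf(os.Stdout, "%s\n").
        Run()
}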
func (*FlowContext) CreateScript ¶
func (fc *FlowContext) CreateScript() script.Script
func (*FlowContext) Hint ¶
func (d *FlowContext) Hint(options ...FlowContextOption)
func (*FlowContext) Init ¶
func (fc *FlowContext) Init(scriptPart string) *FlowContext
func (*FlowContext) Ints ¶
func (fc *FlowContext) Ints(numbers []int) (ret *Dataset)
Ints begins a flow with a []int.
func (*FlowContext) Listen ¶
func (fc *FlowContext) Listen(network, address string) (ret *Dataset)
Listen receives textual inputs via a socket. Multiple parameters are separated by tabs.
func (*FlowContext) MergeDatasets1ShardTo1Step ¶
func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
All datasets should have the same number of shards.
func (*FlowContext) NewStep ¶
func (fc *FlowContext) NewStep() (step *Step)
func (*FlowContext) OnInterrupt ¶
func (fc *FlowContext) OnInterrupt()
func (*FlowContext) Query ¶
func (fc *FlowContext) Query(connectionId string, query adapter.AdapterQuery) (ret *Dataset)
Query uses the connection information specified by connectionId and then runs the query to fetch the data as input.
func (*FlowContext) ReadFile ¶
func (fc *FlowContext) ReadFile(source adapter.AdapterFileSource) (ret *Dataset)
ReadFile reads files from the given adapter file source. The files can be local, or on HDFS, S3, etc.
func (*FlowContext) ReadTsv ¶
func (fc *FlowContext) ReadTsv(reader io.Reader) (ret *Dataset)
ReadTsv reads tab-separated lines from the reader.
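A sketch reading tab-separated rows from an in-memory reader; the import path is a placeholder:

package main

import (
    "os"
    "strings"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    tsv := "alice\t30\nbob\t25\n"

    flow.New().
        ReadTsv(strings.NewReader(tsv)). // each line becomes a row with tab-separated fields
        Select(flow.Field(1)).           // keep only the first field
        Fprintf(os.Stdout, "%s\n").
        Run()
}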
func (*FlowContext) Run ¶
func (fc *FlowContext) Run(options ...FlowOption)
func (*FlowContext) Script ¶
func (fc *FlowContext) Script(scriptType string) *FlowContext
func (*FlowContext) Source ¶
func (fc *FlowContext) Source(f func(io.Writer) error) (ret *Dataset)
Source produces data feeding into the flow. Function f writes to the provided writer. The written bytes should be MsgPack-encoded []byte. Use util.EncodeRow(...) to encode the data before writing it.
func (*FlowContext) Strings ¶
func (fc *FlowContext) Strings(lines []string) (ret *Dataset)
Strings begins a flow with a []string.
func (*FlowContext) TextFile ¶
func (fc *FlowContext) TextFile(fname string) (ret *Dataset)
TextFile reads the file content as lines and feeds them into the flow. The file can be a local file or hdfs://namenode:port/path/to/hdfs/file.
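A sketch reading a text file line by line; the import and file paths are placeholders:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    flow.New().
        TextFile("/tmp/access.log"). // hypothetical local file; an hdfs:// URL also works per the doc above
        LocalLimit(10).              // take at most the first 10 rows locally
        Fprintf(os.Stdout, "%s\n").
        Run()
}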
type FlowContextConfig ¶
type FlowContextConfig struct {
OnDisk bool
}
type FlowContextOption ¶
type FlowContextOption func(c *FlowContextConfig)
type FlowOption ¶
type FlowOption interface {
GetFlowRunner() FlowRunner
}
type FlowRunner ¶
type FlowRunner interface {
RunFlowContext(*FlowContext)
}
type LocalDriver ¶
type LocalDriver struct{}
var (
Local LocalDriver
)
func (*LocalDriver) RunDataset ¶
func (r *LocalDriver) RunDataset(wg *sync.WaitGroup, d *Dataset)
func (*LocalDriver) RunDatasetShard ¶
func (r *LocalDriver) RunDatasetShard(wg *sync.WaitGroup, shard *DatasetShard)
func (*LocalDriver) RunFlowContext ¶
func (r *LocalDriver) RunFlowContext(fc *FlowContext)
func (*LocalDriver) RunFlowContextAsync ¶
func (r *LocalDriver) RunFlowContextAsync(wg *sync.WaitGroup, fc *FlowContext)
type NetworkType ¶
type NetworkType int
const (
    OneShardToOneShard NetworkType = iota
    OneShardToAllShard
    AllShardToOneShard
    OneShardToEveryNShard
    LinkedNShardToOneShard
    MergeTwoShardToOneShard
)
type SortOption ¶
type SortOption struct {
// contains filtered or unexported fields
}
func Field ¶
func Field(indexes ...int) *SortOption
Field groups the indexes, usually starting from 1, into a []int.
func OrderBy ¶
func OrderBy(index int, ascending bool) *SortOption
func (*SortOption) By ¶
func (o *SortOption) By(index int, ascending bool) *SortOption
By chains an additional sort order onto the SortOption.
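A sketch of composing a SortOption from OrderBy and By; the import path is a placeholder, the Map script is an illustrative Lua-style snippet, and the direction flags are assumed from the signatures above:

package main

import (
    "os"

    flow "example.com/your/module/flow" // placeholder import path
)

func main() {
    // Sort ascending on field 1, then descending on field 2 (flag meaning assumed from the signature).
    byKeyThenValueDesc := flow.OrderBy(1, true).By(2, false)

    flow.New().
        Strings([]string{"a 2", "a 1", "b 9"}).
        Map(`function(line) return line:match("(%S+) (%S+)") end`). // split into 2 fields (illustrative Lua)
        Sort(byKeyThenValueDesc).
        Fprintf(os.Stdout, "%s %s\n").
        Run()
}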
type Step ¶
type Step struct {
    Id             int
    FlowContext    *FlowContext
    InputDatasets  []*Dataset
    OutputDataset  *Dataset
    Function       func([]io.Reader, []io.Writer, *instruction.Stats) error
    Instruction    instruction.Instruction
    Tasks          []*Task
    Name           string
    NetworkType    NetworkType
    IsOnDriverSide bool
    IsPipe         bool
    Script         script.Script
    Command        *script.Command // used in Pipe()
    Meta           *StepMetadata
    Params         map[string]interface{}
    RunLocked
}
func (*Step) RunFunction ¶
func (*Step) SetInstruction ¶
func (step *Step) SetInstruction(ins instruction.Instruction)
type StepMetadata ¶
type Task ¶
type Task struct {
    Id           int
    Step         *Step
    InputShards  []*DatasetShard
    InputChans   []*util.Piper // task specific input chans. InputShard may have multiple reading tasks
    OutputShards []*DatasetShard
    Stats        *instruction.Stats
}
Source Files ¶
- context.go
- context_hint.go
- context_script.go
- dataset.go
- dataset_cogroup.go
- dataset_group.go
- dataset_hint.go
- dataset_join.go
- dataset_join_hash.go
- dataset_map.go
- dataset_output.go
- dataset_partition.go
- dataset_pipe.go
- dataset_reduce.go
- dataset_sort.go
- dataset_sort_option.go
- dataset_source.go
- dataset_source_query.go
- runner.go
- runner_on_interrupt.go
- step.go
- structure.go