Documentation ¶
Index ¶
- type Break
- type Config
- type Filter
- type JobMap
- type Stream
- func (g *Stream) Apply(apply JobMap) *Stream
- func (g *Stream) ApplyAsync(apply JobMap) *Stream
- func (g *Stream) ApplyAsyncN(apply JobMap, n int) *Stream
- func (g *Stream) Collect() []*wfl.Job
- func (g *Stream) CollectN(size int) []*wfl.Job
- func (g *Stream) Consume()
- func (g *Stream) Error() error
- func (g *Stream) Filter(filter Filter) *Stream
- func (g *Stream) HasError() bool
- func (g *Stream) JobChannel() chan *wfl.Job
- func (g *Stream) Join(s *Stream)
- func (g *Stream) Merge(s ...*Stream) *Stream
- func (g *Stream) MultiSync(s ...*Stream) []*Stream
- func (g *Stream) OnError(f func(e error)) *Stream
- func (g *Stream) Synchronize() *Stream
- func (g *Stream) Tee() (*Stream, *Stream)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Break ¶
type Break func(drmaa2interface.JobTemplate) bool
Break is a function that returns true if the job stream should continue and false if it should stop. The implementer decides whether the job template (the one that is going to be executed next) is evaluated.
func NewSequenceBreaker ¶
NewSequenceBreaker returns a function which stops execution of a job stream after _seqLength_ job submissions.
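The counting behaviour described above can be sketched with a closure. This is a minimal illustration, not the package's implementation: `jobTemplate`, `breakFunc` and `newSequenceBreaker` are hypothetical stand-ins defined locally so the example compiles without the DRMAA2 dependencies.

```go
package main

import "fmt"

// jobTemplate is a hypothetical stand-in for drmaa2interface.JobTemplate.
type jobTemplate struct {
	RemoteCommand string
}

// breakFunc mirrors the shape of Break: true means "continue the stream".
type breakFunc func(jobTemplate) bool

// newSequenceBreaker is an illustrative sketch (assumed signature).
// The returned function reports true for the first seqLength calls
// and false for every call after that.
func newSequenceBreaker(seqLength int) breakFunc {
	submitted := 0
	return func(jobTemplate) bool {
		if submitted >= seqLength {
			return false
		}
		submitted++
		return true
	}
}

func main() {
	next := newSequenceBreaker(3)
	for i := 0; i < 5; i++ {
		fmt.Println(i, next(jobTemplate{RemoteCommand: "sleep"}))
	}
}
```

The counter lives in the closure and is not protected by a lock, so this sketch assumes the function is called from a single goroutine.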
type JobMap ¶
JobMap takes a job and returns a job. The input job and the output job do not need to be the same.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream defines a sequence of jobs emitted through a channel.
func NewStream ¶
NewStream creates a new job stream based on the given template. The job submission template for each job is created by Template.Next(), i.e. defined by the registered iterators. The Break function defines the end of the stream. If Break is nil, an infinite job stream is created.
func (*Stream) Apply ¶
Apply returns a newly created job stream. The job stream contains the jobs that result from applying the JobMap function to the jobs of the input stream. If the JobMap returns nil, no job is forwarded.
The JobMap function is applied to the next job only when the previous execution is completed.
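The semantics of Apply (one job at a time, nil results dropped) can be modelled on plain channels. The sketch below is an assumption-laden illustration rather than the package's code: `job`, `jobMap`, `applySeq` and `fromIDs` are hypothetical names, and `job` stands in for `*wfl.Job`.

```go
package main

import "fmt"

// job is a hypothetical stand-in for *wfl.Job.
type job struct{ ID int }

// jobMap mirrors the described shape of JobMap: a job in, a job out.
type jobMap func(*job) *job

// applySeq reads jobs from in, applies fn to one job at a time, and
// forwards every non-nil result to the returned channel.
func applySeq(in <-chan *job, fn jobMap) <-chan *job {
	out := make(chan *job)
	go func() {
		defer close(out)
		for j := range in {
			// fn must return before the next job is read.
			if mapped := fn(j); mapped != nil {
				out <- mapped
			}
		}
	}()
	return out
}

// fromIDs emits a finite stream of jobs with the given IDs.
func fromIDs(ids ...int) <-chan *job {
	ch := make(chan *job)
	go func() {
		defer close(ch)
		for _, id := range ids {
			ch <- &job{ID: id}
		}
	}()
	return ch
}

func main() {
	// dropOdd discards jobs with odd IDs and replaces the others.
	dropOdd := func(j *job) *job {
		if j.ID%2 != 0 {
			return nil
		}
		return &job{ID: j.ID * 10}
	}
	for j := range applySeq(fromIDs(1, 2, 3, 4), dropOdd) {
		fmt.Println(j.ID)
	}
}
```

Because the loop calls `fn` synchronously, a slow JobMap delays every later job; that is the trade-off the sequential variant makes in exchange for preserved ordering.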
func (*Stream) ApplyAsync ¶
func (*Stream) Collect ¶
Collect stores all jobs of the stream in a slice and returns the slice. The job stream must be finite and small enough to fit in memory.
func (*Stream) CollectN ¶
CollectN takes the next _size_ jobs out of the job stream and returns them as a slice. If the stream ends before the slice is filled, the slice is smaller than the given size.
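The "at most _size_ jobs" behaviour can be illustrated on a plain channel. This is a sketch under stated assumptions: `job` is a hypothetical stand-in for `*wfl.Job`, and `collectN` is an illustrative helper, not the method itself.

```go
package main

import "fmt"

// job is a hypothetical stand-in for *wfl.Job.
type job struct{ ID int }

// collectN reads up to size jobs from in. If the channel is closed
// earlier, the returned slice is shorter than size.
func collectN(in <-chan *job, size int) []*job {
	if size <= 0 {
		return nil
	}
	jobs := make([]*job, 0, size)
	for len(jobs) < size {
		j, ok := <-in
		if !ok {
			break // stream ended before the slice was filled
		}
		jobs = append(jobs, j)
	}
	return jobs
}

func main() {
	in := make(chan *job, 3)
	for id := 1; id <= 3; id++ {
		in <- &job{ID: id}
	}
	close(in)

	first := collectN(in, 2)  // stream still has jobs left afterwards
	second := collectN(in, 5) // stream ends before 5 jobs are read
	fmt.Println(len(first), len(second))
}
```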
func (*Stream) Filter ¶
Filter is a non-blocking call which returns a stream containing only jobs which are allowed by the filter.
func (*Stream) JobChannel ¶
JobChannel returns the job channel of the job stream for direct processing.
func (*Stream) Join ¶
Join consumes all jobs of two streams of the same length (like streams created by Tee()).
func (*Stream) Merge ¶
Merge combines the current stream with the given streams into one stream. The order within each input stream is preserved, but the order in which jobs from different streams appear in the output stream is undefined. If one input stream blocks, the other input streams are still processed.
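The ordering guarantees of Merge match the usual fan-in pattern with one goroutine per input. The following is a minimal sketch of that pattern, assuming plain channels; `job`, `merge` and `emit` are hypothetical names, and the package's own implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for *wfl.Job.
type job struct{ ID int }

// merge forwards the jobs of all input channels to one output channel.
// Each input is drained by its own goroutine, so a blocked input does
// not stall the others. Order is preserved per input only.
func merge(streams ...<-chan *job) <-chan *job {
	out := make(chan *job)
	var wg sync.WaitGroup
	wg.Add(len(streams))
	for _, s := range streams {
		go func(in <-chan *job) {
			defer wg.Done()
			for j := range in {
				out <- j
			}
		}(s)
	}
	// Close the output once every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a closed, pre-filled channel with the given job IDs.
func emit(ids ...int) <-chan *job {
	ch := make(chan *job, len(ids))
	for _, id := range ids {
		ch <- &job{ID: id}
	}
	close(ch)
	return ch
}

func main() {
	for j := range merge(emit(1, 2), emit(10, 20, 30)) {
		fmt.Println(j.ID) // interleaving between the two inputs may vary
	}
}
```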
func (*Stream) MultiSync ¶
MultiSync starts _n_ goroutines which synchronize the jobs from the _n_ streams (the output streams contain only finished jobs).
func (*Stream) OnError ¶
OnError executes the given function if an error occurred in the most recent function that can produce errors.
func (*Stream) Synchronize ¶
Synchronize is a non-blocking call. It starts a goroutine which loops over all jobs in the stream, waits for each job to finish, and then forwards the job to the output stream. The newly created output stream contains only finished jobs. The order of the output stream is the same as in the incoming stream.
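The order-preserving wait can be modelled as follows. This is a sketch under stated assumptions, not the package's code: `job` is a hypothetical stand-in for `*wfl.Job` whose `done` channel is closed when the simulated job finishes, and `start` and `synchronize` are illustrative helpers.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical stand-in for *wfl.Job.
// done is closed when the simulated job has finished.
type job struct {
	ID   int
	done chan struct{}
}

// start launches a fake job that finishes after ms milliseconds.
func start(id, ms int) *job {
	j := &job{ID: id, done: make(chan struct{})}
	go func() {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		close(j.done)
	}()
	return j
}

// synchronize returns immediately. A goroutine waits for each incoming
// job in arrival order and forwards it only after it has finished.
func synchronize(in <-chan *job) <-chan *job {
	out := make(chan *job)
	go func() {
		defer close(out)
		for j := range in {
			<-j.done // block until this job is finished
			out <- j
		}
	}()
	return out
}

func main() {
	in := make(chan *job, 2)
	in <- start(1, 30) // the slower job arrives first
	in <- start(2, 5)
	close(in)
	for j := range synchronize(in) {
		fmt.Println("finished:", j.ID)
	}
}
```

Waiting in arrival order keeps the output order stable, at the cost that a long-running job holds back jobs behind it even if they have already finished.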