jstream

package
v1.2.14
Published: Apr 10, 2023 License: BSD-2-Clause Imports: 5 Imported by: 0

README

Job Stream

Job Stream allows you to create and manipulate streams of wfl jobs.

Example


    // print logs the job ID and forwards the job unchanged to the next stage.
    func print(j *wfl.Job) *wfl.Job {
        fmt.Printf("Processing job %s\n", j.JobID())
        // you can wait for the job here and submit
        // another task
        return j
    }

    template := wfl.NewTemplate(drmaa2interface.JobTemplate{
        RemoteCommand: "/bin/sh",
        Args:          []string{"-c", `echo Executing task $TASK_ID`},
    }).AddIterator("tasks", wfl.NewEnvSequenceIterator("TASK_ID", 1, 1))

    config := jstream.Config{
        Template:   template,
        Workflow:   wfl.NewWorkflow(wfl.NewProcessContext()),
        BufferSize: 16,
    }

    jstream.NewStream(config, nil).Apply(print).Synchronize().Consume()

This creates a stream of jobs based on the given configuration and a function which defines the abort criterion. If the function is nil, the stream is infinite.

The configuration contains a Template on which Next() is called to get the JobTemplate which is then submitted with RunT(). The configuration also requires a Workflow which defines where the tasks are processed (OS processes, Docker, ...). Optionally a BufferSize can be specified which limits how many jobs can be executed in parallel in each step of the stream. By default the buffer size is 0 (the zero value of the struct field, not a deliberately chosen value), which means a new job based on Template.Next() can only be submitted once the consumer of the stream takes a task.
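
For example, reusing config and print from the example above, a finite stream which stops after 100 submitted jobs can be created with NewSequenceBreaker (documented below). A minimal sketch:

    // Stop the stream after 100 job submissions instead of running forever.
    jstream.NewStream(config, jstream.NewSequenceBreaker(100)).
        Apply(print).
        Synchronize().
        Consume()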

Synchronize() forwards finished (synchronized) jobs. Consume() is required to drain all jobs from the internal channel so that the buffer does not block.

Apply() can be seen as a processing station. By default only one task at a time is executed by Apply(); this is independent of the channel limit given by BufferSize. To increase the parallelism of Apply(), use ApplyAsyncN().
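
For instance, reusing config and print from the example above, up to four invocations of the print stage can run concurrently while BufferSize still bounds the number of jobs queued between stages. A sketch:

    // Run up to 4 invocations of the print stage concurrently.
    jstream.NewStream(config, jstream.NewSequenceBreaker(50)).
        ApplyAsyncN(print, 4).
        Synchronize().
        Consume()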

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Break

type Break func(drmaa2interface.JobTemplate) bool

Break is a function which returns true if the job stream is to be continued and false if not. It is up to the implementer whether the job template (which is going to be executed next) is evaluated or not.
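
A minimal sketch of a custom Break implementation which keeps the stream alive until a deadline is reached; the job template argument is ignored here:

    // deadlineBreaker returns true (continue) while the deadline has not
    // passed yet; the upcoming job template is not inspected.
    func deadlineBreaker(deadline time.Time) jstream.Break {
        return func(_ drmaa2interface.JobTemplate) bool {
            return time.Now().Before(deadline)
        }
    }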

func NewSequenceBreaker

func NewSequenceBreaker(seqLength int) Break

NewSequenceBreaker returns a function which stops execution of a job stream after _seqLength_ job submissions.

type Config

type Config struct {
	Workflow   *wfl.Workflow
	Template   *wfl.Template
	BufferSize int
}

type Filter

type Filter func(*wfl.Job) bool

Filter returns true if the job is ok and should not be filtered out.
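
A sketch of a Filter which only lets successfully finished jobs pass, assuming wfl's Job State() accessor and the drmaa2interface Done state:

    // onlyDone keeps jobs whose state is Done and drops all others.
    func onlyDone(j *wfl.Job) bool {
        return j.State() == drmaa2interface.Done
    }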

type JobMap

type JobMap func(*wfl.Job) *wfl.Job

JobMap takes a job and returns a job. The input job and the output job do not need to be the same.
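
A sketch of a JobMap which blocks until the job has finished before forwarding it, assuming wfl's fluent Wait() which returns the job itself:

    // waitForJob waits for the job to terminate and forwards it unchanged;
    // returning nil instead would drop the job from the output stream.
    func waitForJob(j *wfl.Job) *wfl.Job {
        return j.Wait()
    }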

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream defines a sequence of jobs emitted through a channel.

func NewStream

func NewStream(cfg Config, b Break) *Stream

NewStream creates a new job stream based on the given template. The job submission template for each job is created by Template.Next(), i.e. defined by the registered iterators. The Break function defines the end of the stream. If Break is nil an infinite job stream is created.

func (*Stream) Apply

func (g *Stream) Apply(apply JobMap) *Stream

Apply returns a newly created job stream. The job stream contains jobs which are the result of applying the JobMap function on the jobs of the input stream. If the JobMap returns nil no job is forwarded.

The JobMap function is applied to the next job only when the previous execution is completed.

func (*Stream) ApplyAsync

func (g *Stream) ApplyAsync(apply JobMap) *Stream

func (*Stream) ApplyAsyncN

func (g *Stream) ApplyAsyncN(apply JobMap, n int) *Stream

func (*Stream) Collect

func (g *Stream) Collect() []*wfl.Job

Collect stores all jobs of the stream in an array and returns the array. The job stream must be finite and small enough to fit in memory.

func (*Stream) CollectN

func (g *Stream) CollectN(size int) []*wfl.Job

CollectN takes the next _size_ jobs out of the job stream and returns them as an array. If the stream ends before the array is filled, the array is smaller than the given size.
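
For example (a sketch reusing config from the README example), the first ten finished jobs of an otherwise infinite stream can be collected like this:

    // Take the first 10 finished jobs out of the stream.
    jobs := jstream.NewStream(config, nil).Synchronize().CollectN(10)
    for _, job := range jobs {
        fmt.Println("finished:", job.JobID())
    }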

func (*Stream) Consume

func (g *Stream) Consume()

func (*Stream) Error

func (g *Stream) Error() error

Error returns the error of the last operation.

func (*Stream) Filter

func (g *Stream) Filter(filter Filter) *Stream

Filter is a non-blocking call which returns a stream containing only jobs which are allowed by the filter.

func (*Stream) HasError

func (g *Stream) HasError() bool

func (*Stream) JobChannel

func (g *Stream) JobChannel() chan *wfl.Job

JobChannel returns the job channel of the job stream for direct processing.
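
A sketch of consuming the stream directly by ranging over the channel, assuming the channel is closed once a finite stream ends:

    // Process jobs straight from the underlying channel.
    stream := jstream.NewStream(config, jstream.NewSequenceBreaker(5))
    for job := range stream.JobChannel() {
        fmt.Println("got job", job.JobID())
    }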

func (*Stream) Join

func (g *Stream) Join(s *Stream)

Join consumes all jobs of two streams of the same length (like streams created by Tee()).

func (*Stream) Merge

func (g *Stream) Merge(s ...*Stream) *Stream

Merge combines the current stream with the given streams into one stream. The order within each input stream is preserved, but the order in which jobs from different streams appear in the output stream is undefined. If one input stream blocks, the other input streams are still processed.
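
A sketch merging two independently configured streams into one consumer (configA and configB are hypothetical Config values):

    // Jobs from both inputs end up in one stream; the interleaving
    // between the two inputs is not defined.
    a := jstream.NewStream(configA, jstream.NewSequenceBreaker(5))
    b := jstream.NewStream(configB, jstream.NewSequenceBreaker(5))
    a.Merge(b).Synchronize().Consume()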

func (*Stream) MultiSync

func (g *Stream) MultiSync(s ...*Stream) []*Stream

MultiSync starts _n_ goroutines which synchronize the jobs from the _n_ streams (the returned streams contain only finished jobs).

func (*Stream) OnError

func (g *Stream) OnError(f func(e error)) *Stream

OnError executes the given function if an error occurred in the last operation that can produce an error.
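
A sketch which logs such an error, assuming stream is a previously created *jstream.Stream:

    // Log the error of the last failing operation, if any.
    stream.OnError(func(e error) {
        fmt.Println("stream error:", e)
    })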

func (*Stream) Synchronize

func (g *Stream) Synchronize() *Stream

Synchronize is a non-blocking call which starts a goroutine that loops over all jobs in the stream, waits for each job to finish, and then forwards the job to the output stream. The newly created output stream contains only finished jobs. The order of the output stream is the same as in the incoming stream.

func (*Stream) Tee

func (g *Stream) Tee() (*Stream, *Stream)

Tee creates two streams out of one. Note that both streams need to be consumed in parallel; otherwise job emission will block when the internal buffer of one stream is full.

See also: MultiSync() and Join().
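
A sketch which splits one stream into two branches and consumes both in parallel via Join(); both branches are synchronized first (Synchronize() is non-blocking):

    // Both branches of the Tee must be drained; Join consumes them pairwise.
    left, right := jstream.NewStream(config, jstream.NewSequenceBreaker(10)).Tee()
    left.Synchronize().Join(right.Synchronize())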
