Documentation ¶
Overview ¶
Package exec implements compilation, evaluation, and execution of Bigslice slice operations.
Index ¶
- Constants
- Variables
- func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.Group) error
- type CompileEnv
- type Executor
- type Option
- type Result
- type Session
- func (s *Session) Discard(ctx context.Context, roots []*Task)
- func (s *Session) HandleDebug(handler *http.ServeMux)
- func (s *Session) MaxLoad() float64
- func (s *Session) Must(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) *Result
- func (s *Session) Parallelism() int
- func (s *Session) Run(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error)
- func (s *Session) Shutdown()
- func (s *Session) Status() *status.Status
- type Store
- type Task
- func (t *Task) All() []*Task
- func (t *Task) Broadcast()
- func (t *Task) Err() error
- func (t *Task) Error(err error)
- func (t *Task) Errorf(format string, v ...interface{})
- func (t *Task) GraphString() string
- func (t *Task) Head() *Task
- func (t *Task) Phase() []*Task
- func (t *Task) Set(state TaskState)
- func (t *Task) State() TaskState
- func (t *Task) String() string
- func (t *Task) Subscribe(s *TaskSubscriber)
- func (t *Task) Unsubscribe(s *TaskSubscriber)
- func (t *Task) Wait(ctx context.Context) error
- func (t *Task) WaitState(ctx context.Context, state TaskState) (TaskState, error)
- func (t *Task) WriteGraph(w io.Writer)
- type TaskDep
- type TaskName
- type TaskState
- type TaskSubscriber
Constants ¶
const BigmachineStatusGroup = "bigmachine"
const DefaultMaxLoad = 0.95
DefaultMaxLoad is the default machine max load.
Variables ¶
var DoShuffleReaders = true
DoShuffleReaders determines whether reader tasks should be shuffled in order to avoid potential thundering herd issues. It should only be set to false in testing, when deterministic ordering matters.
TODO(marius): make this a session option instead.
var ErrTaskLost = errors.New("task was lost")
ErrTaskLost indicates that a Task was in TaskLost state.
var ProbationTimeout = 30 * time.Second
ProbationTimeout is the amount of time that a machine will remain in probation without being explicitly marked healthy.
Functions ¶
func Eval ¶
Eval simultaneously evaluates a set of task graphs from the provided set of roots. Eval uses the provided executor to dispatch tasks when their dependencies have been satisfied. Eval returns on evaluation error or else when all roots are fully evaluated.
TODO(marius): we can often stream across shuffle boundaries. This would complicate scheduling, but may be worth doing.
Types ¶
type CompileEnv ¶
type CompileEnv struct {
	// Writable is true if this environment is writable. It is only
	// exported so that it can be gob-{en,dec}oded.
	Writable bool
	// Cached indicates whether a task operation's results can be read from
	// cache. An "operation" is one of the pipelined elements that a task
	// may perform. It is only exported so that it can be gob-{en,dec}oded.
	Cached map[taskOp]bool
}
CompileEnv is the environment for compilation. This environment should capture all external state that can affect compilation of an invocation. It is shared across compilations of the same invocation (e.g. on worker nodes) to guarantee consistent compilation results. This is a requirement of bigslice's computation model, as we assume that all nodes share the same view of the task graph. It must be gob-encodable for transport to workers.
func (*CompileEnv) Freeze ¶
func (e *CompileEnv) Freeze()
Freeze freezes the state, marking e no longer writable.
func (CompileEnv) IsCached ¶
func (e CompileEnv) IsCached(n TaskName, opIdx int) bool
IsCached returns whether the (task, operation) given by (n, opIdx) is cached.
func (CompileEnv) IsWritable ¶
func (e CompileEnv) IsWritable() bool
IsWritable returns whether this environment is writable.
func (CompileEnv) MarkCached ¶
func (e CompileEnv) MarkCached(n TaskName, opIdx int)
MarkCached marks the (task, operation) given by (n, opIdx) as cached.
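The remark that the CompileEnv fields are exported only for gob encoding follows from how encoding/gob works: it transmits exported struct fields only. The sketch below round-trips a similar structure through gob. The `env` and `opKey` types are hypothetical stand-ins, not the package's own types.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// opKey is a hypothetical key identifying one pipelined operation of a task.
type opKey struct {
	Task  string
	OpIdx int
}

// env is a hypothetical stand-in for a compile environment. Its fields are
// exported because encoding/gob only transmits exported struct fields.
type env struct {
	Writable bool
	Cached   map[opKey]bool
}

// roundTrip gob-encodes e and decodes it into a fresh value, as would
// happen when an environment is shipped to a worker.
func roundTrip(e env) (env, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return env{}, err
	}
	var out env
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	e := env{
		Writable: false,
		Cached:   map[opKey]bool{{Task: "reduce", OpIdx: 0}: true},
	}
	got, err := roundTrip(e)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got.Writable, got.Cached[opKey{Task: "reduce", OpIdx: 0}])
}
```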
type Executor ¶
type Executor interface {
	// Name returns a human-friendly name for this executor.
	Name() string

	// Start starts the executor. It is called before evaluation has started
	// and after all funcs have been registered. Start need not return:
	// for example, the Bigmachine implementation of Executor uses
	// Start as an entry point for worker processes.
	Start(*Session) (shutdown func())

	// Run runs a task. The executor sets the state of the task as it
	// progresses. The task should enter in state TaskWaiting; by the
	// time Run returns the task state is >= TaskOk.
	Run(*Task)

	// Reader returns a locally accessible ReadCloser for the requested task.
	Reader(*Task, int) sliceio.ReadCloser

	// Discard discards the storage resources held by a computed task.
	// Discarding is best-effort, so no error is returned.
	Discard(context.Context, *Task)

	// Eventer returns the eventer used to log events relevant to this executor.
	Eventer() eventlog.Eventer

	// HandleDebug adds executor-specific debug handlers to the provided
	// http.ServeMux. This is used to serve diagnostic information relating
	// to the executor.
	HandleDebug(handler *http.ServeMux)
}
Executor defines an interface used to provide implementations of task runners. An Executor is responsible for running single tasks, partitioning their outputs, and instantiating readers to retrieve the output of any given task.
type Option ¶
type Option func(s *Session)
An Option represents a session configuration parameter value.
Local configures a session with the local in-binary executor.
MachineCombiners is a session option that turns on machine-local combine buffers. If turned on, each combiner task that belongs to the same shard-set and runs on the same machine combines values into a single, machine-local combine buffer. This can be a big performance optimization for tasks that have low key cardinality, or a key-set with very hot keys. However, due to the way it is implemented, error recovery is currently not implemented for such tasks.
func Bigmachine ¶
func Bigmachine(system bigmachine.System, params ...bigmachine.Param) Option
Bigmachine configures a session using the bigmachine executor configured with the provided system. If any params are provided, they are applied to each bigmachine allocated by Bigslice.
func Eventer ¶
Eventer configures the session with an Eventer that will be used to log session events (for analytics).
func Parallelism ¶
Parallelism configures the session with the provided target parallelism.
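Option has the shape of Go's functional-options pattern: each option is a function that mutates the session being configured. The sketch below shows how such options compose. The `session` struct, the option constructors, and `start` are hypothetical; the default executor name follows the documented behavior of Start, while the default parallelism of 1 is an arbitrary choice made for this example.

```go
package main

import "fmt"

// session is a hypothetical, much-reduced stand-in for a session.
type session struct {
	executor    string
	parallelism int
}

// option mirrors the shape of the Option type: a function that mutates
// the session being configured.
type option func(s *session)

// withExecutor is a hypothetical option that selects an executor by name.
func withExecutor(name string) option {
	return func(s *session) { s.executor = name }
}

// withParallelism is a hypothetical option that sets the target parallelism.
func withParallelism(p int) option {
	return func(s *session) { s.parallelism = p }
}

// start applies defaults and then each option in the order given, so
// later options override earlier ones.
func start(opts ...option) *session {
	s := &session{executor: "bigmachine", parallelism: 1}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := start(withExecutor("local"), withParallelism(8))
	fmt.Printf("%s %d\n", s.executor, s.parallelism)
}
```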
type Result ¶
A Result is the output of a Slice evaluation. It is the only type implementing bigslice.Slice that is a legal argument to a bigslice.Func.
func (*Result) Discard ¶
Discard discards the storage resources held by the subgraph of tasks used to compute r. This should be used to discard results that are no longer needed. If the results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.
func (*Result) Scanner ¶
Scanner returns a scanner that scans the output. If the output contains multiple shards, they are scanned sequentially. You must call Close on the returned scanner when you are done scanning. You may get and scan multiple scanners concurrently from r.
type Session ¶
Session represents a Bigslice compute session. A session shares a binary and executor, and is valid for the run of the binary. A session can run multiple bigslice functions, allowing for iterative computing.
A session is started by the Start method. Some executors may launch multiple copies of the binary: these additional binaries are called workers, and in them Start does not return.
All functions must be created before Start is called, and must be created in a deterministic order. This is provided by default when functions are created as part of package initialization. Registering toplevel functions this way is both safe and encouraged:
var Computation = bigslice.Func(func(..) (slice Slice) {
	// Build up the computation, parameterized by the function.
	slice = ...
	slice = ...
	return slice
})

// Possibly in another package:
func main() {
	sess := exec.Start()
	// Run returns (*Result, error); the result is ignored here.
	if _, err := sess.Run(ctx, Computation, args...); err != nil {
		log.Fatal(err)
	}
	// Success!
}
func Start ¶
Start creates and starts a new bigslice session, configuring it according to the provided options. Only one session may be created in a single binary invocation. The returned session remains valid for the lifetime of the binary. If no executor is configured, the session is configured to use the bigmachine executor.
func (*Session) Discard ¶
Discard discards the storage resources held by the subgraph given by roots. This should be used to discard tasks whose results are no longer needed. If the task results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.
func (*Session) HandleDebug ¶
func (*Session) Parallelism ¶
Parallelism returns the desired amount of evaluation parallelism.
func (*Session) Run ¶
func (s *Session) Run(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error)
Run evaluates the slice returned by the bigslice func funcv applied to the provided arguments. Tasks are run by the session's executor. Run returns when the computation has completed, or else on error. It is safe to make concurrent calls to Run; the underlying computation will be performed in parallel.
type Store ¶
type Store interface {
	// Create returns a writer that populates data for the given
	// task name and partition. The data is not available
	// to Open until the returned closer has been closed.
	//
	// TODO(marius): should we allow writes to be discarded as well?
	Create(ctx context.Context, task TaskName, partition int) (writeCommitter, error)

	// Open returns a ReadCloser from which the stored contents of the named task
	// and partition can be read. If the task and partition are not stored, an
	// error with kind errors.NotExist is returned. The offset specifies the byte
	// position from which to read.
	Open(ctx context.Context, task TaskName, partition int, offset int64) (io.ReadCloser, error)

	// Stat returns metadata for the stored slice.
	Stat(ctx context.Context, task TaskName, partition int) (sliceInfo, error)

	// Discard discards the data stored for task and partition. Subsequent calls
	// to Open for the given (task, partition) will fail. ReadClosers that
	// already exist may start returning errors, depending on the
	// implementation. If no such (task, partition) is stored, returns a non-nil
	// error.
	Discard(ctx context.Context, task TaskName, partition int) error
}
Store is an abstraction that stores partitioned data as produced by a task.
type Task ¶
type Task struct {
	slicetype.Type

	// Invocation is the task's invocation, i.e. the Func invocation
	// from which this task was compiled.
	Invocation execInvocation

	// Name is the name of the task. Tasks are named uniquely inside each
	// Bigslice session.
	Name TaskName

	// Do starts computation for this task, returning a reader that
	// computes batches of values on demand. Do is invoked with readers
	// for the task's dependencies.
	Do func([]sliceio.Reader) sliceio.Reader

	// Deps are the task's dependencies. See TaskDep for details.
	Deps []TaskDep

	// Partitioner is used to partition the task's output. It will only
	// be called when NumPartition > 1.
	Partitioner bigslice.Partitioner

	// NumPartition is the number of partitions that are output by this task.
	// If NumPartition > 1, then the task must also define a partitioner.
	NumPartition int

	// Combiner specifies an (optional) combiner to use for this task's output.
	// If a Combiner is not nil, CombineKey names the combine buffer used:
	// each combine buffer contains combiner outputs from multiple tasks.
	// If CombineKey is not set, then per-task buffers are used instead.
	Combiner   slicefunc.Func
	CombineKey string

	// Pragma comprises the pragmas of all slice operations that
	// are pipelined into this task.
	bigslice.Pragma

	// Slices is the set of slices to which this task directly contributes.
	Slices []bigslice.Slice

	// Group stores an ordered list of peer tasks. If Group is nonempty,
	// it is guaranteed that these sets of tasks constitute a shuffle
	// dependency, and share a set of shuffle dependencies. This allows
	// the evaluator to perform optimizations while tracking such
	// dependencies.
	Group []*Task

	// Scope is the metrics scope for this task. It is populated with the
	// metrics produced during execution of this task.
	Scope metrics.Scope

	sync.Mutex

	// Status is a status object to which task status is reported.
	Status *status.Task
	// contains filtered or unexported fields
}
A Task represents a concrete computational task. Tasks form graphs through dependencies; task graphs are compiled from slices.
Tasks also maintain executor state, and are used to coordinate execution between concurrent evaluators and a single executor (which may be evaluating many tasks concurrently). Tasks thus embed a mutex for coordination and provide a context-aware conditional variable to coordinate runtime state changes.
func (*Task) Broadcast ¶
func (t *Task) Broadcast()
Broadcast notifies waiters of a state change. Broadcast must only be called while the task's lock is held.
func (*Task) Err ¶
Err returns an error if the task's state is >= TaskErr. When the state is > TaskErr, Err returns an error describing the task's failed state; otherwise, t.err is returned.
func (*Task) Error ¶
Error sets the task's state to TaskErr and its error to the provided error. Waiters are notified.
func (*Task) Errorf ¶
Errorf formats an error message using fmt.Errorf, sets the task's state to TaskErr and its err to the resulting error message.
func (*Task) GraphString ¶
GraphString returns a schematic string of the task graph rooted at t.
func (*Task) Head ¶
Head returns the head task of this task's phase. If the task does not belong to a phase, Head returns the task t.
func (*Task) Subscribe ¶
func (t *Task) Subscribe(s *TaskSubscriber)
Subscribe subscribes s to be notified of any changes to t's state. If s is already subscribed, this is a no-op.
func (*Task) Unsubscribe ¶
func (t *Task) Unsubscribe(s *TaskSubscriber)
Unsubscribe unsubscribes a previously subscribed s. s will no longer receive task state change notifications. This is a no-op if s was never subscribed.
func (*Task) Wait ¶
Wait returns after the next call to Broadcast, or if the context is complete. The task's lock must be held when calling Wait.
func (*Task) WaitState ¶
WaitState returns when the task's state is at least the provided state, or else when the context is done.
func (*Task) WriteGraph ¶
WriteGraph writes a schematic string of the task graph rooted at t into w.
type TaskDep ¶
type TaskDep struct {
	// Head holds the underlying task that represents this dependency.
	// For shuffle dependencies, that task is the head task of the
	// phase, and the evaluator must expand the phase.
	Head      *Task
	Partition int

	// Expand indicates that the task's dependencies for a given
	// partition should not be merged, but rather passed individually to
	// the task implementation.
	Expand bool

	// CombineKey is an optional label that names the combination key to
	// be used by this dependency. It is used to name a single combiner
	// buffer from which is read a number of combined tasks.
	//
	// CombineKeys must be provided to tasks that contain combiners.
	CombineKey string
}
A TaskDep describes a single dependency for a task. A dependency comprises one or more tasks and the partition number of the task set that must be read at run time.
type TaskName ¶
type TaskName struct {
	// InvIndex is the index of the invocation for which the task was compiled.
	InvIndex uint64
	// Op is a unique string describing the operation that is provided
	// by the task.
	Op string
	// Shard and NumShard describe the shard processed by this task
	// and the total number of shards to be processed.
	Shard, NumShard int
}
A TaskName uniquely names a task by its constituent components. Tasks with 0 shards are taken to be combiner tasks: they are machine-local buffers of combiner outputs for some (non-overlapping) subset of shards for a task.
func (TaskName) IsCombiner ¶
IsCombiner returns whether the named task is a combiner task.
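The zero-shard convention described for TaskName can be written as a one-line predicate. The `taskName` type below mirrors the documented fields, but the method body is an assumption derived from that description rather than the package's actual implementation.

```go
package main

import "fmt"

// taskName mirrors the documented fields of TaskName.
type taskName struct {
	InvIndex        uint64
	Op              string
	Shard, NumShard int
}

// isCombiner reports whether n names a combiner task, assuming that such
// tasks are exactly those with zero shards.
func (n taskName) isCombiner() bool { return n.NumShard == 0 }

func main() {
	shard := taskName{InvIndex: 1, Op: "reduce", Shard: 3, NumShard: 8}
	buffer := taskName{InvIndex: 1, Op: "reduce"}
	fmt.Println(shard.isCombiner(), buffer.isCombiner())
}
```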
type TaskState ¶
type TaskState int
TaskState represents the runtime state of a Task. TaskState values are defined so that their magnitudes correspond with task progression.
const (
	// TaskInit is the initial state of a task. Tasks in state TaskInit
	// have usually not yet been seen by an executor.
	TaskInit TaskState = iota

	// TaskWaiting indicates that a task has been scheduled for
	// execution (it is runnable) but has not yet been allocated
	// resources by the executor.
	TaskWaiting

	// TaskRunning is the state of a task that's currently being run or
	// discarded. After a task is in state TaskRunning, it can only enter a
	// larger-valued state.
	TaskRunning

	// TaskOk indicates that a task has successfully completed;
	// the task's results are available to dependent tasks.
	//
	// All TaskState values greater than TaskOk indicate task
	// errors.
	TaskOk

	// TaskErr indicates that the task experienced a failure while
	// running.
	TaskErr

	// TaskLost indicates that the task was lost, usually because
	// the machine to which the task was assigned failed.
	TaskLost
)
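Because state magnitudes track task progression, checks such as "has this task finished?" or "did it fail?" reduce to integer comparisons. The sketch below mirrors the constant ordering with hypothetical lowercase names; the helpers `finished` and `failed` are illustrative and not part of the package.

```go
package main

import "fmt"

type taskState int

// The ordering mirrors the documented TaskState constants.
const (
	taskInit taskState = iota
	taskWaiting
	taskRunning
	taskOk
	taskErr
	taskLost
)

// finished reports whether s is a terminal state: success or any error.
func finished(s taskState) bool { return s >= taskOk }

// failed reports whether s indicates an error; all states greater than
// taskOk do.
func failed(s taskState) bool { return s > taskOk }

func main() {
	for _, s := range []taskState{taskRunning, taskOk, taskLost} {
		fmt.Println(s, finished(s), failed(s))
	}
}
```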
type TaskSubscriber ¶
TaskSubscriber is subscribed to a Task using Subscribe. It is then notified whenever the Task state changes. This is useful for efficiently observing the state changes of many tasks.
func NewTaskSubscriber ¶
func NewTaskSubscriber() *TaskSubscriber
NewTaskSubscriber returns a new TaskSubscriber. It needs to be subscribed to a Task with Subscribe for it to be notified of task state changes.
func (*TaskSubscriber) Notify ¶
func (s *TaskSubscriber) Notify(task *Task)
Notify notifies s of a task whose state has changed.
func (*TaskSubscriber) Ready ¶
func (s *TaskSubscriber) Ready() <-chan struct{}
Ready returns a channel that is closed if a subsequent call to Tasks will return a non-nil slice.
func (*TaskSubscriber) Tasks ¶
func (s *TaskSubscriber) Tasks() []*Task
Tasks returns the tasks whose state has changed since the last call to Tasks.
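The interplay between Notify, Ready, and Tasks can be sketched as a pending list guarded by a mutex, plus a channel that is closed when the list becomes non-empty and replaced when it is drained. The `subscriber` type below is hypothetical and tracks task names as strings; it illustrates the documented behavior and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for a task subscriber.
type subscriber struct {
	mu      sync.Mutex
	pending []string
	ready   chan struct{}
}

func newSubscriber() *subscriber {
	return &subscriber{ready: make(chan struct{})}
}

// notify records that the named task changed state. The first pending
// change closes the ready channel.
func (s *subscriber) notify(task string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.pending) == 0 {
		close(s.ready)
	}
	s.pending = append(s.pending, task)
}

// readyc returns a channel that is closed once tasks would return a
// non-nil slice.
func (s *subscriber) readyc() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ready
}

// tasks returns the tasks notified since the last call and resets the
// ready channel. It returns nil if nothing is pending.
func (s *subscriber) tasks() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.pending) == 0 {
		return nil
	}
	out := s.pending
	s.pending = nil
	s.ready = make(chan struct{})
	return out
}

func main() {
	s := newSubscriber()
	s.notify("map-0")
	s.notify("map-1")
	<-s.readyc() // Does not block: changes are pending.
	fmt.Println(s.tasks())
	fmt.Println(s.tasks() == nil)
}
```

A single subscriber can be attached to many tasks, so a caller selects on one channel instead of waiting on each task separately.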