Documentation
¶
Overview ¶
Package executor contains basic implementation of an executor framework. The client facing interface is provided by the ExecutionService which creates ExecutorPool and Dispatcher. Task interface is defined so client would implement these methods on client specific Type to undertake specific work. When a task is submitted to the ExecutionService it submits it to the Dispatcher which assigns a channel on which task execution result / response is received. Along with attaching a channel to the task, it creates a separate routine to listen to response arrival. If the submitted task is blocking the calling routine is actually waiting for the response so Dispatcher returns the response upon receiving as well as undertakes the house keeping work of recycling the channel for another task. If the submitted task is asynchronous, caller is returned upon task submission even though there is a routine waiting for the response to undertake the house keeping.
ExecutorPool maintains a set of executors for async tasks and another one for blocking tasks.
MIT License Author: Umesh Patil, Neosemantix, Inc.
Index ¶
- Constants
- Variables
- func GetRandomBoolean() bool
- func RandomTestTaskExecTime() int
- func SetupRand()
- func SetupTestTask()
- type Dispatcher
- type DispatcherCfg
- type ExecCfg
- type ExecPoolCfg
- type ExecServiceCfg
- type ExecutionService
- type Executor
- type ExecutorPool
- type MonitoringCfg
- type Response
- type Task
- type TaskStats
- type TestTask
Constants ¶
const DefaultCfgFileName = "default-cfg.json"
Name of a configuration file which contains default values; in the same folder where you would find execution-service.go. If user allows to use the default configuration, then in absence of user provided configuration values in this file will be used.
const ExecServiceCfgJsonElementName = "ExecServiceSettings"
Name of the Json element in any Json Configuration file which contains ExecServiceCfg structure value. Note that we do not support only part settings, the constant refers to a Json segment which will contain values for all 3 config structures.
const TaskStatusCompletedFailed = 500
const TaskStatusCompletedSuccessfully = 200
const TaskStatusFailedToSubmit = 1
const TaskStatusNotSubmitted = 0
const TaskStatusSubmitted = 100
const TestTaskDurationRange = TestTaskExecDurationUpperLimit - TestTaskExecDurationLowerLimit
const TestTaskExecDurationLowerLimit = 8
const TestTaskExecDurationUpperLimit = 128
Variables ¶
var GlobalRand *rand.Rand
var StaticBox *packr.Box
Functions ¶
func GetRandomBoolean ¶
func GetRandomBoolean() bool
func RandomTestTaskExecTime ¶
func RandomTestTaskExecTime() int
func SetupTestTask ¶
func SetupTestTask()
Types ¶
type Dispatcher ¶
type Dispatcher struct { JobStats *TaskStats // contains filtered or unexported fields }
Dispatcher type which hold reference to executor pool, channels used for getting back task execution results and go routines waiting on task results.
func NewDispatcher ¶
func NewDispatcher(cfg DispatcherCfg, ep *ExecutorPool) *Dispatcher
create a dispatcher with the given number of Response channel counts max channel count should be equal to number of executors in the executor pool if tasks are generally very short running, channels can be less we each dedicated channel for each task execution
task submission on the dispatcher happens in the calling 'go routine / thread' (Alternate design could be waiting tasks are in map, we use one single fixed channel on which responses for all tasks are published and on the receiving side based on task id of the response object, we match waiting tasks in the map (key task id) and release the response.)
cc: how many channels to create to listen back task result
cp: capacity - buffer size - for each channel
ep: executor pool
wfc: whether to block the submission for availability a channel to hear back the task result. It does not apply for async tasks.
func (*Dispatcher) Start ¶
func (disp *Dispatcher) Start()
func (*Dispatcher) Stop ¶
func (disp *Dispatcher) Stop()
type DispatcherCfg ¶
type DispatcherCfg struct { // Number of channels used to receive back task execution results ChannelCount int `json:"channel_count"` // Channel buffer size ChannelCapacity int `json:"channel_capacity"` // Whether caller should wait for response channel availability while // submitting a task WaitForChanAvail bool `json:"wait_for_chan_avail"` }
type ExecCfg ¶
type ExecCfg struct { // How many maximum number tasks accepted by the executor when it is // already executing a task. These tasks will form the queue. TaskQueueCapacity int `json:"task_queue_capacity"` // If true, despite the full task queue capacity, caller invoking // Submit method will wait i.e. will be blocked. By default we keep it // false. So once the task queue is full, subsequent attempts to add a task // will fail as long as the queue if filled. WaitForAvailability bool `json:"wait_for_availability"` }
Executor configuration parameters
type ExecPoolCfg ¶
type ExecPoolCfg struct { // Number of executors which will be used to handle async tasks AsyncTaskExecutorCount int `json:"async_task_executor_count"` // Number of executors which will be used to hand blocking task, // caller is waiting for the execution result. BlockingTaskExecutorCount int `json:"blocking_task_executor_count"` }
type ExecServiceCfg ¶
type ExecServiceCfg struct { Dispatcher DispatcherCfg `json:"DispatcherSettings"` ExexPool ExecPoolCfg `json:"ExecPoolSettings"` Executor ExecCfg `json:"ExecutorSettings"` Monitoring MonitoringCfg `json:"MonitoringSettings"` }
Configuration for the entire execution service which comprises of configuration for Dispatcher, Executor Pool and for each Executor.
func (*ExecServiceCfg) MakeExecServiceFromCfg ¶ added in v0.0.4
func (esc *ExecServiceCfg) MakeExecServiceFromCfg() *ExecutionService
Start a new execution service from the given configuration. For the returned execution service, the given cfg is in use.
type ExecutionService ¶
type ExecutionService struct { Monitor *util.Monitor // exposed for testing purposes ServiceCfgInUse *ExecServiceCfg // contains filtered or unexported fields }
func NewExecutionService ¶
func NewExecutionService(cfgFileName string, useDefault bool) *ExecutionService
Caller can pass the configuration file name which will contain all parameters needed to start the execution service. The file will be searched in the directory as pointed by the environmental variable GO_CFG_HOME. If the environmental variable is not set or file is not found; caller can indicate whether default configuration file is to be used or not. If configuration is found, method returns with a Fatal Log call.
func (*ExecutionService) CloneCfg ¶ added in v0.0.4
func (es *ExecutionService) CloneCfg() *ExecServiceCfg
Clone the configuration in use of the given execution service
func (ExecutionService) GetData ¶ added in v0.0.4
func (es ExecutionService) GetData() util.Blob
func (ExecutionService) Name ¶ added in v0.0.4
func (es ExecutionService) Name() string
func (*ExecutionService) Start ¶
func (es *ExecutionService) Start()
func (*ExecutionService) Stop ¶
func (es *ExecutionService) Stop()
type Executor ¶
type Executor interface { Start() Submit(t Task) error HowManyInQueue() int WaitForAvailability(wfa bool) Stop() }
We start with core Executor contract as an interface. As expected it has common methods like Start, Stop and Submit to receive a task. User can also specify whether we wait for availability of an internal buffer to accept the incoming task.
func NewExecutor ¶
type ExecutorPool ¶
type ExecutorPool struct {
// contains filtered or unexported fields
}
Holds two separate arrays of executors - one for blocking tasks and the other for async execution. ExecutorPool also fulfills the actual Executor contract: Start, Stop, Submit and other methods. That makes it consistent.
func NewExecutorPool ¶
func NewExecutorPool(epCfg ExecPoolCfg, cfg ExecCfg) *ExecutorPool
async: how many executors for execution of async tasks blocked: how many executors for execution of blocked tasks wfa: wait for availability in the queue for an executor
func (*ExecutorPool) HowManyInQueue ¶
func (es *ExecutorPool) HowManyInQueue() int
func (*ExecutorPool) Start ¶
func (es *ExecutorPool) Start()
func (*ExecutorPool) Stop ¶
func (es *ExecutorPool) Stop()
func (*ExecutorPool) Submit ¶
func (es *ExecutorPool) Submit(tsk Task) error
func (*ExecutorPool) TotalExecutorCount ¶
func (es *ExecutorPool) TotalExecutorCount() int
type MonitoringCfg ¶ added in v0.0.4
type MonitoringCfg struct { MonitoringFrequency int `json:"MonitoringFrequency"` MonDataChanBufSz int `json:"ChannelBufferSize"` }
Configuration about how the monitoring is done at runtime.
type Response ¶
type Response struct { // Id of the task to which this response corresponds to TaskId int // Task status - whether it succeeded or failed or some other state Status int // Output of the successfully executed task. // For now we assume it will be a JSON string Result string Errors []error }
func FailedToSubmitResponse ¶
func NewResponse ¶
type Task ¶
type Task interface { GetId() int Execute() Response SetRespChan(rc chan Response) GetRespChan() chan Response IsBlocking() bool }
Basic interface client of executor module should implement so as to get the work done. It has standard id and core execute methods. Also it needs to carry the channel with it on which the result of execution will be reported. It will also reveal whether it is a blocking task or not.
type TaskStats ¶ added in v0.0.4
type TaskStats struct { sync.Mutex UpSinceWhen time.Time `json:"up_since_when"` TotalTasksSubmitted int `json:"total_tasks_submitted"` BlockingTasksSubmitted int `json:"blocking_tasks_submitted"` AsyncTasksSubmitted int `json:"async_tasks_submitted"` TasksInExecution int `json:"tasks_in_execution"` }
It tracks various task statistics submitted and executed through a dispatcher.
type TestTask ¶
type TestTask struct {
// contains filtered or unexported fields
}