Documentation ¶
Index ¶
- Variables
- type CancelEvent
- type CompleteOpt
- type Decision
- type Driver
- type ImmutableWorkflowInstance
- type IntervalSuggester
- type ListWorkflowOpts
- type Manager
- func (m *Manager) CancelWorkflow(ctx context.Context, workflowName WorkflowName, instanceName string) error
- func (m *Manager) CreateWorkflowSchedule(ctx context.Context, instanceName string, workflowName WorkflowName, ...) error
- func (m *Manager) EnqueueWorkflow(ctx context.Context, workflowName WorkflowName, instanceName string, ...) error
- func (m *Manager) GetWorkflowInstanceByName(ctx context.Context, instanceName string, workflowName WorkflowName) (ImmutableWorkflowInstance, error)
- func (m *Manager) GetWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName WorkflowName) (*Schedule, error)
- func (m *Manager) KillWorkflow(ctx context.Context, workflowName WorkflowName, instanceName string) error
- func (m *Manager) ListWorkflowSchedules(ctx context.Context) ([]*Schedule, error)
- func (m *Manager) RegisterTaskExecutor(taskName TaskName, executor TaskExecutor, opts TaskExecutorOpts) error
- func (m *Manager) RegisterWorkflowExecutor(workflowName WorkflowName, workflowExecutor WorkflowExecutor) error
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop() error
- func (m *Manager) UpdateWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName WorkflowName, ...) error
- func (m *Manager) WakeupTaskPollerByTaskName(taskName string)
- func (m *Manager) WakeupWorkflowExecutor()
- type ManagerOpt
- func WithOnWorkflowCompleteCallback(c OnWorkflowCompleteCallback) ManagerOpt
- func WithTaskDequeueWorkers(count int) ManagerOpt
- func WithTaskPollInterval(interval time.Duration) ManagerOpt
- func WithTaskPollIntervalMaxJitter(jitter time.Duration) ManagerOpt
- func WithWorkflowPollInterval(interval time.Duration) ManagerOpt
- type OnWorkflowCompleteCallback
- type Schedule
- type ScheduledWorkflowCompleter
- type SchedulerDriver
- type StartEvent
- type StartGuard
- type Task
- type TaskCompleteEvent
- type TaskCompleter
- type TaskData
- type TaskDequeuer
- type TaskEnqueueOpt
- type TaskEnqueueOptions
- type TaskExecutor
- type TaskExecutorOpts
- type TaskMetadata
- type TaskName
- type TaskResult
- type TaskResultData
- type TaskStatusType
- type WorkflowCompleter
- type WorkflowEvent
- type WorkflowEventType
- type WorkflowExecutor
- type WorkflowInstance
- type WorkflowInstanceData
- type WorkflowInstanceStatus
- type WorkflowName
- type WorkflowScheduleUpdateOpt
- type WorkflowScheduleUpdateOptions
- type WorkflowScheduler
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoTasks = errors.New("no tasks in queue") ErrNoWorkflowInstances = errors.New("no workflow instances in queue") ErrWorkflowScheduleExists = errors.New("workflow schedule already exists") ErrWorkflowInstanceExists = errors.New("workflow instance already exists") ErrNoDueWorkflows = errors.New("no due workflows") ErrNoScheduledWorkflows = errors.New("no workflows are scheduled") ErrInvalidSchedule = errors.New("workflow schedule is not valid") ErrWorkflowInstanceNotFound = errors.New("workflow instance not found") ErrWorkflowScheduleNotFound = errors.New("workflow schedule not found") ErrWorkflowNotComplete = errors.New("workflow instance is still running") ErrTaskLost = errors.New("task lost before reporting its status") )
var ( // maxWakeupInterval is the maximum amount of time we will // sleep between checking the recurrence table. MaxWakeupInterval = 60 * time.Second // lateWarningThreshold is how late a job can be before we // will log a warning. LateWarningThreshold = 10 * time.Second )
Functions ¶
This section is empty.
Types ¶
type CancelEvent ¶
type CancelEvent struct { // The time this time complete event was added to the workflow // event queue. EnqueuedAt time.Time }
CancelEvent is passed to the OnCancel callback of the WorkflowExecutor when the workflow is signaled for cancellation.
type CompleteOpt ¶
type CompleteOpt func(*Decision)
func WithResult ¶
func WithResult(obj interface{}) CompleteOpt
type Decision ¶
type Decision struct {
// contains filtered or unexported fields
}
Decision indicates how the execution of a workflow instance is to proceed. This struct should not be created by the user. Instead, use the methods that return Decision on the WorkflowInstance.
func NewCompleteDecision ¶
func NewCompleteDecision(result interface{}) Decision
func NewContinueDecision ¶
func NewContinueDecision(payload interface{}) Decision
func NewFailDecision ¶
func (*Decision) IsComplete ¶
func (*Decision) IsContinuing ¶
type Driver ¶
type Driver interface { TaskDequeuer EnqueueWorkflow(ctx context.Context, workflow *WorkflowInstanceData) error DequeueWorkflow(ctx context.Context, workflowNames []string) (*WorkflowEvent, WorkflowCompleter, error) CancelWorkflow(ctx context.Context, instanceName string, workflowName string) error KillWorkflow(ctx context.Context, instanceName string, workflowName string) error CreateWorkflowSchedule(ctx context.Context, instanceName string, workflowName string, parameters []byte, enabled bool, recurrence string, nextRunAt time.Time) error ListWorkflowSchedules(ctx context.Context) ([]*Schedule, error) GetWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName string) (*Schedule, error) UpdateWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName string, opts WorkflowScheduleUpdateOptions) error GetWorkflowInstanceByName(ctx context.Context, instanceName string, workflowName string) (*WorkflowInstanceData, error) ListWorkflowInstances(ctx context.Context, opts ListWorkflowOpts) ([]*WorkflowInstanceData, error) Init() error Close() error }
type ImmutableWorkflowInstance ¶
type ImmutableWorkflowInstance interface { // GetPayload unmarshals the payload of the workflow instance into the // value pointed at by obj. This payload is any state the user wishes // to keep. GetPayload(obj interface{}) error // GetParameters unmarshals the parameters the workflow instance was // started with into the value pointed at by obj. GetParameters(obj interface{}) error IsRunning() bool GetResult(obj interface{}) error Err() error }
type IntervalSuggester ¶
type IntervalSuggester interface { DefaultTaskPollInterval() time.Duration DefaultWorkflowPollInterval() time.Duration }
IntervalSuggester is an interface that backends can optionally implement to suggest a default TaskPollInterval and WorkflowPollInterval to the rest of the cereal library.
type ListWorkflowOpts ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is responsible for calling WorkflowExecutors and TaskExecutors when they need to be processed, along with managing the scheduling of workflows.
func NewManager ¶
func NewManager(b Driver, opts ...ManagerOpt) (*Manager, error)
NewManager creates a new Manager with the given Driver. If the driver fails to initialize, an error is returned.
func (*Manager) CancelWorkflow ¶
func (*Manager) CreateWorkflowSchedule ¶
func (m *Manager) CreateWorkflowSchedule( ctx context.Context, instanceName string, workflowName WorkflowName, parameters interface{}, enabled bool, recurRule *rrule.RRule, ) error
CreateWorkflowSchedule creates a recurring workflow based on the recurrence rule provided. The first run will happen during when the recurrence is first due from Now.
func (*Manager) EnqueueWorkflow ¶
func (m *Manager) EnqueueWorkflow(ctx context.Context, workflowName WorkflowName, instanceName string, parameters interface{}) error
EnqueueWorkflow enqueues a workflow of type workflowName. Only one instance of (workflowName, instanceName) can be running at a time.
func (*Manager) GetWorkflowInstanceByName ¶
func (m *Manager) GetWorkflowInstanceByName(ctx context.Context, instanceName string, workflowName WorkflowName) (ImmutableWorkflowInstance, error)
func (*Manager) GetWorkflowScheduleByName ¶
func (*Manager) KillWorkflow ¶
func (*Manager) ListWorkflowSchedules ¶
ListWorkflowSchedules list all the scheduled workflows.
func (*Manager) RegisterTaskExecutor ¶
func (m *Manager) RegisterTaskExecutor(taskName TaskName, executor TaskExecutor, opts TaskExecutorOpts) error
RegisterTaskExecutor registers a TaskExecutor to execute tasks of type taskName. This is not safe to call concurrently and should be done from only one thread of the process. This must be called before Start.
func (*Manager) RegisterWorkflowExecutor ¶
func (m *Manager) RegisterWorkflowExecutor(workflowName WorkflowName, workflowExecutor WorkflowExecutor) error
RegisterWorkflowExecutor registers a WorkflowExecutor to execute workflows of type workflowName. This is not safe to call concurrently and should be done from only one thread of the process. This must be called before Start.
func (*Manager) Start ¶
Start starts the Manager. No workflows, tasks, or schedules will be processed before Start is called. This should only be called once.
func (*Manager) UpdateWorkflowScheduleByName ¶
func (m *Manager) UpdateWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName WorkflowName, opts ...WorkflowScheduleUpdateOpt) error
UpdateWorkflowScheduleByName updates the scheduled workflow identified by (instanceName, workflowName).
func (*Manager) WakeupTaskPollerByTaskName ¶
func (*Manager) WakeupWorkflowExecutor ¶
func (m *Manager) WakeupWorkflowExecutor()
type ManagerOpt ¶
type ManagerOpt func(*Manager)
ManagerOpt is an option that can be passed to NewManager.
func WithOnWorkflowCompleteCallback ¶
func WithOnWorkflowCompleteCallback(c OnWorkflowCompleteCallback) ManagerOpt
WithOnWorkflowCompleteCallback sets an OnWorkflowComplete callback that will be called whenever a workflow is finished (i.e. the workflow returns a failure or completion decision). This is intended for testing and debugging purposes ONLY.
func WithTaskDequeueWorkers ¶
func WithTaskDequeueWorkers(count int) ManagerOpt
WithTaskDequeueWorkers sets the number of workers that will be used to dequeue task. This limits the number of concurrent calls being made to the backend.
func WithTaskPollInterval ¶
func WithTaskPollInterval(interval time.Duration) ManagerOpt
WithTaskPollInterval sets the polling interval for all TaskExecutor workers. Each worker will poll the database at least every interval for new jobs.
func WithTaskPollIntervalMaxJitter ¶
func WithTaskPollIntervalMaxJitter(jitter time.Duration) ManagerOpt
WithTaskPollIntervalMaxJitter is the maximum amount of time before the configured interval that we can wake up to prevent TaskPollers from always waking up at the same time.
func WithWorkflowPollInterval ¶
func WithWorkflowPollInterval(interval time.Duration) ManagerOpt
WithWorkflowPollInterval sets the polling interval for the main workflow processing loop. The loop will wake up at least once every interval to check for new workflow events.
type OnWorkflowCompleteCallback ¶
type OnWorkflowCompleteCallback func(*WorkflowEvent)
A OnWorkflowCompleteCallback is a function that can be called at the completion of workflows for debugging and testing purposes. The function should not be used for application logic.
type Schedule ¶
type Schedule struct { // TODO(ssd) 2019-07-19: ID was originally placed on backends // because it was unclear whether (workflow_name, // instance_name) can actually be unique in the case of // scheduled workflows. We currently have a unique constraint // on them, so we could remove this, but since it was on this // struct it ended up in a few queries that would need to // change. ID int64 Enabled bool InstanceName string WorkflowName string Parameters []byte Recurrence string NextDueAt time.Time LastEnqueuedAt time.Time // These come from the latest result LastStart *time.Time LastEnd *time.Time }
Schedule represents a recurring workflow. TODO(jaym): we should wrap this in the workflow package and provide a getter for the parameters
func (*Schedule) GetParameters ¶
type SchedulerDriver ¶
type StartEvent ¶
type StartEvent struct { // The time this start event was added to the workflow event // queue. EnqueuedAt time.Time }
StartEvent is passed to the OnStart callback of the WorkflowExecutor when a workflow instance is signaled to be started.
type StartGuard ¶
type StartGuard struct {
// contains filtered or unexported fields
}
StartGuard is used to enforce that Start is only called once for various structures inside cereal.
We were previously using sync.WaitGroup for this, but this allows us to customize the error message a bit.
The zero-value of this should work as expected, but you can create a StartGuard with a custom message using NewStartGuard.
func NewStartGuard ¶
func NewStartGuard(msg string) StartGuard
func (*StartGuard) Started ¶
func (s *StartGuard) Started()
type Task ¶
type Task interface { // GetParameters unmarshals the parameters the task was started with into // the value pointed at by obj. GetParameters(obj interface{}) error // GetMetadata returns metadata about the task, that the task // might use to make decisions about whether to continue // running this task. GetMetadata() TaskMetadata }
Task is an interface to an object representing a running Task. This will be provided to TaskExecutor implementations when the Run method is called.
type TaskCompleteEvent ¶
type TaskCompleteEvent struct { // TaskName is the type of the task that completed TaskName TaskName // Result contains information representing the completion of the // task such as if it errored or returned a value. Result TaskResult // The time this time complete event was added to the workflow // event queue. EnqueuedAt time.Time }
TaskCompleteEvent is passed to the OnTaskComplete callback of the WorkflowExecutor when a task for a workflow instance completes.
type TaskCompleter ¶
type TaskData ¶
type TaskData struct { Name string Parameters []byte Metadata TaskMetadata }
TODO(ssd) 2019-10-04: Probably remove this?
func (*TaskData) GetMetadata ¶
func (t *TaskData) GetMetadata() TaskMetadata
func (*TaskData) GetParameters ¶
type TaskDequeuer ¶
type TaskEnqueueOpt ¶
type TaskEnqueueOpt func(*TaskEnqueueOptions)
A TaskEnqueueOpt is an optional parameters for enqueuing a task
func StartAfter ¶
func StartAfter(startAfter time.Time) TaskEnqueueOpt
StartAfter indicates when the task should start running
type TaskEnqueueOptions ¶
This is annoying public so that it can be referred to by the backends.
type TaskExecutor ¶
type TaskExecutor interface { // Run implements the logic for running the task. The returned result/err // will be provided to the workflow instance that started this task on // completion. This method should strive to be retryable / idempotent // as there is a possibility that it can run multiple times. Run(ctx context.Context, task Task) (result interface{}, err error) }
TaskExecutor is the interface implemented by objects that can run tasks of a certain type. TODO(ssd) 2019-05-10: How do we want to handle cancellation?
type TaskExecutorOpts ¶
type TaskExecutorOpts struct { // Timeout is how long to wait before canceling a running task. Timeout time.Duration // Workers specifies the max concurrently executing tasks the Manager // will launch for the registered TaskExecutor. Workers int }
TaskExecutorOpts are options that describe how a TaskExecutor should run tasks.
type TaskMetadata ¶
type TaskName ¶
type TaskName struct {
// contains filtered or unexported fields
}
TaskName identifies tasks registered with the cereal library. Many functions in the cereal library take or return TaskNames.
func NewTaskName ¶
NewTaskName constructs a TaskName from the given string. The resulting TaskName is comparable to other WorkflowName. Two TaskNames are equal if they were constructed with strings that were equal.
type TaskResult ¶
type TaskResult interface { GetParameters(obj interface{}) error // Get unmarshals the result returned by the task into the value pointed at // by obj. Get(obj interface{}) error // Err returns an error if the task returned an error upon completion, // otherwise nil. The exact error types will not be preserved. Err() error }
TaskResult is an interface to an object representing a completed Task. This will be provided to the OnTaskComplete callback method of WorkflowExecutor implementations.
type TaskResultData ¶
type TaskResultData struct { TaskName string Parameters []byte Status TaskStatusType ErrorText string Result []byte }
func (*TaskResultData) Err ¶
func (r *TaskResultData) Err() error
func (*TaskResultData) Get ¶
func (r *TaskResultData) Get(obj interface{}) error
func (*TaskResultData) GetParameters ¶
func (r *TaskResultData) GetParameters(obj interface{}) error
type TaskStatusType ¶
type TaskStatusType string
const ( TaskStatusSuccess TaskStatusType = "success" TaskStatusFailed TaskStatusType = "failed" TaskStatusLost TaskStatusType = "lost" TaskStatusUnusableResult TaskStatusType = "unusable_result" )
type WorkflowCompleter ¶
type WorkflowEvent ¶
type WorkflowEvent struct { Instance WorkflowInstanceData Type WorkflowEventType EnqueuedTaskCount int CompletedTaskCount int EnqueuedAt time.Time TaskResult *TaskResultData }
type WorkflowEventType ¶
type WorkflowEventType string
const ( WorkflowStart WorkflowEventType = "start" TaskComplete WorkflowEventType = "task_complete" WorkflowCancel WorkflowEventType = "cancel" )
type WorkflowExecutor ¶
type WorkflowExecutor interface { // OnStart is called when a StartEvent for this type of workflow is to // be processed. OnStart(w WorkflowInstance, ev StartEvent) Decision // OnTaskComplete is called when a TaskCompleteEvent for this type of // workflow is to be processed. This event will never be received before // OnStart for the given WorkflowInstance. OnTaskComplete(w WorkflowInstance, ev TaskCompleteEvent) Decision // OnCancel is called when a workflow instance is to be canceled. OnCancel(w WorkflowInstance, ev CancelEvent) Decision }
WorkflowExecutor is the interface implemented by objects that can process a workflow of a certain type.
type WorkflowInstance ¶
type WorkflowInstance interface { // GetPayload unmarshals the payload of the workflow instance into the // value pointed at by obj. This payload is any state the user wishes // to keep. GetPayload(obj interface{}) error // GetParameters unmarshals the parameters the workflow instance was // started with into the value pointed at by obj. GetParameters(obj interface{}) error // EnqueueTask requests that a task of the type taskName be started // with the given parameters. Any enqueued tasks will be started // after the currently running callback of the WorkflowExecutor // returns. EnqueueTask(taskName TaskName, parameters interface{}, opts ...TaskEnqueueOpt) error // Complete returns a decision to end execution of the workflow for // the running workflow instance. Complete(...CompleteOpt) Decision // Continue returns a decision to continue execution of the workflow for // the running workflow instance. The provided payload will available when // this workflow instance is processed by a WorkflowExecutor next. Continue(payload interface{}) Decision // Fail returns a decision to end the execution of the // workflow because of an error that occurred when processing // the workflow event. Fail(error) Decision // InstanceName returns the workflow instance name InstanceName() string // TotalEnqueuedTasks returns the total number of tasks that // have been enqueued for the lifetime of the running workflow // instance. TotalEnqueuedTasks() int // TotalCompletedTasks returns the total number of tasks that have finished // execution and been seen by the workflow instance. TotalCompletedTasks() int }
WorkflowInstance is an interface to an object representing a running workflow instance. A workflow instance keeps state of a currently running workflow, and decides what decisions need to be made based and that state. This will be passed to the callback methods of WorkflowExecutor. Only one instance of a WorkflowInstance can execute at a time.
type WorkflowInstanceData ¶
type WorkflowInstanceStatus ¶
type WorkflowInstanceStatus string
const ( WorkflowInstanceStatusStarting WorkflowInstanceStatus = "starting" WorkflowInstanceStatusRunning WorkflowInstanceStatus = "running" WorkflowInstanceStatusCompleted WorkflowInstanceStatus = "completed" )
type WorkflowName ¶
type WorkflowName struct {
// contains filtered or unexported fields
}
WorkflowName identifies workflows registered with the cereal library. Many functions in the cereal library take or return WorkflowNames.
func NewWorkflowName ¶
func NewWorkflowName(name string) WorkflowName
NewWorkflowName constructs a WorkflowName from the given string. The resulting WorkflowName is comparable to other WorkflowName. Two WorkflowNames are equal if they were constructed with strings that were equal.
func (WorkflowName) String ¶
func (w WorkflowName) String() string
type WorkflowScheduleUpdateOpt ¶
type WorkflowScheduleUpdateOpt func(*WorkflowScheduleUpdateOptions) error
WorkflowScheduleUpdateOpts represents changes that can be made to a scheduled workflow. The changes will be committed atomically.
func UpdateEnabled ¶
func UpdateEnabled(enabled bool) WorkflowScheduleUpdateOpt
UpdateEnabled allows enabling or disabling a scheduled workflow.
func UpdateParameters ¶
func UpdateParameters(parameters interface{}) WorkflowScheduleUpdateOpt
UpdateParameters allows changing the parameters a workflow will be started with.
func UpdateRecurrence ¶
func UpdateRecurrence(recurRule *rrule.RRule) WorkflowScheduleUpdateOpt
UpdateRecurrence changes the recurrence rule for the scheduled workflow. The next run will happen when the recurrence is first due from Now.
type WorkflowScheduler ¶
type WorkflowScheduler struct {
// contains filtered or unexported fields
}
func NewWorkflowScheduler ¶
func NewWorkflowScheduler(b SchedulerDriver, w func()) *WorkflowScheduler
func (*WorkflowScheduler) Run ¶
func (w *WorkflowScheduler) Run(ctx context.Context)
func (*WorkflowScheduler) Trigger ¶
func (w *WorkflowScheduler) Trigger()