cereal

package
v0.0.0-...-cb6f7a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors defined by the cereal package. Callers should compare
// against these values (for example with errors.Is) rather than matching
// on the message text.
var (
	ErrNoTasks                  = errors.New("no tasks in queue")
	ErrNoWorkflowInstances      = errors.New("no workflow instances in queue")
	ErrWorkflowScheduleExists   = errors.New("workflow schedule already exists")
	ErrWorkflowInstanceExists   = errors.New("workflow instance already exists")
	ErrNoDueWorkflows           = errors.New("no due workflows")
	ErrNoScheduledWorkflows     = errors.New("no workflows are scheduled")
	ErrInvalidSchedule          = errors.New("workflow schedule is not valid")
	ErrWorkflowInstanceNotFound = errors.New("workflow instance not found")
	ErrWorkflowScheduleNotFound = errors.New("workflow schedule not found")
	ErrWorkflowNotComplete      = errors.New("workflow instance is still running")
	ErrTaskLost                 = errors.New("task lost before reporting its status")
)
View Source
var (
	// MaxWakeupInterval is the maximum amount of time we will
	// sleep between checking the recurrence table.
	MaxWakeupInterval = 60 * time.Second
	// LateWarningThreshold is how late a job can be before we
	// will log a warning.
	LateWarningThreshold = 10 * time.Second
)

Functions

This section is empty.

Types

type CancelEvent

type CancelEvent struct {
	// The time this cancel event was added to the workflow
	// event queue.
	EnqueuedAt time.Time
}

CancelEvent is passed to the OnCancel callback of the WorkflowExecutor when the workflow is signaled for cancellation.

type CompleteOpt

type CompleteOpt func(*Decision)

func WithResult

func WithResult(obj interface{}) CompleteOpt

type Decision

type Decision struct {
	// contains filtered or unexported fields
}

Decision indicates how the execution of a workflow instance is to proceed. This struct should not be created by the user. Instead, use the methods that return Decision on the WorkflowInstance.

func NewCompleteDecision

func NewCompleteDecision(result interface{}) Decision

func NewContinueDecision

func NewContinueDecision(payload interface{}) Decision

func NewFailDecision

func NewFailDecision(err error) Decision

func (*Decision) Err

func (d *Decision) Err() error

func (*Decision) IsComplete

func (d *Decision) IsComplete() bool

func (*Decision) IsContinuing

func (d *Decision) IsContinuing() bool

func (*Decision) IsFailed

func (d *Decision) IsFailed() bool

func (*Decision) Payload

func (d *Decision) Payload() interface{}

func (*Decision) Result

func (d *Decision) Result() interface{}

type Driver

// Driver is the backend interface a Manager is built on (see NewManager).
// It embeds TaskDequeuer and adds operations on workflow instances and
// workflow schedules.
//
// NOTE(review): the methods here take (instanceName, workflowName) in that
// order, while Manager.EnqueueWorkflow, Manager.CancelWorkflow and
// Manager.KillWorkflow take (workflowName, instanceName). Both names are
// plain strings at this level, so a swapped pair compiles silently.
type Driver interface {
	TaskDequeuer

	// Workflow instance operations.
	EnqueueWorkflow(ctx context.Context, workflow *WorkflowInstanceData) error
	DequeueWorkflow(ctx context.Context, workflowNames []string) (*WorkflowEvent, WorkflowCompleter, error)
	CancelWorkflow(ctx context.Context, instanceName string, workflowName string) error
	KillWorkflow(ctx context.Context, instanceName string, workflowName string) error

	// Workflow schedule operations.
	CreateWorkflowSchedule(ctx context.Context, instanceName string, workflowName string, parameters []byte, enabled bool, recurrence string, nextRunAt time.Time) error
	ListWorkflowSchedules(ctx context.Context) ([]*Schedule, error)

	GetWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName string) (*Schedule, error)
	UpdateWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName string, opts WorkflowScheduleUpdateOptions) error

	// Workflow instance lookups.
	GetWorkflowInstanceByName(ctx context.Context, instanceName string, workflowName string) (*WorkflowInstanceData, error)
	ListWorkflowInstances(ctx context.Context, opts ListWorkflowOpts) ([]*WorkflowInstanceData, error)

	// NOTE(review): NewManager documents that it returns an error when the
	// driver fails to initialize; presumably that is the error from Init.
	// TODO confirm who is expected to call Close.
	Init() error
	Close() error
}

type ImmutableWorkflowInstance

// ImmutableWorkflowInstance is the view of a workflow instance returned by
// Manager.GetWorkflowInstanceByName. It exposes accessors only.
type ImmutableWorkflowInstance interface {
	// GetPayload unmarshals the payload of the workflow instance into the
	// value pointed at by obj. This payload is any state the user wishes
	// to keep.
	GetPayload(obj interface{}) error

	// GetParameters unmarshals the parameters the workflow instance was
	// started with into the value pointed at by obj.
	GetParameters(obj interface{}) error

	// IsRunning presumably reports whether the workflow instance has not
	// yet finished — TODO confirm which statuses count as running.
	IsRunning() bool

	// GetResult presumably unmarshals the result of the workflow instance
	// into the value pointed at by obj, mirroring GetPayload.
	// NOTE(review): confirm whether ErrWorkflowNotComplete is returned
	// while the instance is still running.
	GetResult(obj interface{}) error

	// Err presumably returns the error the workflow instance failed with,
	// if any — TODO confirm.
	Err() error
}

type IntervalSuggester

type IntervalSuggester interface {
	DefaultTaskPollInterval() time.Duration
	DefaultWorkflowPollInterval() time.Duration
}

IntervalSuggester is an interface that backends can optionally implement to suggest a default TaskPollInterval and WorkflowPollInterval to the rest of the cereal library.

type ListWorkflowOpts

// ListWorkflowOpts is the options struct accepted by
// Driver.ListWorkflowInstances.
//
// NOTE(review): every field is a pointer, so presumably a nil field means
// "do not filter on this attribute" — confirm against the backends.
type ListWorkflowOpts struct {
	WorkflowName *string
	InstanceName *string
	IsRunning    *bool
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is responsible for calling WorkflowExecutors and TaskExecutors when they need to be processed, along with managing the scheduling of workflows.

func NewManager

func NewManager(b Driver, opts ...ManagerOpt) (*Manager, error)

NewManager creates a new Manager with the given Driver. If the driver fails to initialize, an error is returned.

func (*Manager) CancelWorkflow

func (m *Manager) CancelWorkflow(ctx context.Context, workflowName WorkflowName,
	instanceName string) error

func (*Manager) CreateWorkflowSchedule

func (m *Manager) CreateWorkflowSchedule(
	ctx context.Context,
	instanceName string,
	workflowName WorkflowName,
	parameters interface{},
	enabled bool,
	recurRule *rrule.RRule,
) error

CreateWorkflowSchedule creates a recurring workflow based on the recurrence rule provided. The first run will happen when the recurrence is first due from Now.

func (*Manager) EnqueueWorkflow

func (m *Manager) EnqueueWorkflow(ctx context.Context, workflowName WorkflowName,
	instanceName string, parameters interface{}) error

EnqueueWorkflow enqueues a workflow of type workflowName. Only one instance of (workflowName, instanceName) can be running at a time.

func (*Manager) GetWorkflowInstanceByName

func (m *Manager) GetWorkflowInstanceByName(ctx context.Context, instanceName string, workflowName WorkflowName) (ImmutableWorkflowInstance, error)

func (*Manager) GetWorkflowScheduleByName

func (m *Manager) GetWorkflowScheduleByName(ctx context.Context, instanceName string, workflowName WorkflowName) (*Schedule, error)

func (*Manager) KillWorkflow

func (m *Manager) KillWorkflow(ctx context.Context, workflowName WorkflowName,
	instanceName string) error

func (*Manager) ListWorkflowSchedules

func (m *Manager) ListWorkflowSchedules(ctx context.Context) ([]*Schedule, error)

ListWorkflowSchedules lists all the scheduled workflows.

func (*Manager) RegisterTaskExecutor

func (m *Manager) RegisterTaskExecutor(taskName TaskName, executor TaskExecutor, opts TaskExecutorOpts) error

RegisterTaskExecutor registers a TaskExecutor to execute tasks of type taskName. This is not safe to call concurrently and should be done from only one thread of the process. This must be called before Start.

func (*Manager) RegisterWorkflowExecutor

func (m *Manager) RegisterWorkflowExecutor(workflowName WorkflowName,
	workflowExecutor WorkflowExecutor) error

RegisterWorkflowExecutor registers a WorkflowExecutor to execute workflows of type workflowName. This is not safe to call concurrently and should be done from only one thread of the process. This must be called before Start.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the Manager. No workflows, tasks, or schedules will be processed before Start is called. This should only be called once.

func (*Manager) Stop

func (m *Manager) Stop() error

func (*Manager) UpdateWorkflowScheduleByName

func (m *Manager) UpdateWorkflowScheduleByName(ctx context.Context,
	instanceName string, workflowName WorkflowName, opts ...WorkflowScheduleUpdateOpt) error

UpdateWorkflowScheduleByName updates the scheduled workflow identified by (instanceName, workflowName).

func (*Manager) WakeupTaskPollerByTaskName

func (m *Manager) WakeupTaskPollerByTaskName(taskName string)

func (*Manager) WakeupWorkflowExecutor

func (m *Manager) WakeupWorkflowExecutor()

type ManagerOpt

type ManagerOpt func(*Manager)

ManagerOpt is an option that can be passed to NewManager.

func WithOnWorkflowCompleteCallback

func WithOnWorkflowCompleteCallback(c OnWorkflowCompleteCallback) ManagerOpt

WithOnWorkflowCompleteCallback sets an OnWorkflowComplete callback that will be called whenever a workflow is finished (i.e. the workflow returns a failure or completion decision). This is intended for testing and debugging purposes ONLY.

func WithTaskDequeueWorkers

func WithTaskDequeueWorkers(count int) ManagerOpt

WithTaskDequeueWorkers sets the number of workers that will be used to dequeue tasks. This limits the number of concurrent calls being made to the backend.

func WithTaskPollInterval

func WithTaskPollInterval(interval time.Duration) ManagerOpt

WithTaskPollInterval sets the polling interval for all TaskExecutor workers. Each worker will poll the database at least every interval for new jobs.

func WithTaskPollIntervalMaxJitter

func WithTaskPollIntervalMaxJitter(jitter time.Duration) ManagerOpt

WithTaskPollIntervalMaxJitter is the maximum amount of time before the configured interval that we can wake up to prevent TaskPollers from always waking up at the same time.

func WithWorkflowPollInterval

func WithWorkflowPollInterval(interval time.Duration) ManagerOpt

WithWorkflowPollInterval sets the polling interval for the main workflow processing loop. The loop will wake up at least once every interval to check for new workflow events.

type OnWorkflowCompleteCallback

type OnWorkflowCompleteCallback func(*WorkflowEvent)

An OnWorkflowCompleteCallback is a function that can be called at the completion of workflows for debugging and testing purposes. The function should not be used for application logic.

type Schedule

// Schedule represents a recurring workflow, identified by
// (InstanceName, WorkflowName).
type Schedule struct {
	// TODO(ssd) 2019-07-19: ID was originally placed on backends
	// because it was unclear whether (workflow_name,
	// instance_name) can actually be unique in the case of
	// scheduled workflows. We currently have a unique constraint
	// on them, so we could remove this, but since it was on this
	// struct it ended up in a few queries that would need to
	// change.
	ID             int64
	Enabled        bool
	InstanceName   string
	WorkflowName   string
	Parameters     []byte
	Recurrence     string
	NextDueAt      time.Time
	LastEnqueuedAt time.Time

	// These come from the latest result.
	// NOTE(review): both are pointers, presumably nil when the schedule
	// has no previous result — confirm against the backends.
	LastStart *time.Time
	LastEnd   *time.Time
}

Schedule represents a recurring workflow. TODO(jaym): we should wrap this in the workflow package and provide a getter for the parameters

func (*Schedule) GetParameters

func (s *Schedule) GetParameters(out interface{}) error

func (*Schedule) GetRRule

func (s *Schedule) GetRRule() (*rrule.RRule, error)

type ScheduledWorkflowCompleter

type ScheduledWorkflowCompleter interface {
	EnqueueAndUpdateScheduledWorkflow(s *Schedule) error
	DisableSchedule(s *Schedule) error
	Close()
}

type SchedulerDriver

type SchedulerDriver interface {
	GetDueScheduledWorkflow(ctx context.Context) (*Schedule, ScheduledWorkflowCompleter, error)
	GetNextScheduledWorkflow(ctx context.Context) (*Schedule, error)
}

type StartEvent

type StartEvent struct {
	// The time this start event was added to the workflow event
	// queue.
	EnqueuedAt time.Time
}

StartEvent is passed to the OnStart callback of the WorkflowExecutor when a workflow instance is signaled to be started.

type StartGuard

type StartGuard struct {
	// contains filtered or unexported fields
}

StartGuard is used to enforce that Start is only called once for various structures inside cereal.

We were previously using sync.WaitGroup for this, but this allows us to customize the error message a bit.

The zero-value of this should work as expected, but you can create a StartGuard with a custom message using NewStartGuard.

func NewStartGuard

func NewStartGuard(msg string) StartGuard

func (*StartGuard) Started

func (s *StartGuard) Started()

type Task

type Task interface {
	// GetParameters unmarshals the parameters the task was started with into
	// the value pointed at by obj.
	GetParameters(obj interface{}) error

	// GetMetadata returns metadata about the task, that the task
	// might use to make decisions about whether to continue
	// running this task.
	GetMetadata() TaskMetadata
}

Task is an interface to an object representing a running Task. This will be provided to TaskExecutor implementations when the Run method is called.

type TaskCompleteEvent

type TaskCompleteEvent struct {
	// TaskName is the type of the task that completed
	TaskName TaskName

	// Result contains information representing the completion of the
	// task such as if it errored or returned a value.
	Result TaskResult

	// The time this task complete event was added to the workflow
	// event queue.
	EnqueuedAt time.Time
}

TaskCompleteEvent is passed to the OnTaskComplete callback of the WorkflowExecutor when a task for a workflow instance completes.

type TaskCompleter

// TaskCompleter is returned alongside the TaskData by
// TaskDequeuer.DequeueTask.
//
// NOTE(review): presumably Succeed and Fail report the outcome of the
// dequeued task back to the backend — confirm. Fail takes the error as a
// string here, whereas WorkflowCompleter.Fail takes an error value.
type TaskCompleter interface {
	Context() context.Context
	Fail(err string) error
	Succeed(result []byte) error
}

type TaskData

type TaskData struct {
	Name       string
	Parameters []byte
	Metadata   TaskMetadata
}

TODO(ssd) 2019-10-04: Probably remove this?

func (*TaskData) GetMetadata

func (t *TaskData) GetMetadata() TaskMetadata

func (*TaskData) GetParameters

func (t *TaskData) GetParameters(obj interface{}) error

type TaskDequeuer

type TaskDequeuer interface {
	DequeueTask(ctx context.Context, taskName string) (*TaskData, TaskCompleter, error)
}

type TaskEnqueueOpt

type TaskEnqueueOpt func(*TaskEnqueueOptions)

A TaskEnqueueOpt is an optional parameter for enqueuing a task.

func StartAfter

func StartAfter(startAfter time.Time) TaskEnqueueOpt

StartAfter indicates when the task should start running

type TaskEnqueueOptions

type TaskEnqueueOptions struct {
	StartAfter time.Time
}

TaskEnqueueOptions is annoyingly public so that it can be referred to by the backends.

type TaskExecutor

type TaskExecutor interface {
	// Run implements the logic for running the task. The returned result/err
	// will be provided to the workflow instance that started this task on
	// completion. This method should strive to be retryable / idempotent
	// as there is a possibility that it can run multiple times.
	Run(ctx context.Context, task Task) (result interface{}, err error)
}

TaskExecutor is the interface implemented by objects that can run tasks of a certain type. TODO(ssd) 2019-05-10: How do we want to handle cancellation?

type TaskExecutorOpts

type TaskExecutorOpts struct {
	// Timeout is how long to wait before canceling a running task.
	Timeout time.Duration
	// Workers specifies the max concurrently executing tasks the Manager
	// will launch for the registered TaskExecutor.
	Workers int
}

TaskExecutorOpts are options that describe how a TaskExecutor should run tasks.

type TaskMetadata

type TaskMetadata struct {
	EnqueuedAt time.Time
}

type TaskName

type TaskName struct {
	// contains filtered or unexported fields
}

TaskName identifies tasks registered with the cereal library. Many functions in the cereal library take or return TaskNames.

func NewTaskName

func NewTaskName(name string) TaskName

NewTaskName constructs a TaskName from the given string. The resulting TaskName is comparable to other TaskNames. Two TaskNames are equal if they were constructed with strings that were equal.

func (TaskName) String

func (w TaskName) String() string

type TaskResult

type TaskResult interface {
	// GetParameters presumably unmarshals the parameters the task was
	// started with into the value pointed at by obj, as
	// Task.GetParameters does — TODO confirm.
	GetParameters(obj interface{}) error

	// Get unmarshals the result returned by the task into the value pointed at
	// by obj.
	Get(obj interface{}) error

	// Err returns an error if the task returned an error upon completion,
	// otherwise nil. The exact error types will not be preserved.
	Err() error
}

TaskResult is an interface to an object representing a completed Task. This will be provided to the OnTaskComplete callback method of WorkflowExecutor implementations.

type TaskResultData

type TaskResultData struct {
	TaskName   string
	Parameters []byte

	Status    TaskStatusType
	ErrorText string
	Result    []byte
}

func (*TaskResultData) Err

func (r *TaskResultData) Err() error

func (*TaskResultData) Get

func (r *TaskResultData) Get(obj interface{}) error

func (*TaskResultData) GetParameters

func (r *TaskResultData) GetParameters(obj interface{}) error

type TaskStatusType

// TaskStatusType is the string-valued status stored in
// TaskResultData.Status.
type TaskStatusType string
// Known TaskStatusType values.
// NOTE(review): TaskStatusLost presumably corresponds to ErrTaskLost
// ("task lost before reporting its status") — confirm.
const (
	TaskStatusSuccess        TaskStatusType = "success"
	TaskStatusFailed         TaskStatusType = "failed"
	TaskStatusLost           TaskStatusType = "lost"
	TaskStatusUnusableResult TaskStatusType = "unusable_result"
)

type WorkflowCompleter

type WorkflowCompleter interface {
	EnqueueTask(task *TaskData, opts TaskEnqueueOptions) error

	Continue(payload []byte) error
	Fail(err error) error
	Done(result []byte) error
	Close() error
}

type WorkflowEvent

// WorkflowEvent is the event value returned by Driver.DequeueWorkflow and
// passed to an OnWorkflowCompleteCallback.
type WorkflowEvent struct {
	Instance           WorkflowInstanceData
	Type               WorkflowEventType
	EnqueuedTaskCount  int
	CompletedTaskCount int
	EnqueuedAt         time.Time

	// NOTE(review): TaskResult is a pointer, presumably set only when
	// Type is TaskComplete — confirm against the backends.
	TaskResult *TaskResultData
}

type WorkflowEventType

// WorkflowEventType is the string-valued tag stored in WorkflowEvent.Type.
type WorkflowEventType string
// Known WorkflowEventType values.
// NOTE(review): these presumably map to the OnStart, OnTaskComplete and
// OnCancel callbacks of WorkflowExecutor respectively — confirm.
const (
	WorkflowStart  WorkflowEventType = "start"
	TaskComplete   WorkflowEventType = "task_complete"
	WorkflowCancel WorkflowEventType = "cancel"
)

type WorkflowExecutor

type WorkflowExecutor interface {
	// OnStart is called when a StartEvent for this type of workflow is to
	// be processed.
	OnStart(w WorkflowInstance, ev StartEvent) Decision

	// OnTaskComplete is called when a TaskCompleteEvent for this type of
	// workflow is to be processed. This event will never be received before
	// OnStart for the given WorkflowInstance.
	OnTaskComplete(w WorkflowInstance, ev TaskCompleteEvent) Decision

	// OnCancel is called when a workflow instance is to be canceled.
	OnCancel(w WorkflowInstance, ev CancelEvent) Decision
}

WorkflowExecutor is the interface implemented by objects that can process a workflow of a certain type.

type WorkflowInstance

type WorkflowInstance interface {
	// GetPayload unmarshals the payload of the workflow instance into the
	// value pointed at by obj. This payload is any state the user wishes
	// to keep.
	GetPayload(obj interface{}) error

	// GetParameters unmarshals the parameters the workflow instance was
	// started with into the value pointed at by obj.
	GetParameters(obj interface{}) error

	// EnqueueTask requests that a task of the type taskName be started
	// with the given parameters. Any enqueued tasks will be started
	// after the currently running callback of the WorkflowExecutor
	// returns.
	EnqueueTask(taskName TaskName, parameters interface{}, opts ...TaskEnqueueOpt) error

	// Complete returns a decision to end execution of the workflow for
	// the running workflow instance.
	Complete(...CompleteOpt) Decision

	// Continue returns a decision to continue execution of the workflow for
	// the running workflow instance. The provided payload will be available
	// when this workflow instance is next processed by a WorkflowExecutor.
	Continue(payload interface{}) Decision

	// Fail returns a decision to end the execution of the
	// workflow because of an error that occurred when processing
	// the workflow event.
	Fail(error) Decision

	// InstanceName returns the workflow instance name
	InstanceName() string

	// TotalEnqueuedTasks returns the total number of tasks that
	// have been enqueued for the lifetime of the running workflow
	// instance.
	TotalEnqueuedTasks() int

	// TotalCompletedTasks returns the total number of tasks that have finished
	// execution and been seen by the workflow instance.
	TotalCompletedTasks() int
}

WorkflowInstance is an interface to an object representing a running workflow instance. A workflow instance keeps the state of a currently running workflow, and decides what decisions need to be made based on that state. This will be passed to the callback methods of WorkflowExecutor. Only one instance of a WorkflowInstance can execute at a time.

type WorkflowInstanceData

type WorkflowInstanceData struct {
	InstanceName string
	WorkflowName string
	Status       WorkflowInstanceStatus
	Parameters   []byte
	Payload      []byte

	Err    error
	Result []byte
}

type WorkflowInstanceStatus

// WorkflowInstanceStatus is the string-valued status stored in
// WorkflowInstanceData.Status.
type WorkflowInstanceStatus string
// Known WorkflowInstanceStatus values.
const (
	WorkflowInstanceStatusStarting  WorkflowInstanceStatus = "starting"
	WorkflowInstanceStatusRunning   WorkflowInstanceStatus = "running"
	WorkflowInstanceStatusCompleted WorkflowInstanceStatus = "completed"
)

type WorkflowName

type WorkflowName struct {
	// contains filtered or unexported fields
}

WorkflowName identifies workflows registered with the cereal library. Many functions in the cereal library take or return WorkflowNames.

func NewWorkflowName

func NewWorkflowName(name string) WorkflowName

NewWorkflowName constructs a WorkflowName from the given string. The resulting WorkflowName is comparable to other WorkflowNames. Two WorkflowNames are equal if they were constructed with strings that were equal.

func (WorkflowName) String

func (w WorkflowName) String() string

type WorkflowScheduleUpdateOpt

type WorkflowScheduleUpdateOpt func(*WorkflowScheduleUpdateOptions) error

A WorkflowScheduleUpdateOpt represents a change that can be made to a scheduled workflow. The changes will be committed atomically.

func UpdateEnabled

func UpdateEnabled(enabled bool) WorkflowScheduleUpdateOpt

UpdateEnabled allows enabling or disabling a scheduled workflow.

func UpdateParameters

func UpdateParameters(parameters interface{}) WorkflowScheduleUpdateOpt

UpdateParameters allows changing the parameters a workflow will be started with.

func UpdateRecurrence

func UpdateRecurrence(recurRule *rrule.RRule) WorkflowScheduleUpdateOpt

UpdateRecurrence changes the recurrence rule for the scheduled workflow. The next run will happen when the recurrence is first due from Now.

type WorkflowScheduleUpdateOptions

// WorkflowScheduleUpdateOptions is the options struct accepted by
// Driver.UpdateWorkflowScheduleByName and mutated by
// WorkflowScheduleUpdateOpt functions.
//
// NOTE(review): each Update* flag presumably marks whether the value(s)
// grouped below it should be applied by the backend — confirm.
type WorkflowScheduleUpdateOptions struct {
	UpdateEnabled bool
	Enabled       bool

	UpdateParameters bool
	Parameters       []byte

	UpdateRecurrence bool
	Recurrence       string
	NextRunAt        time.Time
}

type WorkflowScheduler

type WorkflowScheduler struct {
	// contains filtered or unexported fields
}

func NewWorkflowScheduler

func NewWorkflowScheduler(b SchedulerDriver, w func()) *WorkflowScheduler

func (*WorkflowScheduler) Run

func (w *WorkflowScheduler) Run(ctx context.Context)

func (*WorkflowScheduler) Trigger

func (w *WorkflowScheduler) Trigger()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL