Documentation ¶
Index ¶
- Constants
- Variables
- type ActivityTask
- type Backend
- type BackendOption
- func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
- func WithConverter(converter converter.Converter) BackendOption
- func WithLogger(logger *slog.Logger) BackendOption
- func WithMetrics(client metrics.Client) BackendOption
- func WithStickyTimeout(timeout time.Duration) BackendOption
- func WithTracerProvider(tp trace.TracerProvider) BackendOption
- type MockBackend
- func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, ...) error
- func (_m *MockBackend) Close() error
- func (_m *MockBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, ...) error
- func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, instance *core.WorkflowInstance, ...) error
- func (_m *MockBackend) ContextPropagators() []workflow.ContextPropagator
- func (_m *MockBackend) Converter() converter.Converter
- func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
- func (_m *MockBackend) ExtendActivityTask(ctx context.Context, activityID string) error
- func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
- func (_m *MockBackend) GetActivityTask(ctx context.Context) (*ActivityTask, error)
- func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
- func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
- func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
- func (_m *MockBackend) GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)
- func (_m *MockBackend) Logger() *slog.Logger
- func (_m *MockBackend) Metrics() metrics.Client
- func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
- func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
- func (_m *MockBackend) Tracer() trace.Tracer
- type Options
- type Stats
- type WorkflowTask
Constants ¶
const TracerName = "go-workflow"
Variables ¶
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
var ErrInstanceNotFound = errors.New("workflow instance not found")
Functions ¶
This section is empty.
Types ¶
type ActivityTask ¶ added in v0.17.0
type ActivityTask struct { ID string WorkflowInstance *core.WorkflowInstance Event *history.Event }
ActivityTask represents one activity execution.
type Backend ¶
type Backend interface { // CreateWorkflowInstance creates a new workflow instance CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error // CancelWorkflowInstance cancels a running workflow instance CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error // RemoveWorkflowInstance removes a workflow instance RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error // GetWorkflowInstanceState returns the state of the given workflow instance GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) // GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID // is given, only events after that event are returned. Otherwise the full history is returned. GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error) // SignalWorkflow signals a running workflow instance // // If the given instance does not exist, it will return an error SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error // GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions GetWorkflowTask(ctx context.Context) (*WorkflowTask, error) // ExtendWorkflowTask extends the lock of a workflow task ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error // CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask // // This checkpoints the execution. executedEvents are new events from the last workflow execution // which will be added to the workflow instance history. workflowEvents are new events for the // completed or other workflow instances. 
CompleteWorkflowTask( ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error // GetActivityTask returns a pending activity task or nil if there are no pending activities GetActivityTask(ctx context.Context) (*ActivityTask, error) // CompleteActivityTask completes an activity task retrieved using GetActivityTask CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error // ExtendActivityTask extends the lock of an activity task ExtendActivityTask(ctx context.Context, activityID string) error // GetStats returns stats about the backend GetStats(ctx context.Context) (*Stats, error) // Logger returns the configured logger for the backend Logger() *slog.Logger // Tracer returns the configured tracer for the backend Tracer() trace.Tracer // Metrics returns the configured metrics client for the backend Metrics() metrics.Client // Converter returns the configured converter for the backend Converter() converter.Converter // ContextPropagators returns the configured context propagators for the backend ContextPropagators() []workflow.ContextPropagator // Close closes any underlying resources Close() error }
type BackendOption ¶
type BackendOption func(*Options)
func WithContextPropagator ¶ added in v0.14.0
func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
func WithConverter ¶ added in v0.9.0
func WithConverter(converter converter.Converter) BackendOption
func WithLogger ¶ added in v0.0.9
func WithLogger(logger *slog.Logger) BackendOption
func WithMetrics ¶ added in v0.6.0
func WithMetrics(client metrics.Client) BackendOption
func WithStickyTimeout ¶
func WithStickyTimeout(timeout time.Duration) BackendOption
func WithTracerProvider ¶ added in v0.4.0
func WithTracerProvider(tp trace.TracerProvider) BackendOption
type MockBackend ¶
MockBackend is an autogenerated mock type for the Backend type
func NewMockBackend ¶ added in v0.4.0
func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend
NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
func (*MockBackend) CancelWorkflowInstance ¶
func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error
CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent
func (*MockBackend) Close ¶ added in v0.18.0
func (_m *MockBackend) Close() error
Close provides a mock function that takes no arguments.
func (*MockBackend) CompleteActivityTask ¶
func (_m *MockBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event *history.Event) error
CompleteActivityTask provides a mock function with given fields: ctx, instance, activityID, event
func (*MockBackend) CompleteWorkflowTask ¶
func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, instance *core.WorkflowInstance, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error
CompleteWorkflowTask provides a mock function with given fields: ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents
func (*MockBackend) ContextPropagators ¶ added in v0.14.0
func (_m *MockBackend) ContextPropagators() []workflow.ContextPropagator
ContextPropagators provides a mock function that takes no arguments.
func (*MockBackend) Converter ¶ added in v0.9.0
func (_m *MockBackend) Converter() converter.Converter
Converter provides a mock function that takes no arguments.
func (*MockBackend) CreateWorkflowInstance ¶
func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event
func (*MockBackend) ExtendActivityTask ¶
func (_m *MockBackend) ExtendActivityTask(ctx context.Context, activityID string) error
ExtendActivityTask provides a mock function with given fields: ctx, activityID
func (*MockBackend) ExtendWorkflowTask ¶
func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
ExtendWorkflowTask provides a mock function with given fields: ctx, taskID, instance
func (*MockBackend) GetActivityTask ¶
func (_m *MockBackend) GetActivityTask(ctx context.Context) (*ActivityTask, error)
GetActivityTask provides a mock function with given fields: ctx
func (*MockBackend) GetStats ¶ added in v0.16.1
func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
GetStats provides a mock function with given fields: ctx
func (*MockBackend) GetWorkflowInstanceHistory ¶ added in v0.0.4
func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID
func (*MockBackend) GetWorkflowInstanceState ¶ added in v0.0.4
func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
GetWorkflowInstanceState provides a mock function with given fields: ctx, instance
func (*MockBackend) GetWorkflowTask ¶
func (_m *MockBackend) GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)
GetWorkflowTask provides a mock function with given fields: ctx
func (*MockBackend) Logger ¶ added in v0.0.9
func (_m *MockBackend) Logger() *slog.Logger
Logger provides a mock function that takes no arguments.
func (*MockBackend) Metrics ¶ added in v0.6.0
func (_m *MockBackend) Metrics() metrics.Client
Metrics provides a mock function that takes no arguments.
func (*MockBackend) RemoveWorkflowInstance ¶ added in v0.12.0
func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
RemoveWorkflowInstance provides a mock function with given fields: ctx, instance
func (*MockBackend) SignalWorkflow ¶
func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
SignalWorkflow provides a mock function with given fields: ctx, instanceID, event
func (*MockBackend) Tracer ¶ added in v0.4.0
func (_m *MockBackend) Tracer() trace.Tracer
Tracer provides a mock function that takes no arguments.
type Options ¶
type Options struct { Logger *slog.Logger Metrics metrics.Client TracerProvider trace.TracerProvider // Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set // converter.DefaultConverter is used. Converter converter.Converter // ContextPropagators is a list of context propagators to use for passing context into workflows and activities. ContextPropagators []workflow.ContextPropagator StickyTimeout time.Duration // WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed // by that timeframe, it's considered abandoned and another worker might pick it up. // // For long running workflow tasks, combine this with heartbeats. WorkflowLockTimeout time.Duration // ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed // by that timeframe, it's considered abandoned and another worker might pick it up ActivityLockTimeout time.Duration }
var DefaultOptions Options = Options{ StickyTimeout: 30 * time.Second, WorkflowLockTimeout: time.Minute, ActivityLockTimeout: time.Minute * 2, Logger: slog.Default(), Metrics: mi.NewNoopMetricsClient(), TracerProvider: trace.NewNoopTracerProvider(), Converter: converter.DefaultConverter, ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}}, }
func ApplyOptions ¶
func ApplyOptions(opts ...BackendOption) Options
type Stats ¶ added in v0.16.1
type Stats struct { ActiveWorkflowInstances int64 // PendingWorkflowTasks are the number of workflow tasks that are currently in the queue, // waiting to be processed by a worker PendingWorkflowTasks int64 // PendingActivities are the number of activities that are currently in the queue, // waiting to be processed by a worker PendingActivities int64 }
type WorkflowTask ¶ added in v0.17.0
type WorkflowTask struct { // ID is an identifier for this task. It's set by the backend ID string // WorkflowInstance is the workflow instance that this task is for WorkflowInstance *core.WorkflowInstance WorkflowInstanceState core.WorkflowInstanceState Metadata *metadata.WorkflowMetadata // LastSequenceID is the sequence ID of the newest event in the workflow instance's history LastSequenceID int64 // NewEvents are new events since the last task execution NewEvents []*history.Event // Backend specific data, only the producer of the task should rely on this. CustomData any }
WorkflowTask represents work for one workflow execution slice.