backend

package
v0.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 14 Imported by: 30

Documentation

Index

Constants

View Source
const TracerName = "go-workflow"

Variables

View Source
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
View Source
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
View Source
var ErrInstanceNotFound = errors.New("workflow instance not found")

Functions

This section is empty.

Types

type ActivityTask added in v0.17.0

type ActivityTask struct {
	ID string

	WorkflowInstance *core.WorkflowInstance

	Event *history.Event
}

ActivityTask represents one activity execution.

type Backend

type Backend interface {
	// CreateWorkflowInstance creates a new workflow instance
	CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error

	// CancelWorkflowInstance cancels a running workflow instance
	CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error

	// RemoveWorkflowInstance removes a workflow instance
	RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error

	// GetWorkflowInstanceState returns the state of the given workflow instance
	GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)

	// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
	// is given, only events after that event are returned. Otherwise the full history is returned.
	GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)

	// SignalWorkflow signals a running workflow instance
	//
	// If the given instance does not exist, it will return an error
	SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

	// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
	GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)

	// ExtendWorkflowTask extends the lock of a workflow task
	ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error

	// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
	//
	// This checkpoints the execution. events are new events from the last workflow execution
	// which will be added to the workflow instance history. workflowEvents are new events for the
	// completed or other workflow instances.
	CompleteWorkflowTask(
		ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
		executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error

	// GetActivityTask returns a pending activity task or nil if there are no pending activities
	GetActivityTask(ctx context.Context) (*ActivityTask, error)

	// CompleteActivityTask completes an activity task retrieved using GetActivityTask
	CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error

	// ExtendActivityTask extends the lock of an activity task
	ExtendActivityTask(ctx context.Context, activityID string) error

	// GetStats returns stats about the backend
	GetStats(ctx context.Context) (*Stats, error)

	// Logger returns the configured logger for the backend
	Logger() *slog.Logger

	// Tracer returns the configured trace provider for the backend
	Tracer() trace.Tracer

	// Metrics returns the configured metrics client for the backend
	Metrics() metrics.Client

	// Converter returns the configured converter for the backend
	Converter() converter.Converter

	// ContextPropagators returns the configured context propagators for the backend
	ContextPropagators() []workflow.ContextPropagator

	// Close closes any underlying resources
	Close() error
}

type BackendOption

type BackendOption func(*Options)

func WithContextPropagator added in v0.14.0

func WithContextPropagator(prop workflow.ContextPropagator) BackendOption

func WithConverter added in v0.9.0

func WithConverter(converter converter.Converter) BackendOption

func WithLogger added in v0.0.9

func WithLogger(logger *slog.Logger) BackendOption

func WithMetrics added in v0.6.0

func WithMetrics(client metrics.Client) BackendOption

func WithStickyTimeout

func WithStickyTimeout(timeout time.Duration) BackendOption

func WithTracerProvider added in v0.4.0

func WithTracerProvider(tp trace.TracerProvider) BackendOption

type MockBackend

type MockBackend struct {
	mock.Mock
}

MockBackend is an autogenerated mock type for the Backend type

func NewMockBackend added in v0.4.0

func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend

NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockBackend) CancelWorkflowInstance

func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error

CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent

func (*MockBackend) Close added in v0.18.0

func (_m *MockBackend) Close() error

Close provides a mock function with given fields:

func (*MockBackend) CompleteActivityTask

func (_m *MockBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event *history.Event) error

CompleteActivityTask provides a mock function with given fields: ctx, instance, activityID, event

func (*MockBackend) CompleteWorkflowTask

func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, instance *core.WorkflowInstance, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error

CompleteWorkflowTask provides a mock function with given fields: ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents

func (*MockBackend) ContextPropagators added in v0.14.0

func (_m *MockBackend) ContextPropagators() []workflow.ContextPropagator

ContextPropagators provides a mock function with given fields:

func (*MockBackend) Converter added in v0.9.0

func (_m *MockBackend) Converter() converter.Converter

Converter provides a mock function with given fields:

func (*MockBackend) CreateWorkflowInstance

func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error

CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event

func (*MockBackend) ExtendActivityTask

func (_m *MockBackend) ExtendActivityTask(ctx context.Context, activityID string) error

ExtendActivityTask provides a mock function with given fields: ctx, activityID

func (*MockBackend) ExtendWorkflowTask

func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error

ExtendWorkflowTask provides a mock function with given fields: ctx, taskID, instance

func (*MockBackend) GetActivityTask

func (_m *MockBackend) GetActivityTask(ctx context.Context) (*ActivityTask, error)

GetActivityTask provides a mock function with given fields: ctx

func (*MockBackend) GetStats added in v0.16.1

func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)

GetStats provides a mock function with given fields: ctx

func (*MockBackend) GetWorkflowInstanceHistory added in v0.0.4

func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)

GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID

func (*MockBackend) GetWorkflowInstanceState added in v0.0.4

func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)

GetWorkflowInstanceState provides a mock function with given fields: ctx, instance

func (*MockBackend) GetWorkflowTask

func (_m *MockBackend) GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)

GetWorkflowTask provides a mock function with given fields: ctx

func (*MockBackend) Logger added in v0.0.9

func (_m *MockBackend) Logger() *slog.Logger

Logger provides a mock function with given fields:

func (*MockBackend) Metrics added in v0.6.0

func (_m *MockBackend) Metrics() metrics.Client

Metrics provides a mock function with given fields:

func (*MockBackend) RemoveWorkflowInstance added in v0.12.0

func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error

RemoveWorkflowInstance provides a mock function with given fields: ctx, instance

func (*MockBackend) SignalWorkflow

func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

SignalWorkflow provides a mock function with given fields: ctx, instanceID, event

func (*MockBackend) Tracer added in v0.4.0

func (_m *MockBackend) Tracer() trace.Tracer

Tracer provides a mock function with given fields:

type Options

type Options struct {
	Logger *slog.Logger

	Metrics metrics.Client

	TracerProvider trace.TracerProvider

	// Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set
	// converter.DefaultConverter is used.
	Converter converter.Converter

	// ContextPropagators is a list of context propagators to use for passing context into workflows and activities.
	ContextPropagators []workflow.ContextPropagator

	StickyTimeout time.Duration

	// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up.
	//
	// For long running workflow tasks, combine this with heartbearts.
	WorkflowLockTimeout time.Duration

	// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up
	ActivityLockTimeout time.Duration
}
var DefaultOptions Options = Options{
	StickyTimeout:       30 * time.Second,
	WorkflowLockTimeout: time.Minute,
	ActivityLockTimeout: time.Minute * 2,

	Logger:         slog.Default(),
	Metrics:        mi.NewNoopMetricsClient(),
	TracerProvider: trace.NewNoopTracerProvider(),
	Converter:      converter.DefaultConverter,

	ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}},
}

func ApplyOptions

func ApplyOptions(opts ...BackendOption) Options

type Stats added in v0.16.1

type Stats struct {
	ActiveWorkflowInstances int64

	// PendingWorkflowTasks are the number of workflow tasks that are currently in the queue,
	// waiting to be processed by a worker
	PendingWorkflowTasks int64

	// PendingActivities are the number of activities that are currently in the queue,
	// waiting to be processed by a worker
	PendingActivities int64
}

type WorkflowTask added in v0.17.0

type WorkflowTask struct {
	// ID is an identifier for this task. It's set by the backend
	ID string

	// WorkflowInstance is the workflow instance that this task is for
	WorkflowInstance *core.WorkflowInstance

	WorkflowInstanceState core.WorkflowInstanceState

	Metadata *metadata.WorkflowMetadata

	// LastSequenceID is the sequence ID of the newest event in the workflow instances's history
	LastSequenceID int64

	// NewEvents are new events since the last task execution
	NewEvents []*history.Event

	// Backend specific data, only the producer of the task should rely on this.
	CustomData any
}

WorkflowTask represents work for one workflow execution slice.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL