backend

package
v0.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2024 License: MIT Imports: 16 Imported by: 30

Documentation

Index

Constants

View Source
// TracerName is the fixed name "go-workflow".
// NOTE(review): presumably the instrumentation name used when obtaining the
// backend's tracer from the configured TracerProvider — confirm against usage.
const TracerName = "go-workflow"

Variables

View Source
// DefaultRemovalOptions is the baseline RemovalOptions value: a batch size of
// 100 and a zero FinishedBefore time.
// NOTE(review): presumably the starting point that RemovalOption functions
// are applied to — confirm against the backend implementations.
var DefaultRemovalOptions = RemovalOptions{
	BatchSize: 100,
}
View Source
// ErrInstanceAlreadyExists is the sentinel error for a workflow instance that
// already exists. Match it with errors.Is.
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
View Source
// ErrInstanceNotFinished is the sentinel error for a workflow instance that
// has not finished yet. Match it with errors.Is.
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
View Source
// ErrInstanceNotFound is the sentinel error for a workflow instance that
// could not be found. Match it with errors.Is.
var ErrInstanceNotFound = errors.New("workflow instance not found")

Functions

This section is empty.

Types

type ActivityTask added in v0.17.0

// ActivityTask represents one activity execution. It is what
// Backend.GetActivityTask returns and what Backend.ExtendActivityTask and
// Backend.CompleteActivityTask accept.
type ActivityTask struct {
	// ID identifies this task.
	ID string

	// ActivityID is the ID of the activity event
	ActivityID string

	// Queue is the queue associated with this task.
	// NOTE(review): presumably the queue the task was dequeued from — confirm.
	Queue workflow.Queue

	// WorkflowInstance is the workflow instance associated with this task.
	WorkflowInstance *core.WorkflowInstance

	// Event is the history event associated with this task.
	// NOTE(review): presumably the event that scheduled the activity — confirm.
	Event *history.Event
}

ActivityTask represents one activity execution.

type Backend

// Backend is the interface a workflow backend provides. It covers the
// workflow instance lifecycle (create, cancel, remove), instance state and
// history, signaling, queue preparation, retrieval/extension/completion of
// workflow and activity tasks, and access to stats, tracing, metrics and
// options.
type Backend interface {
	// CreateWorkflowInstance creates a new workflow instance
	CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error

	// CancelWorkflowInstance cancels a running workflow instance
	CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error

	// RemoveWorkflowInstance removes a workflow instance
	RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error

	// RemoveWorkflowInstances removes multiple workflow instances. Which
	// instances are removed is controlled by the given RemovalOption values.
	RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error

	// GetWorkflowInstanceState returns the state of the given workflow instance
	GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)

	// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
	// is given, only events after that event are returned. Otherwise the full history is returned.
	GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)

	// SignalWorkflow signals a running workflow instance
	//
	// If the given instance does not exist, it will return an error
	SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

	// PrepareWorkflowQueues prepares workflow queues for later consumption using this backend instance
	PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error

	// PrepareActivityQueues prepares activity queues for later consumption using this backend instance
	PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error

	// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
	GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*WorkflowTask, error)

	// ExtendWorkflowTask extends the lock of a workflow task
	ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error

	// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
	//
	// This checkpoints the execution. executedEvents are new events from the last workflow
	// execution which will be added to the workflow instance history. workflowEvents are new
	// events for the completed or other workflow instances.
	//
	// NOTE(review): activityEvents and timerEvents are presumably the activity and timer
	// events newly scheduled by this execution — confirm against the backend implementations.
	CompleteWorkflowTask(
		ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState,
		executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error

	// GetActivityTask returns a pending activity task or nil if there are no pending activities
	GetActivityTask(ctx context.Context, queues []workflow.Queue) (*ActivityTask, error)

	// ExtendActivityTask extends the lock of an activity task
	ExtendActivityTask(ctx context.Context, task *ActivityTask) error

	// CompleteActivityTask completes an activity task retrieved using GetActivityTask
	CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error

	// GetStats returns stats about the backend
	GetStats(ctx context.Context) (*Stats, error)

	// Tracer returns the configured tracer for the backend
	Tracer() trace.Tracer

	// Metrics returns the configured metrics client for the backend
	Metrics() metrics.Client

	// Options returns the configured options for the backend
	Options() *Options

	// Close closes any underlying resources
	Close() error

	// FeatureSupported returns true if the given feature is supported by the backend
	FeatureSupported(feature Feature) bool
}

type BackendOption

// BackendOption is a functional option: a function that mutates the given
// Options in place. The With* functions in this package return values of
// this type, and ApplyOptions accepts them.
type BackendOption func(*Options)

func WithContextPropagator added in v0.14.0

func WithContextPropagator(prop workflow.ContextPropagator) BackendOption

func WithConverter added in v0.9.0

func WithConverter(converter converter.Converter) BackendOption

func WithLogger added in v0.0.9

func WithLogger(logger *slog.Logger) BackendOption

func WithMetrics added in v0.6.0

func WithMetrics(client metrics.Client) BackendOption

func WithRemoveContinuedAsNewInstances added in v0.19.0

func WithRemoveContinuedAsNewInstances() BackendOption

func WithStickyTimeout

func WithStickyTimeout(timeout time.Duration) BackendOption

func WithTracerProvider added in v0.4.0

func WithTracerProvider(tp trace.TracerProvider) BackendOption

type ErrNotSupported added in v0.19.0

// ErrNotSupported is an error type (it has an Error method with a value
// receiver) carrying a free-form message.
// NOTE(review): presumably returned when a backend does not support a
// requested operation or Feature — confirm against the implementations.
type ErrNotSupported struct {
	// Message is the text carried by the error.
	Message string
}

func (ErrNotSupported) Error added in v0.19.0

func (e ErrNotSupported) Error() string

type Feature added in v0.19.0

// Feature identifies a backend capability whose availability can be queried
// with Backend.FeatureSupported.
type Feature int
const (
	// Feature_Expiration is the first (zero-valued) feature.
	// NOTE(review): presumably support for expiring workflow instances — confirm.
	Feature_Expiration Feature = iota
)

type MockBackend

// MockBackend is an autogenerated mock type for the Backend type. It embeds
// mock.Mock; create instances with NewMockBackend.
type MockBackend struct {
	mock.Mock
}

MockBackend is an autogenerated mock type for the Backend type

func NewMockBackend added in v0.4.0

func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend

NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockBackend) CancelWorkflowInstance

func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error

CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent

func (*MockBackend) Close added in v0.18.0

func (_m *MockBackend) Close() error

Close provides a mock function with given fields:

func (*MockBackend) CompleteActivityTask

func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error

CompleteActivityTask provides a mock function with given fields: ctx, task, result

func (*MockBackend) CompleteWorkflowTask

func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error

CompleteWorkflowTask provides a mock function with given fields: ctx, task, state, executedEvents, activityEvents, timerEvents, workflowEvents

func (*MockBackend) CreateWorkflowInstance

func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error

CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event

func (*MockBackend) ExtendActivityTask

func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error

ExtendActivityTask provides a mock function with given fields: ctx, task

func (*MockBackend) ExtendWorkflowTask

func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error

ExtendWorkflowTask provides a mock function with given fields: ctx, task

func (*MockBackend) FeatureSupported added in v0.19.0

func (_m *MockBackend) FeatureSupported(feature Feature) bool

FeatureSupported provides a mock function with given fields: feature

func (*MockBackend) GetActivityTask

func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)

GetActivityTask provides a mock function with given fields: ctx, queues

func (*MockBackend) GetStats added in v0.16.1

func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)

GetStats provides a mock function with given fields: ctx

func (*MockBackend) GetWorkflowInstanceHistory added in v0.0.4

func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)

GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID

func (*MockBackend) GetWorkflowInstanceState added in v0.0.4

func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)

GetWorkflowInstanceState provides a mock function with given fields: ctx, instance

func (*MockBackend) GetWorkflowTask

func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)

GetWorkflowTask provides a mock function with given fields: ctx, queues

func (*MockBackend) Metrics added in v0.6.0

func (_m *MockBackend) Metrics() metrics.Client

Metrics provides a mock function with given fields:

func (*MockBackend) Options added in v0.19.0

func (_m *MockBackend) Options() *Options

Options provides a mock function with given fields:

func (*MockBackend) PrepareActivityQueues added in v0.19.0

func (_m *MockBackend) PrepareActivityQueues(ctx context.Context, queues []core.Queue) error

PrepareActivityQueues provides a mock function with given fields: ctx, queues

func (*MockBackend) PrepareWorkflowQueues added in v0.19.0

func (_m *MockBackend) PrepareWorkflowQueues(ctx context.Context, queues []core.Queue) error

PrepareWorkflowQueues provides a mock function with given fields: ctx, queues

func (*MockBackend) RemoveWorkflowInstance added in v0.12.0

func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error

RemoveWorkflowInstance provides a mock function with given fields: ctx, instance

func (*MockBackend) RemoveWorkflowInstances added in v0.19.0

func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error

RemoveWorkflowInstances provides a mock function with given fields: ctx, options

func (*MockBackend) SignalWorkflow

func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

SignalWorkflow provides a mock function with given fields: ctx, instanceID, event

func (*MockBackend) Tracer added in v0.4.0

func (_m *MockBackend) Tracer() trace.Tracer

Tracer provides a mock function with given fields:

type Options

// Options holds the configuration of a backend. BackendOption functions
// modify it, and Backend.Options returns it. DefaultOptions holds the default
// values.
type Options struct {
	// Logger is the structured logger configured for the backend.
	Logger *slog.Logger

	// Metrics is the metrics client configured for the backend.
	Metrics metrics.Client

	// TracerProvider is the trace provider configured for the backend.
	TracerProvider trace.TracerProvider

	// Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set
	// converter.DefaultConverter is used.
	Converter converter.Converter

	// ContextPropagators is a list of context propagators to use for passing context into workflows and activities.
	ContextPropagators []workflow.ContextPropagator

	// StickyTimeout is a duration; DefaultOptions sets it to 30 seconds.
	// NOTE(review): its exact semantics are not visible here — presumably how long
	// a workflow stays bound to the worker that last executed it; confirm.
	StickyTimeout time.Duration

	// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up.
	//
	// For long running workflow tasks, combine this with heartbeats.
	WorkflowLockTimeout time.Duration

	// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up
	ActivityLockTimeout time.Duration

	// RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be
	// removed immediately, including their history. If set to false, the instance will be removed after the configured
	// retention period or never.
	RemoveContinuedAsNewInstances bool
}
// DefaultOptions holds the default backend configuration: the process-wide
// default slog logger, a no-op metrics client, a no-op tracer provider, the
// default converter, the tracing context propagator, a 30 second sticky
// timeout, a one minute workflow lock timeout and a two minute activity lock
// timeout.
var DefaultOptions = Options{
	// Observability and serialization defaults.
	Logger:         slog.Default(),
	Metrics:        mi.NewNoopMetricsClient(),
	TracerProvider: noop.NewTracerProvider(),
	Converter:      converter.DefaultConverter,
	ContextPropagators: []workflow.ContextPropagator{
		&propagators.TracingContextPropagator{},
	},

	// Timeouts.
	StickyTimeout:       30 * time.Second,
	WorkflowLockTimeout: 1 * time.Minute,
	ActivityLockTimeout: 2 * time.Minute,

	// RemoveContinuedAsNewInstances is left at its zero value (false).
}

func ApplyOptions

func ApplyOptions(opts ...BackendOption) *Options

type RemovalOption added in v0.19.0

// RemovalOption is a functional option: a function that mutates the given
// RemovalOptions in place. Backend.RemoveWorkflowInstances accepts a variadic
// list of them; RemoveFinishedBatchSize and RemoveFinishedBefore return them.
type RemovalOption func(o *RemovalOptions)

func RemoveFinishedBatchSize added in v0.19.0

func RemoveFinishedBatchSize(size int) RemovalOption

func RemoveFinishedBefore added in v0.19.0

func RemoveFinishedBefore(t time.Time) RemovalOption

type RemovalOptions added in v0.19.0

// RemovalOptions holds the settings that RemovalOption functions modify.
// DefaultRemovalOptions sets BatchSize to 100.
type RemovalOptions struct {
	// FinishedBefore is a point in time.
	// NOTE(review): presumably only instances finished before this time are
	// removed, and it is set by RemoveFinishedBefore — confirm.
	FinishedBefore time.Time
	// BatchSize is a count.
	// NOTE(review): presumably the number of instances removed per batch,
	// set by RemoveFinishedBatchSize — confirm.
	BatchSize      int
}

type Stats added in v0.16.1

// Stats holds statistics about a backend, as returned by Backend.GetStats.
type Stats struct {
	// ActiveWorkflowInstances is a count of workflow instances.
	// NOTE(review): presumably instances that are currently active (not yet
	// finished) — confirm against the backend implementations.
	ActiveWorkflowInstances int64

	// PendingActivityTasks are the number of activities that are currently in the queue,
	// waiting to be processed by a worker. The map is keyed by queue.
	PendingActivityTasks map[workflow.Queue]int64

	// PendingWorkflowTasks are the number of workflow tasks that are currently in the queue,
	// waiting to be processed by a worker. The map is keyed by queue.
	PendingWorkflowTasks map[workflow.Queue]int64
}

type WorkflowTask added in v0.17.0

// WorkflowTask represents work for one workflow execution slice. It is what
// Backend.GetWorkflowTask returns and what Backend.ExtendWorkflowTask and
// Backend.CompleteWorkflowTask accept.
type WorkflowTask struct {
	// ID is an identifier for this task. It's set by the backend
	ID string

	// Queue is the queue of the workflow instance
	Queue workflow.Queue

	// WorkflowInstance is the workflow instance that this task is for
	WorkflowInstance *core.WorkflowInstance

	// WorkflowInstanceState is the state of the workflow instance when the task was dequeued
	WorkflowInstanceState core.WorkflowInstanceState

	// Metadata is the metadata of the workflow instance
	Metadata *metadata.WorkflowMetadata

	// LastSequenceID is the sequence ID of the newest event in the workflow instance's history
	LastSequenceID int64

	// NewEvents are new events since the last task execution
	NewEvents []*history.Event

	// Backend specific data, only the producer of the task should rely on this.
	CustomData any
}

WorkflowTask represents work for one workflow execution slice.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL