update

package
v1.26.2-121.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2024 License: MIT Imports: 25 Imported by: 0

README

Documentation

Index

Constants

View Source
const ProtocolV1 = protocol.Type("temporal.api.update.v1")

Variables

View Source
var (
	WorkflowUpdateAbortedErr = serviceerror.NewUnavailable("Workflow Update was aborted.")
)

Functions

This section is empty.

Types

type AbortReason added in v1.24.0

type AbortReason uint32
const (
	AbortReasonRegistryCleared AbortReason = iota + 1
	AbortReasonWorkflowCompleted
	AbortReasonWorkflowContinuing
)

func (AbortReason) Error added in v1.24.0

func (r AbortReason) Error() error

Error returns an error which will be set to Update futures while aborting Update.

type EventStore added in v1.21.0

type EventStore interface {
	effect.Controller

	// AddWorkflowExecutionUpdateAcceptedEvent writes an Update accepted
	// event. The data may not be durable when this function returns.
	AddWorkflowExecutionUpdateAcceptedEvent(
		updateID string,
		acceptedRequestMessageId string,
		acceptedRequestSequencingEventId int64,
		acceptedRequest *updatepb.Request,
	) (*historypb.HistoryEvent, error)

	// AddWorkflowExecutionUpdateCompletedEvent writes an Update completed
	// event. The data may not be durable when this function returns.
	AddWorkflowExecutionUpdateCompletedEvent(
		acceptedEventID int64,
		resp *updatepb.Response,
	) (*historypb.HistoryEvent, error)

	// CanAddEvent returns true if an event can be added to the EventStore.
	CanAddEvent() bool
}

EventStore is the interface that an Update needs to read and write events and to be notified when buffered writes have been flushed. It is the expectation of this code that writes to EventStore will return before the data has been made durable. Callbacks attached to the EventStore via OnAfterCommit and OnAfterRollback *must* be called after the EventStore state is successfully written or is discarded.

type Option added in v1.23.0

type Option func(*registry)

func WithInFlightLimit added in v1.21.0

func WithInFlightLimit(f func() int) Option

WithInFlightLimit provides an optional limit to the number of incomplete Updates that a Registry instance will allow.

func WithLogger added in v1.21.0

func WithLogger(l log.Logger) Option

WithLogger sets the log.Logger to be used by Registry and its Updates.

func WithMetrics added in v1.21.0

func WithMetrics(m metrics.Handler) Option

WithMetrics sets the metrics.Handler to be used by Registry and its Updates.

func WithTotalLimit added in v1.21.0

func WithTotalLimit(f func() int) Option

WithTotalLimit provides an optional limit to the total number of Updates for workflow run.

func WithTracerProvider added in v1.21.0

func WithTracerProvider(t trace.TracerProvider) Option

WithTracerProvider sets the trace.TracerProvider (and by extension the trace.Tracer) to be used by Registry and its Updates.

type Registry

type Registry interface {
	// FindOrCreate finds an existing Update or creates a new one. The second
	// return value (bool) indicates whether the Update returned already
	// existed and was found (true) or was not found and has been newly
	// created (false).
	FindOrCreate(ctx context.Context, updateID string) (_ *Update, alreadyExisted bool, _ error)

	// Find an existing Update in this Registry.
	// Returns nil if Update doesn't exist.
	Find(ctx context.Context, updateID string) *Update

	// TryResurrect tries to resurrect the Update from the protocol message,
	// whose body contains an Acceptance or Rejection message.
	// It returns an error if some unexpected error happened, but if there is not
	// enough data in the message, it just returns a nil Update.
	// If the Update was successfully resurrected, it is added to the registry in stateAdmitted.
	TryResurrect(ctx context.Context, acptOrRejMsg *protocolpb.Message) (*Update, error)

	// HasOutgoingMessages returns true if the Registry has any Updates
	// for which outgoing message can be generated.
	// If includeAlreadySent is set to true, then it will return true
	// even if an Update message was already sent but not processed by worker.
	HasOutgoingMessages(includeAlreadySent bool) bool

	// Send returns messages for all Updates that need to be sent to the worker.
	// If includeAlreadySent is set to true, then messages will be created even
	// for Updates which were already sent but not processed by worker.
	Send(ctx context.Context, includeAlreadySent bool, workflowTaskStartedEventID int64) []*protocolpb.Message

	// RejectUnprocessed rejects all Updates that are waiting for a workflow task to be completed.
	// This method should be called after all messages from worker are handled to make sure
	// that worker processed (rejected or accepted) all Updates that were delivered on the workflow task.
	RejectUnprocessed(ctx context.Context, effects effect.Controller) ([]string, error)

	// Abort all incomplete Updates in the Registry.
	Abort(reason AbortReason)

	// Clear the Registry and abort all Updates.
	Clear()

	// Len observes the number of incomplete (not completed or rejected) Updates in this Registry.
	Len() int

	// GetSize returns approximate size of the Registry in bytes.
	GetSize() int

	// FailoverVersion of a Mutable State at the time of Registry creation.
	FailoverVersion() int64
}

Registry maintains a set of Updates that have been admitted to run against a workflow execution.

func NewRegistry

func NewRegistry(
	store UpdateStore,
	opts ...Option,
) Registry

type Status added in v1.24.0

type Status struct {
	// The most advanced stage reached by Update.
	Stage enumspb.UpdateWorkflowExecutionLifecycleStage
	// Outcome of Update if it is completed or rejected.
	Outcome *updatepb.Outcome
}

Status describes the current status of Update from Registry.

type Update

type Update struct {
	// contains filtered or unexported fields
}

Update docs are at /docs/architecture/workflow-update.md.

func New added in v1.21.0

func New(id string, opts ...updateOpt) *Update

New creates a new Update instance with the provided ID and set of options.

func (*Update) Admit added in v1.24.0

func (u *Update) Admit(
	req *updatepb.Request,
	eventStore EventStore,
) error

Admit works if the Update is in any state, but if the state is anything other than stateCreated then it just early returns a nil error. This effectively gives us Update request deduplication by updateID. If the Update is in stateCreated then it builds a protocolpb.Message that will be sent when Send is called.

func (*Update) GetSize added in v1.24.0

func (u *Update) GetSize() int

func (*Update) OnProtocolMessage added in v1.24.0

func (u *Update) OnProtocolMessage(
	protocolMsg *protocolpb.Message,
	eventStore EventStore,
) error

OnProtocolMessage delivers a message to the Update state machine. The Body field of *protocolpb.Message parameter is expected to be one of *updatepb.Response, *updatepb.Rejection, *updatepb.Acceptance. Writes to the EventStore occur synchronously but externally observable effects on this Update (e.g., emitting an Outcome or an Accepted) are registered with the EventStore to be applied after the durable Updates are committed. If the EventStore rolls back its effects, this state machine does the same.

If you modify the state machine, please update the diagram in /docs/architecture/workflow-update.md.

func (*Update) Send added in v1.24.0

func (u *Update) Send(
	includeAlreadySent bool,
	sequencingID *protocolpb.Message_EventId,
) *protocolpb.Message

Send move Update from stateAdmitted to stateSent and returns the message to be sent to worker. If Update is not in expected stateAdmitted, Send does nothing and returns nil. If includeAlreadySent is set to true, then Send will return a message even if Update was already sent but not processed by worker. If Update lacks a request, then return nil; the request will be communicated to the worker via an UpdateAdmitted event. Note: once Update moved to stateSent it never moves back to stateAdmitted.

func (*Update) WaitLifecycleStage added in v1.23.0

func (u *Update) WaitLifecycleStage(
	ctx context.Context,
	waitStage enumspb.UpdateWorkflowExecutionLifecycleStage,
	softTimeout time.Duration,
) (*Status, error)

WaitLifecycleStage waits until the Update has reached waitStage or a timeout. If the Update reaches waitStage with no timeout, the outcome (if any) is returned. If there is a timeout due to the supplied soft timeout, then the most advanced stage known to have been reached is returned together with an empty outcome. If there is a timeout due to supplied context deadline expiry, then the error is returned. If waitStage is UNSPECIFIED, current, reached Status is returned immediately (even if ctx is expired).

type UpdateStore added in v1.21.0

type UpdateStore interface {
	VisitUpdates(visitor func(updID string, updInfo *persistencespb.UpdateInfo))
	GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error)
	GetCurrentVersion() int64
	IsWorkflowExecutionRunning() bool
}

UpdateStore represents the update package's requirements for reading Updates from the store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL