update

package
v1.24.0-m2.1
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2024 License: MIT Imports: 23 Imported by: 0

README

Update State Machine

The diagram below depicts the state machine implemented in update.go.

stateDiagram-v2
    [*] --> Admitted: New

    Admitted --> ProvisionallyRequested: onRequestMsg
    ProvisionallyRequested --> Requested: commit

    Requested --> ProvisionallyAccepted: onAcceptanceMsg
    ProvisionallyAccepted --> Accepted: commit

    Requested --> ProvisionallyCompleted: onRejectionMsg
    ProvisionallyCompleted --> Completed: commit

    ProvisionallyAccepted --> ProvisionallyCompleted: onResponse
    Accepted --> ProvisionallyCompleted: onResponse
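
The diagram can also be read as a transition table. The following is a minimal sketch under that reading, not the code in update.go: the `state`, `event`, and `edge` types and the `next` function are hypothetical names introduced only for illustration, and the strings are the labels from the diagram.

```go
package main

import "fmt"

// state and event are hypothetical types for this sketch; their values are
// the labels used in the diagram, not identifiers from update.go.
type state string
type event string

// edge identifies one arrow in the diagram by its source state and label.
type edge struct {
	from state
	on   event
}

// transitions lists every arrow drawn in the diagram.
var transitions = map[edge]state{
	{"Admitted", "onRequestMsg"}:            "ProvisionallyRequested",
	{"ProvisionallyRequested", "commit"}:    "Requested",
	{"Requested", "onAcceptanceMsg"}:        "ProvisionallyAccepted",
	{"ProvisionallyAccepted", "commit"}:     "Accepted",
	{"Requested", "onRejectionMsg"}:         "ProvisionallyCompleted",
	{"ProvisionallyCompleted", "commit"}:    "Completed",
	{"ProvisionallyAccepted", "onResponse"}: "ProvisionallyCompleted",
	{"Accepted", "onResponse"}:              "ProvisionallyCompleted",
}

// next returns the state reached from s on ev. The bool is false when the
// diagram has no such arrow.
func next(s state, ev event) (state, bool) {
	to, ok := transitions[edge{s, ev}]
	return to, ok
}

func main() {
	// Walk the accept-then-respond path through the diagram.
	s := state("Admitted")
	path := []event{"onRequestMsg", "commit", "onAcceptanceMsg", "commit", "onResponse", "commit"}
	for _, ev := range path {
		to, ok := next(s, ev)
		if !ok {
			fmt.Printf("no edge from %s on %s\n", s, ev)
			return
		}
		fmt.Printf("%s --%s--> %s\n", s, ev, to)
		s = to
	}
}
```

Every "Provisionally" state has exactly one way out via `commit`, which is what ties a visible state change to a successful write.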

Documentation

Constants

const ProtocolV1 = protocol.Type("temporal.api.update.v1")

Variables

This section is empty.

Functions

This section is empty.

Types

type CancelReason

type CancelReason uint32
const (
	CancelReasonWorkflowCompleted CancelReason = iota + 1
	CancelReasonWorkflowTerminated
)

func (CancelReason) RejectionFailure

func (r CancelReason) RejectionFailure() *failurepb.Failure

type EventStore added in v1.21.0

type EventStore interface {
	effect.Controller

	// AddWorkflowExecutionUpdateAcceptedEvent writes an update accepted
	// event. The data may not be durable when this function returns.
	AddWorkflowExecutionUpdateAcceptedEvent(
		updateID string,
		acceptedRequestMessageId string,
		acceptedRequestSequencingEventId int64,
		acceptedRequest *updatepb.Request,
	) (*historypb.HistoryEvent, error)

	// AddWorkflowExecutionUpdateCompletedEvent writes an update completed
	// event. The data may not be durable when this function returns.
	AddWorkflowExecutionUpdateCompletedEvent(
		acceptedEventID int64,
		resp *updatepb.Response,
	) (*historypb.HistoryEvent, error)

	// CanAddEvent returns true if an event can be added to the EventStore.
	CanAddEvent() bool
}

EventStore is the interface that an Update needs to read and write events and to be notified when buffered writes have been flushed. It is the expectation of this code that writes to EventStore will return before the data has been made durable. Callbacks attached to the EventStore via OnAfterCommit and OnAfterRollback *must* be called after the EventStore state is successfully written or is discarded.

type Option added in v1.23.0

type Option func(*registry)

func WithInFlightLimit added in v1.21.0

func WithInFlightLimit(f func() int) Option

WithInFlightLimit provides an optional limit to the number of incomplete updates that a Registry instance will allow.

func WithLogger added in v1.21.0

func WithLogger(l log.Logger) Option

WithLogger sets the log.Logger to be used by a Registry and its Updates.

func WithMetrics added in v1.21.0

func WithMetrics(m metrics.Handler) Option

WithMetrics sets the metrics.Handler to be used by a Registry and its Updates.

func WithTotalLimit added in v1.21.0

func WithTotalLimit(f func() int) Option

WithTotalLimit provides an optional limit to the total number of updates for a workflow.

func WithTracerProvider added in v1.21.0

func WithTracerProvider(t trace.TracerProvider) Option

WithTracerProvider sets the trace.TracerProvider (and by extension the trace.Tracer) to be used by a Registry and its Updates.
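
The With* functions follow the functional-options pattern: each returns an Option that mutates the registry under construction. The sketch below shows the pattern with a pared-down, hypothetical `registry` struct. Its field names, the default values, and the `newRegistry` helper are assumptions for illustration, not the package's internals.

```go
package main

import "fmt"

// registry is a hypothetical, pared-down stand-in for the package's
// unexported registry type. The field names are assumptions.
type registry struct {
	inFlightLimit func() int
	totalLimit    func() int
}

// Option has the documented shape: a function that mutates a *registry.
type Option func(*registry)

// WithInFlightLimit stores the limit as a function so that it is re-read on
// every check.
func WithInFlightLimit(f func() int) Option {
	return func(r *registry) { r.inFlightLimit = f }
}

// WithTotalLimit does the same for the total number of updates.
func WithTotalLimit(f func() int) Option {
	return func(r *registry) { r.totalLimit = f }
}

// newRegistry sets defaults and then applies each option in order.
// The default values are arbitrary and exist only for this sketch.
func newRegistry(opts ...Option) *registry {
	r := &registry{
		inFlightLimit: func() int { return 10 },
		totalLimit:    func() int { return 100 },
	}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	limit := 3
	r := newRegistry(WithInFlightLimit(func() int { return limit }))
	fmt.Println(r.inFlightLimit(), r.totalLimit())

	// Because the option holds a function, a later change to the
	// underlying value is picked up without rebuilding the registry.
	limit = 5
	fmt.Println(r.inFlightLimit())
}
```

Taking `func() int` rather than `int` presumably lets the limits track a value that changes at runtime, such as dynamic configuration.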

type Registry

type Registry interface {
	// FindOrCreate finds an existing Update or creates a new one. The second
	// return value (bool) indicates whether the Update returned already
	// existed and was found (true) or was not found and has been newly
	// created (false).
	FindOrCreate(ctx context.Context, protocolInstanceID string) (*Update, bool, error)

	// Find finds an existing update in this Registry but does not create a
	// new update if no update is found.
	Find(ctx context.Context, protocolInstanceID string) (*Update, bool)

	// HasOutgoingMessages returns true if the registry has any Updates
	// for which an outgoing message can be generated.
	// If includeAlreadySent is set to true then it will return true
	// even if the update message was already sent but not yet processed by the worker.
	HasOutgoingMessages(includeAlreadySent bool) bool

	// Send returns messages for all Updates that need to be sent to the worker.
	// If includeAlreadySent is set to true then messages will be created even
	// for updates that were already sent but not yet processed by the worker.
	Send(ctx context.Context, includeAlreadySent bool, workflowTaskStartedEventID int64, eventStore EventStore) []*protocolpb.Message

	// RejectUnprocessed rejects all updates that are waiting for a workflow task to be completed.
	// This method should be called after all messages from the worker are handled to make sure
	// that the worker processed (rejected or accepted) all updates that were delivered on the workflow task.
	RejectUnprocessed(ctx context.Context, eventStore EventStore) ([]string, error)

	// CancelIncomplete cancels all incomplete updates in the registry:
	//   - updates in stateAdmitted, stateRequested, or stateSent are rejected,
	//   - updates in stateAccepted are ignored (see CancelIncomplete() in update.go for details),
	//   - updates in stateCompleted are ignored.
	CancelIncomplete(ctx context.Context, reason CancelReason, eventStore EventStore) error

	// Len observes the number of incomplete updates in this Registry.
	Len() int

	// GetSize returns the size of the update object
	GetSize() int
}

Registry maintains a set of updates that have been admitted to run against a workflow execution.
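
The difference between Find and FindOrCreate, and the meaning of FindOrCreate's bool result, can be sketched with a map. All names below (`registry`, `update`, `newRegistry`, the lowercase methods) are hypothetical. The sketch omits the context parameter, and returning an error when a limit is reached is an assumption about why FindOrCreate has an error result.

```go
package main

import (
	"errors"
	"fmt"
)

// update and registry are hypothetical stand-ins used only in this sketch.
type update struct{ id string }

type registry struct {
	updates map[string]*update
	limit   int // assumed cap on tracked updates
}

func newRegistry(limit int) *registry {
	return &registry{updates: map[string]*update{}, limit: limit}
}

// find looks up an update and never creates one.
func (r *registry) find(id string) (*update, bool) {
	u, ok := r.updates[id]
	return u, ok
}

// findOrCreate returns the existing update with true, or a newly created
// update with false. Failing when the limit is reached is an assumption.
func (r *registry) findOrCreate(id string) (*update, bool, error) {
	if u, ok := r.updates[id]; ok {
		return u, true, nil
	}
	if len(r.updates) >= r.limit {
		return nil, false, errors.New("update limit reached")
	}
	u := &update{id: id}
	r.updates[id] = u
	return u, false, nil
}

func main() {
	r := newRegistry(1)

	_, existed, _ := r.findOrCreate("upd-1")
	fmt.Println("first call, already existed:", existed)

	_, existed, _ = r.findOrCreate("upd-1")
	fmt.Println("second call, already existed:", existed)

	_, _, err := r.findOrCreate("upd-2")
	fmt.Println("over the limit:", err)
}
```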

func NewRegistry

func NewRegistry(
	getStoreFn func() Store,
	opts ...Option,
) Registry

type Store added in v1.23.0

type Store interface {
	VisitUpdates(visitor func(updID string, updInfo *updatespb.UpdateInfo))
	GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error)
}

Store represents the update package's requirements for reading updates from the store.

type Update

type Update struct {
	// contains filtered or unexported fields
}

Update is a state machine for the update protocol. It reads and writes messages from the go.temporal.io/api/update/v1 package. See the diagram in service/history/workflow/update/README.md. The update state machine is straightforward except that it provides "provisional" in-between states, in which the update has received a message that modified its internal state but those changes have not yet been made visible to clients (e.g. the accepted or outcome futures have not been set yet). The observable changes are bound to the EventStore's effect.Controller and will be triggered when those effects are applied. State transitions (OnMessage calls) must be done while holding the workflow lock.

func New added in v1.21.0

func New(id string, opts ...updateOpt) *Update

New creates a new Update instance with the provided ID that will call the onComplete callback when it completes.

func (*Update) CancelIncomplete

func (u *Update) CancelIncomplete(ctx context.Context, reason CancelReason, eventStore EventStore) error

CancelIncomplete cancels the update if it has not completed yet:

  • if in stateAdmitted, stateRequested, or stateSent -> reject,
  • if in stateAccepted -> do nothing,
  • if in stateCompleted -> do nothing.
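
The three rules above amount to a switch on the current state. A minimal sketch, assuming a hypothetical `state` enum (the names follow the documentation, the values and ordering are invented) and a hypothetical `cancelAction` helper that only reports the decision:

```go
package main

import "fmt"

// state is a hypothetical enum for this sketch. The names follow the
// documentation; the numeric values and ordering are assumptions.
type state int

const (
	stateAdmitted state = iota
	stateRequested
	stateSent
	stateAccepted
	stateCompleted
)

// cancelAction reports what cancellation does to an update in state s:
// "reject" or "none".
func cancelAction(s state) string {
	switch s {
	case stateAdmitted, stateRequested, stateSent:
		// The worker has not accepted the update yet, so it is rejected.
		return "reject"
	default:
		// stateAccepted and stateCompleted are left untouched.
		return "none"
	}
}

func main() {
	names := map[state]string{
		stateAdmitted:  "stateAdmitted",
		stateRequested: "stateRequested",
		stateSent:      "stateSent",
		stateAccepted:  "stateAccepted",
		stateCompleted: "stateCompleted",
	}
	for s := stateAdmitted; s <= stateCompleted; s++ {
		fmt.Printf("%s -> %s\n", names[s], cancelAction(s))
	}
}
```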

func (*Update) GetSize added in v1.24.0

func (u *Update) GetSize() int

func (*Update) OnProtocolMessage added in v1.24.0

func (u *Update) OnProtocolMessage(
	ctx context.Context,
	protocolMsg *protocolpb.Message,
	eventStore EventStore,
) error

OnProtocolMessage delivers a message to the Update state machine. The Body field of the *protocolpb.Message parameter is expected to be one of *updatepb.Response, *updatepb.Rejection, or *updatepb.Acceptance. Writes to the EventStore occur synchronously, but externally observable effects on this Update (e.g. emitting an Outcome or an Accepted) are registered with the EventStore to be applied after the durable updates are committed. If the EventStore rolls back its effects, this state machine does the same.

If you modify the state machine please update the diagram in service/history/workflow/update/README.md.

func (*Update) Request

func (u *Update) Request(
	ctx context.Context,
	req *updatepb.Request,
	eventStore EventStore,
) error

Request may be called with the Update in any state, but if the state is anything other than stateAdmitted it returns a nil error immediately. This effectively deduplicates update requests by update ID. If the Update is in stateAdmitted, Request builds a protocolpb.Message that will be sent on ensuing calls to PollOutgoingMessages until the update is accepted.
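
The deduplication behavior can be sketched as a guard on the current state. Everything here is hypothetical: `update`, `message`, and the lowercase `request` method are stand-ins, and the sketch moves straight to a requested state rather than going through the provisional state and commit used by the real state machine.

```go
package main

import "fmt"

// state, message, and update are hypothetical stand-ins for this sketch.
type state int

const (
	stateAdmitted state = iota
	stateRequested
)

type message struct{ body string }

type update struct {
	id    string
	state state
	msg   *message
}

// request accepts the first request for an update and silently ignores any
// later one. A nil error is returned in both cases.
func (u *update) request(body string) error {
	if u.state != stateAdmitted {
		// A request for this update ID was already recorded.
		return nil
	}
	u.msg = &message{body: body}
	u.state = stateRequested
	return nil
}

func main() {
	u := &update{id: "upd-1"}
	_ = u.request("first")
	_ = u.request("second") // ignored: same update ID, already requested
	fmt.Println("stored request body:", u.msg.body)
}
```

Returning nil instead of an error for the duplicate means a retrying caller cannot tell the two cases apart, which is the intended effect of deduplication.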

func (*Update) Send added in v1.24.0

func (u *Update) Send(
	_ context.Context,
	includeAlreadySent bool,
	sequencingID *protocolpb.Message_EventId,
	eventStore EventStore,
) *protocolpb.Message

Send moves the update from stateRequested to stateSent and returns the message to be sent to the worker. If the update is not in the expected stateRequested, Send does nothing and returns nil. If includeAlreadySent is set to true, Send returns the message even if the update was already sent but not yet processed by the worker. Note: once an update has moved to stateSent, it never moves back to stateRequested.
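
A minimal sketch of how includeAlreadySent changes the result, using hypothetical `update` and `message` types and a lowercase `send` method. The context, sequencing ID, and EventStore parameters of the real method are omitted.

```go
package main

import "fmt"

// state, message, and update are hypothetical stand-ins for this sketch.
type state int

const (
	stateRequested state = iota
	stateSent
	stateAccepted
)

type message struct{ body string }

type update struct {
	state state
	msg   *message
}

// send returns the message to deliver to the worker, or nil if there is
// nothing to send for this update.
func (u *update) send(includeAlreadySent bool) *message {
	switch {
	case u.state == stateRequested:
		u.state = stateSent // one-way transition
		return u.msg
	case u.state == stateSent && includeAlreadySent:
		// Already sent but not yet processed: resend on request.
		return u.msg
	default:
		return nil
	}
}

func main() {
	u := &update{state: stateRequested, msg: &message{body: "request"}}
	fmt.Println("first send returns a message:", u.send(false) != nil)
	fmt.Println("second send returns a message:", u.send(false) != nil)
	fmt.Println("resend with includeAlreadySent:", u.send(true) != nil)
}
```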

func (*Update) Status added in v1.23.0

func (u *Update) Status() (UpdateStatus, error)

Status returns an UpdateStatus containing the enumspb.UpdateWorkflowExecutionLifecycleStage corresponding to the current state of this Update, and the Outcome if it has one.

func (*Update) WaitAccepted added in v1.21.0

func (u *Update) WaitAccepted(ctx context.Context) (UpdateStatus, error)

WaitAccepted blocks on the acceptance of this update, returning nil if it has been accepted but not yet completed, or the overall Outcome if the update has been completed (including completed by rejection). This call will block until the acceptance occurs or the provided context.Context expires. It is safe to call this method outside of the workflow lock.

func (*Update) WaitLifecycleStage added in v1.23.0

func (u *Update) WaitLifecycleStage(
	ctx context.Context,
	waitStage enumspb.UpdateWorkflowExecutionLifecycleStage,
	softTimeout time.Duration) (UpdateStatus, error)

WaitLifecycleStage waits until the Update has reached at least `waitStage` or a timeout occurs. If the Update reaches `waitStage` with no timeout, the most advanced stage known to have been reached is returned, along with the outcome if any. If the supplied soft timeout elapses first, an unspecified stage and a nil outcome are returned, without an error. If the context deadline expires first, the error is returned as usual.

func (*Update) WaitOutcome

func (u *Update) WaitOutcome(ctx context.Context) (UpdateStatus, error)

WaitOutcome observes this Update's completion, returning when the Outcome is available. This call will block until the Outcome is known or the provided context.Context expires. It is safe to call this method outside of the workflow lock.

type UpdateStatus added in v1.23.0

type UpdateStatus struct {
	Stage   enumspb.UpdateWorkflowExecutionLifecycleStage
	Outcome *updatepb.Outcome
}
