Documentation ¶
Index ¶
- Constants
- func WithInFlightLimit(f func() int) regOpt
- func WithLogger(l log.Logger) regOpt
- func WithMetrics(m metrics.Handler) regOpt
- func WithTotalLimit(f func() int) regOpt
- func WithTracerProvider(t trace.TracerProvider) regOpt
- type EventStore
- type Registry
- type RegistryImpl
- func (r *RegistryImpl) Find(ctx context.Context, id string) (*Update, bool)
- func (r *RegistryImpl) FindOrCreate(ctx context.Context, id string) (*Update, bool, error)
- func (r *RegistryImpl) HasOutgoing() bool
- func (r *RegistryImpl) Len() int
- func (r *RegistryImpl) ReadOutgoingMessages(workflowTaskStartedEventID int64) []*protocolpb.Message
- func (r *RegistryImpl) TerminateUpdates(_ context.Context, _ EventStore)
- type Update
- func (u *Update) OnMessage(ctx context.Context, msg proto.Message, eventStore EventStore) error
- func (u *Update) ReadOutgoingMessages(out *[]*protocolpb.Message, sequencingID *protocolpb.Message_EventId)
- func (u *Update) WaitAccepted(ctx context.Context) (*updatepb.Outcome, error)
- func (u *Update) WaitOutcome(ctx context.Context) (*updatepb.Outcome, error)
- type UpdateStore
Constants ¶
const ProtocolV1 = protocol.Type("temporal.api.update.v1")
Variables ¶
This section is empty.
Functions ¶
func WithInFlightLimit ¶ added in v1.21.0
func WithInFlightLimit(f func() int) regOpt
WithInFlightLimit provides an optional limit to the number of incomplete updates that a Registry instance will allow.
func WithLogger ¶ added in v1.21.0
WithLogger sets the log.Logger to be used by an UpdateRegistry and its Updates.
func WithMetrics ¶ added in v1.21.0
WithMetrics sets the metrics.Handler to be used by an UpdateRegistry and its Updates.
func WithTotalLimit ¶ added in v1.21.0
func WithTotalLimit(f func() int) regOpt
WithTotalLimit provides an optional limit to the total number of updates for workflow.
func WithTracerProvider ¶ added in v1.21.0
func WithTracerProvider(t trace.TracerProvider) regOpt
WithTracerProvider sets the trace.TracerProvider (and by extension the trace.Tracer) to be used by an UpdateRegistry and its Updates.
Types ¶
type EventStore ¶ added in v1.21.0
type EventStore interface { effect.Controller // AddWorkflowExecutionUpdateAcceptedEvent writes an update accepted // event. The data may not be durable when this function returns. AddWorkflowExecutionUpdateAcceptedEvent( updateID string, acceptedRequestMessageId string, acceptedRequestSequencingEventId int64, acceptedRequest *updatepb.Request, ) (*historypb.HistoryEvent, error) // AddWorkflowExecutionUpdateCompletedEvent writes an update completed // event. The data may not be durable when this function returns. AddWorkflowExecutionUpdateCompletedEvent( acceptedEventID int64, resp *updatepb.Response, ) (*historypb.HistoryEvent, error) }
EventStore is the interface that an Update needs to read and write events and to be notified when buffered writes have been flushed. It is the expectation of this code that writes to EventStore will return before the data has been made durable. Callbacks attached to the EventStore via OnAfterCommit and OnAfterRollback *must* be called after the EventStore state is successfully written or is discarded.
type Registry ¶
type Registry interface { // FindOrCreate finds an existing Update or creates a new one. The second // return value (bool) indicates whether the Update returned already // existed and was found (true) or was not found and has been newly // created (false) FindOrCreate(ctx context.Context, protocolInstanceID string) (*Update, bool, error) // Find finds an existing update in this Registry but does not create a // new update if no update is found. Find(ctx context.Context, protocolInstanceID string) (*Update, bool) // ReadOutgoingMessages polls each registered Update for outbound // messages and returns them. ReadOutgoingMessages(startedEventID int64) []*protocolpb.Message // TerminateUpdates terminates all existing updates in the registry // and notifies update aPI callers with corresponding error. TerminateUpdates(ctx context.Context, eventStore EventStore) // HasOutgoing returns true if the registry has any Updates that want to // sent messages to a worker. HasOutgoing() bool // Len observes the number of incomplete updates in this Registry. Len() int }
Registry maintains a set of updates that have been admitted to run against a workflow execution.
type RegistryImpl ¶
type RegistryImpl struct {
// contains filtered or unexported fields
}
func NewRegistry ¶
func NewRegistry( getStoreFn func() UpdateStore, opts ...regOpt, ) *RegistryImpl
func (*RegistryImpl) FindOrCreate ¶ added in v1.21.0
func (*RegistryImpl) HasOutgoing ¶ added in v1.21.0
func (r *RegistryImpl) HasOutgoing() bool
func (*RegistryImpl) Len ¶ added in v1.21.0
func (r *RegistryImpl) Len() int
func (*RegistryImpl) ReadOutgoingMessages ¶ added in v1.21.0
func (r *RegistryImpl) ReadOutgoingMessages( workflowTaskStartedEventID int64, ) []*protocolpb.Message
func (*RegistryImpl) TerminateUpdates ¶ added in v1.21.0
func (r *RegistryImpl) TerminateUpdates(_ context.Context, _ EventStore)
type Update ¶
type Update struct {
// contains filtered or unexported fields
}
Update is a state machine for the update protocol. It reads and writes messages from the go.temporal.io/api/update/v1 package. The update state machine is straightforward except in that it provides "provisional" in-between states where the update has received a message that has modified its internal state but those changes have not been made visible to clients yet (e.g. accepted or outcome futures have not been set yet). The observable changes are bound to the EventStore's effect.Controller and will be triggered when those effects are applied. State transitions (OnMessage calls) must be done while holding the workflow lock.
func New ¶ added in v1.21.0
New creates a new Update instance with the provided ID that will call the onComplete callback when it completes.
func (*Update) OnMessage ¶ added in v1.21.0
OnMessage delivers a message to the Update state machine. The proto.Message parameter is expected to be one of *updatepb.Request, *updatepb.Response, *updatepb.Rejection, *updatepb.Acceptance, or a *protocolpb.Message whose Body field contains an instance from the same list. Writes to the EventStore occur synchronously but externally observable effects on this Update (e.g. emitting an Outcome or an Accepted) are registered with the EventStore to be applied after the durable updates are committed. If the EventStore rolls back its effects, this state machine does the same.
func (*Update) ReadOutgoingMessages ¶ added in v1.21.0
func (u *Update) ReadOutgoingMessages(out *[]*protocolpb.Message, sequencingID *protocolpb.Message_EventId)
ReadOutgoingMessages loads any outbound messages from this Update state machine into the output slice provided.
func (*Update) WaitAccepted ¶ added in v1.21.0
WaitAccepted blocks on the acceptance of this update, returning nil if has been accepted but not yet completed or the overall Outcome if the update has been completed (including completed by rejection). This call will block until the acceptance occurs or the provided context.Context expires. It is safe to call this method outside of workflow lock.
func (*Update) WaitOutcome ¶
WaitOutcome observes this Update's completion, returning the Outcome when it is available. This call will block until the Outcome is known or the provided context.Context expires. It is safe to call this method outside of workflow lock.
type UpdateStore ¶ added in v1.21.0
type UpdateStore interface { VisitUpdates(visitor func(updID string, updInfo *updatespb.UpdateInfo)) GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error) }
UpdateStore represents the update package's requirements for reading updates from the store.