Documentation ¶
Index ¶
- Constants
- type ActivityHandler
- type ActivityHandlerFunc
- type ActivityInterceptor
- type ActivityTaskCanceledError
- type ActivityTaskDispatcher
- type ActivityWorker
- func (w *ActivityWorker) AddCoordinatedHandler(heartbeatInterval, tickMinInterval time.Duration, ...)
- func (a *ActivityWorker) AddHandler(handler *ActivityHandler)
- func (a *ActivityWorker) HandleActivityTask(activityTask *swf.PollForActivityTaskOutput)
- func (h *ActivityWorker) HandleWithRecovery(handler func(*swf.PollForActivityTaskOutput)) func(*swf.PollForActivityTaskOutput)
- func (a *ActivityWorker) Init()
- func (a *ActivityWorker) Start()
- type BoundedGoroutineDispatcher
- type CallingGoroutineDispatcher
- type ComposedDecisionInterceptor
- func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
- func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
- func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
- func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
- func (c *ComposedDecisionInterceptor) BeforeTask(t *swf.PollForActivityTaskOutput)
- type CoordinatedActivityHandler
- type CoordinatedActivityHandlerCancelFunc
- type CoordinatedActivityHandlerFinishFunc
- type CoordinatedActivityHandlerStartFunc
- type CoordinatedActivityHandlerTickFunc
- type CountdownGoroutineDispatcher
- type FuncInterceptor
- func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
- func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)
- func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})
- func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)
- func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)
- type NewGoroutineDispatcher
- type SWFOps
Examples ¶
Constants ¶
const ( TaskGone = "Unknown activity" ExecutionGone = "Unknown execution" )
const (
FailureReasonMaxChars = 256
)
Various constants defined by SWF
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActivityHandler ¶
type ActivityHandler struct { Activity string HandlerFunc ActivityHandlerFunc Input interface{} }
func NewActivityHandler ¶
func NewActivityHandler(activity string, handler interface{}) *ActivityHandler
func (*ActivityHandler) ZeroInput ¶
func (a *ActivityHandler) ZeroInput() interface{}
type ActivityHandlerFunc ¶
type ActivityHandlerFunc func(activityTask *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error)
type ActivityInterceptor ¶
type ActivityInterceptor interface { BeforeTask(*swf.PollForActivityTaskOutput) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{}) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string) }
ActivityInterceptor allows manipulation of the decision task and the outcome at key points in the task lifecycle.
func NewComposedDecisionInterceptor ¶
func NewComposedDecisionInterceptor(interceptors ...ActivityInterceptor) ActivityInterceptor
type ActivityTaskCanceledError ¶
type ActivityTaskCanceledError struct {
// contains filtered or unexported fields
}
func (ActivityTaskCanceledError) Details ¶
func (e ActivityTaskCanceledError) Details() *string
func (ActivityTaskCanceledError) Error ¶
func (e ActivityTaskCanceledError) Error() string
type ActivityTaskDispatcher ¶
type ActivityTaskDispatcher interface {
DispatchTask(*swf.PollForActivityTaskOutput, func(*swf.PollForActivityTaskOutput))
}
ActivityTaskDispatcher is used by the ActivityWorker machinery to dispatch the handling of ActivityTasks. Different implementations can provide different concurrency models.
type ActivityWorker ¶
type ActivityWorker struct { Serializer fsm.StateSerializer SystemSerializer fsm.StateSerializer // Domain of the workflow associated with the FSM. Domain string // TaskList that the underlying poller will poll for decision tasks. TaskList string // Identity used in PollForActivityTaskRequests, can be empty. Identity string // Client used to make SWF api requests. SWF SWFOps // ShutdownManager ShutdownManager *poller.ShutdownManager // ActivityTaskDispatcher ActivityTaskDispatcher ActivityTaskDispatcher // ActivityInterceptor ActivityInterceptor ActivityInterceptor // allow panics in activities rather than recovering and failing the activity, useful for testing AllowPanics bool // reads the EventCorrelator and backs off based on what retry # the activity is. BackoffOnFailure bool // maximum backoff sleep on retries that fail. MaxBackoffSeconds int // contains filtered or unexported fields }
Example ¶
var swfOps SWFOps taskList := "aTaskListSharedBetweenTaskOneAndTwo" handleTask1 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) { return input, nil } handleTask2 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) { return input, nil } handler1 := &ActivityHandler{Activity: "one", HandlerFunc: handleTask1} handler2 := &ActivityHandler{Activity: "two", HandlerFunc: handleTask2} worker := &ActivityWorker{ Domain: "swf-domain", Serializer: fsm.JSONStateSerializer{}, TaskList: taskList, SWF: swfOps, Identity: "test-activity-worker", } worker.AddHandler(handler1) worker.AddHandler(handler2) go worker.Start()
Output:
func (*ActivityWorker) AddCoordinatedHandler ¶
func (w *ActivityWorker) AddCoordinatedHandler(heartbeatInterval, tickMinInterval time.Duration, handler *CoordinatedActivityHandler)
AddCoordinatedHandler automatically takes care of sending back heartbeats and updating state on workflows for an activity task. tickMinInterval determines the max rate at which the CoordinatedActivityHandler.Tick function will be called.
For example, when the Tick function returns quickly (e.g.: noop), and tickMinInterval is 1 * time.Second, Tick is guaranteed to be called at most once per second. The rate can be slower if Tick takes more than tickMinInterval to complete.
func (*ActivityWorker) AddHandler ¶
func (a *ActivityWorker) AddHandler(handler *ActivityHandler)
func (*ActivityWorker) HandleActivityTask ¶
func (a *ActivityWorker) HandleActivityTask(activityTask *swf.PollForActivityTaskOutput)
HandleActivityTask is the callback passed into the registered ActivityTaskDispatcher. It is exposed so that users can handle polling themselves and call DispatchTask directly with this as the callback.
e.g. activityWorker.ActivityTaskDispatcher.DispatchTask(activityTask, a.HandleWithRecovery(a.HandleActivityTask))
Note: You will need to handle recovering from panics if you call this directly without wrapping with HandleWithRecovery.
func (*ActivityWorker) HandleWithRecovery ¶
func (h *ActivityWorker) HandleWithRecovery(handler func(*swf.PollForActivityTaskOutput)) func(*swf.PollForActivityTaskOutput)
HandleWithRecovery is used to wrap handler functions (such as HandleActivityTask) so they gracefully recover from panics.
func (*ActivityWorker) Init ¶
func (a *ActivityWorker) Init()
func (*ActivityWorker) Start ¶
func (a *ActivityWorker) Start()
type BoundedGoroutineDispatcher ¶
type BoundedGoroutineDispatcher struct { NumGoroutines int // contains filtered or unexported fields }
BoundedGoroutineDispatcher is a DecisionTaskDispatcher that uses a bounded number of goroutines to run decision handlers.
func (*BoundedGoroutineDispatcher) DispatchTask ¶
func (b *BoundedGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask calls sends the task on a channel that NumGoroutines goroutines are selecting on. Goroutines recieving a task run it in the same goroutine. note that this is unsynchronized as DispatchTask will only be called by the single poller goroutine.
type CallingGoroutineDispatcher ¶
type CallingGoroutineDispatcher struct{}
CallingGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in the polling goroutine
func (*CallingGoroutineDispatcher) DispatchTask ¶
func (*CallingGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask calls the handler in the same goroutine.
type ComposedDecisionInterceptor ¶
type ComposedDecisionInterceptor struct {
// contains filtered or unexported fields
}
func (*ComposedDecisionInterceptor) AfterTask ¶
func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
func (*ComposedDecisionInterceptor) AfterTaskCanceled ¶
func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
func (*ComposedDecisionInterceptor) AfterTaskComplete ¶
func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
func (*ComposedDecisionInterceptor) AfterTaskFailed ¶
func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
func (*ComposedDecisionInterceptor) BeforeTask ¶
func (c *ComposedDecisionInterceptor) BeforeTask(t *swf.PollForActivityTaskOutput)
type CoordinatedActivityHandler ¶
type CoordinatedActivityHandler struct { // Start is called when a new activity is ready to be handled. Start CoordinatedActivityHandlerStartFunc // Tick is called regularly to process a running activity. // Tick that returns true, nil, nil just expresses that the job is still running. // Tick that returns true, &SomeStruct{}, nil will express that the job is still running and also send an 'ActivityUpdated' signal back to the FSM with SomeStruct{} as the Input. // Tick that returns false, &SomeStruct{}, nil, expresses that the job/activity is done and send SomeStruct{} back as the result. as well as stops heartbeating. // Tick that returns false, nil, nil, expresses that the job is done and send no result back, as well as stops heartbeating. // Tick that returns false, nil, err expresses that the job/activity failed and sends back err as the reason. as well as stops heartbeating. Tick CoordinatedActivityHandlerTickFunc // Cancel is called when a running activity receives a request to cancel // via heartbeat update. Cancel CoordinatedActivityHandlerCancelFunc // Finish is called at the end of handling every activity. // It is called no matter the outcome, eg if Start fails, // Tick decides to stop continuing, or the activity is canceled. Finish CoordinatedActivityHandlerFinishFunc Input interface{} Activity string }
func NewCoordinatedActivityHandler ¶
func NewCoordinatedActivityHandler(activity string, start interface{}, tick interface{}, cancel interface{}, finish interface{}) *CoordinatedActivityHandler
type CoordinatedActivityHandlerCancelFunc ¶
type CoordinatedActivityHandlerCancelFunc func(*swf.PollForActivityTaskOutput, interface{}) error
type CoordinatedActivityHandlerFinishFunc ¶
type CoordinatedActivityHandlerFinishFunc func(*swf.PollForActivityTaskOutput, interface{}) error
type CoordinatedActivityHandlerStartFunc ¶
type CoordinatedActivityHandlerStartFunc func(*swf.PollForActivityTaskOutput, interface{}) (interface{}, error)
type CoordinatedActivityHandlerTickFunc ¶
type CoordinatedActivityHandlerTickFunc func(*swf.PollForActivityTaskOutput, interface{}) (bool, interface{}, error)
type CountdownGoroutineDispatcher ¶
type CountdownGoroutineDispatcher struct { Stop chan bool StopAck chan bool // contains filtered or unexported fields }
CountdownGoroutineDispatcher is a dispatcher that you can register with a ShutdownManager. Used in your ActivityWorkers, it will count in-flight activities. It doesnt ack shutdowns until the number of in-flight activities are zero.
func RegisterNewCountdownGoroutineDispatcher ¶
func RegisterNewCountdownGoroutineDispatcher(mgr poller.ShutdownManager) *CountdownGoroutineDispatcher
RegisterNewCountdownGoroutineDispatcher constructs a new CountdownGoroutineDispatcher, start it and register it with the given ShutdownManager
func (*CountdownGoroutineDispatcher) DispatchTask ¶
func (m *CountdownGoroutineDispatcher) DispatchTask(t *swf.PollForActivityTaskOutput, f func(*swf.PollForActivityTaskOutput))
func (*CountdownGoroutineDispatcher) Start ¶
func (m *CountdownGoroutineDispatcher) Start()
type FuncInterceptor ¶
type FuncInterceptor struct { BeforeTaskFn func(*swf.PollForActivityTaskOutput) AfterTaskFn func(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error) AfterTaskCompleteFn func(t *swf.PollForActivityTaskOutput, result interface{}) AfterTaskFailedFn func(t *swf.PollForActivityTaskOutput, err error) AfterTaskCanceledFn func(t *swf.PollForActivityTaskOutput, details string) }
FuncInterceptor is a ActivityInterceptor that you can set handler funcs on. if any are unset, they are no-ops.
func (*FuncInterceptor) AfterTask ¶
func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
func (*FuncInterceptor) AfterTaskCanceled ¶
func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)
AfterTaskCanceled runs the AfterTaskCanceledFn if not nil
func (*FuncInterceptor) AfterTaskComplete ¶
func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})
AfterTaskComplete runs the AfterTaskCompleteFn if not nil
func (*FuncInterceptor) AfterTaskFailed ¶
func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)
AfterTaskFailed runs the AfterTaskFailedFn if not nil
func (*FuncInterceptor) BeforeTask ¶
func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)
BeforeTask runs the BeforeTaskFn if not nil
type NewGoroutineDispatcher ¶
type NewGoroutineDispatcher struct { }
NewGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in a new goroutine.
func (*NewGoroutineDispatcher) DispatchTask ¶
func (*NewGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask calls the handler in a new goroutine.
type SWFOps ¶
type SWFOps interface { RecordActivityTaskHeartbeat(req *swf.RecordActivityTaskHeartbeatInput) (*swf.RecordActivityTaskHeartbeatOutput, error) RespondActivityTaskCanceled(req *swf.RespondActivityTaskCanceledInput) (*swf.RespondActivityTaskCanceledOutput, error) RespondActivityTaskCompleted(req *swf.RespondActivityTaskCompletedInput) (*swf.RespondActivityTaskCompletedOutput, error) RespondActivityTaskFailed(req *swf.RespondActivityTaskFailedInput) (*swf.RespondActivityTaskFailedOutput, error) PollForActivityTask(req *swf.PollForActivityTaskInput) (*swf.PollForActivityTaskOutput, error) GetWorkflowExecutionHistory(req *swf.GetWorkflowExecutionHistoryInput) (*swf.GetWorkflowExecutionHistoryOutput, error) SignalWorkflowExecution(req *swf.SignalWorkflowExecutionInput) (*swf.SignalWorkflowExecutionOutput, error) }