Documentation
¶
Index ¶
- Constants
- type ActivityHandler
- type ActivityHandlerFunc
- type ActivityInterceptor
- type ActivityTaskCanceledError
- type ActivityTaskDispatcher
- type ActivityWorker
- type BoundedGoroutineDispatcher
- type CallingGoroutineDispatcher
- type ComposedDecisionInterceptor
- func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
- func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
- func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
- func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
- func (c *ComposedDecisionInterceptor) BeforeTask(t *swf.PollForActivityTaskOutput)
- type CoordinatedActivityHandler
- type CoordinatedActivityHandlerCancelFunc
- type CoordinatedActivityHandlerStartFunc
- type CoordinatedActivityHandlerTickFunc
- type FuncInterceptor
- func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
- func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)
- func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})
- func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)
- func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)
- type NewGoroutineDispatcher
- type SWFOps
Examples ¶
Constants ¶
const ( TaskGone = "Unknown activity" ExecutionGone = "Unknown execution" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActivityHandler ¶
type ActivityHandler struct { Activity string HandlerFunc ActivityHandlerFunc Input interface{} }
func NewActivityHandler ¶
func NewActivityHandler(activity string, handler interface{}) *ActivityHandler
func (*ActivityHandler) ZeroInput ¶
func (a *ActivityHandler) ZeroInput() interface{}
type ActivityHandlerFunc ¶
type ActivityHandlerFunc func(activityTask *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error)
type ActivityInterceptor ¶
type ActivityInterceptor interface { BeforeTask(*swf.PollForActivityTaskOutput) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{}) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string) }
ActivityInterceptor allows manipulation of the activity task and its outcome at key points in the task lifecycle.
func NewComposedDecisionInterceptor ¶
func NewComposedDecisionInterceptor(interceptors ...ActivityInterceptor) ActivityInterceptor
type ActivityTaskCanceledError ¶
type ActivityTaskCanceledError struct {
// contains filtered or unexported fields
}
func (ActivityTaskCanceledError) Details ¶
func (e ActivityTaskCanceledError) Details() *string
func (ActivityTaskCanceledError) Error ¶
func (e ActivityTaskCanceledError) Error() string
type ActivityTaskDispatcher ¶
type ActivityTaskDispatcher interface {
DispatchTask(*swf.PollForActivityTaskOutput, func(*swf.PollForActivityTaskOutput))
}
ActivityTaskDispatcher is used by the ActivityWorker machinery to dispatch the handling of ActivityTasks. Different implementations can provide different concurrency models.
type ActivityWorker ¶
type ActivityWorker struct { Serializer fsm.StateSerializer // Domain of the workflow associated with the FSM. Domain string // TaskList that the underlying poller will poll for decision tasks. TaskList string // Identity used in PollForActivityTaskRequests, can be empty. Identity string // Client used to make SWF api requests. SWF SWFOps // ShutdownManager ShutdownManager *poller.ShutdownManager // ActivityTaskDispatcher ActivityTaskDispatcher ActivityTaskDispatcher // ActivityInterceptor ActivityInterceptor ActivityInterceptor // allow panics in activities rather than recovering and failing the activity, useful for testing AllowPanics bool // reads the EventCorrelator and backs off based on what retry # the activity is. BackoffOnFailure bool // maximum backoff sleep on retries that fail. MaxBackoffSeconds int // contains filtered or unexported fields }
Example ¶
var swfOps SWFOps taskList := "aTaskListSharedBetweenTaskOneAndTwo" handleTask1 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) { return input, nil } handleTask2 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) { return input, nil } handler1 := &ActivityHandler{Activity: "one", HandlerFunc: handleTask1} handler2 := &ActivityHandler{Activity: "two", HandlerFunc: handleTask2} worker := &ActivityWorker{ Domain: "swf-domain", Serializer: fsm.JSONStateSerializer{}, TaskList: taskList, SWF: swfOps, Identity: "test-activity-worker", } worker.AddHandler(handler1) worker.AddHandler(handler2) go worker.Start()
Output:
func (*ActivityWorker) AddCoordinatedHandler ¶
func (w *ActivityWorker) AddCoordinatedHandler(heartbeatInterval, tickMinInterval time.Duration, handler *CoordinatedActivityHandler)
AddCoordinatedHandler automatically takes care of sending back heartbeats and updating state on workflows for an activity task. tickMinInterval determines the max rate at which the CoordinatedActivityHandler.Tick function will be called.
For example, when the Tick function returns quickly (e.g., it is a no-op) and tickMinInterval is 1 * time.Second, Tick is guaranteed to be called at most once per second. The rate can be slower if Tick takes more than tickMinInterval to complete.
func (*ActivityWorker) AddHandler ¶
func (a *ActivityWorker) AddHandler(handler *ActivityHandler)
func (*ActivityWorker) Init ¶
func (a *ActivityWorker) Init()
func (*ActivityWorker) Start ¶
func (a *ActivityWorker) Start()
type BoundedGoroutineDispatcher ¶
type BoundedGoroutineDispatcher struct { NumGoroutines int // contains filtered or unexported fields }
BoundedGoroutineDispatcher is an ActivityTaskDispatcher that uses a bounded number of goroutines to run activity handlers.
func (*BoundedGoroutineDispatcher) DispatchTask ¶
func (b *BoundedGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask sends the task on a channel that NumGoroutines goroutines are selecting on. Goroutines receiving a task run it in the same goroutine. Note that this is unsynchronized, as DispatchTask will only be called by the single poller goroutine.
type CallingGoroutineDispatcher ¶
type CallingGoroutineDispatcher struct{}
CallingGoroutineDispatcher is an ActivityTaskDispatcher that runs the activity handler in the polling goroutine.
func (*CallingGoroutineDispatcher) DispatchTask ¶
func (*CallingGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask calls the handler in the same goroutine.
type ComposedDecisionInterceptor ¶
type ComposedDecisionInterceptor struct {
// contains filtered or unexported fields
}
func (*ComposedDecisionInterceptor) AfterTask ¶
func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
func (*ComposedDecisionInterceptor) AfterTaskCanceled ¶
func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
func (*ComposedDecisionInterceptor) AfterTaskComplete ¶
func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
func (*ComposedDecisionInterceptor) AfterTaskFailed ¶
func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
func (*ComposedDecisionInterceptor) BeforeTask ¶
func (c *ComposedDecisionInterceptor) BeforeTask(t *swf.PollForActivityTaskOutput)
type CoordinatedActivityHandler ¶
type CoordinatedActivityHandler struct { Start CoordinatedActivityHandlerStartFunc Tick CoordinatedActivityHandlerTickFunc Cancel CoordinatedActivityHandlerCancelFunc Input interface{} Activity string }
func NewCoordinatedActivityHandler ¶
func NewCoordinatedActivityHandler(activity string, start interface{}, tick interface{}, cancel interface{}) *CoordinatedActivityHandler
type CoordinatedActivityHandlerCancelFunc ¶
type CoordinatedActivityHandlerCancelFunc func(*swf.PollForActivityTaskOutput, interface{}) error
type CoordinatedActivityHandlerStartFunc ¶
type CoordinatedActivityHandlerStartFunc func(*swf.PollForActivityTaskOutput, interface{}) (interface{}, error)
type CoordinatedActivityHandlerTickFunc ¶
type CoordinatedActivityHandlerTickFunc func(*swf.PollForActivityTaskOutput, interface{}) (bool, interface{}, error)
type FuncInterceptor ¶
type FuncInterceptor struct { BeforeTaskFn func(*swf.PollForActivityTaskOutput) AfterTaskFn func(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error) AfterTaskCompleteFn func(t *swf.PollForActivityTaskOutput, result interface{}) AfterTaskFailedFn func(t *swf.PollForActivityTaskOutput, err error) AfterTaskCanceledFn func(t *swf.PollForActivityTaskOutput, details string) }
FuncInterceptor is an ActivityInterceptor that you can set handler funcs on. If any are unset, they are no-ops.
func (*FuncInterceptor) AfterTask ¶
func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
func (*FuncInterceptor) AfterTaskCanceled ¶
func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)
AfterTaskCanceled runs the AfterTaskCanceledFn if not nil
func (*FuncInterceptor) AfterTaskComplete ¶
func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})
AfterTaskComplete runs the AfterTaskCompleteFn if not nil
func (*FuncInterceptor) AfterTaskFailed ¶
func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)
AfterTaskFailed runs the AfterTaskFailedFn if not nil
func (*FuncInterceptor) BeforeTask ¶
func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)
BeforeTask runs the BeforeTaskFn if not nil
type NewGoroutineDispatcher ¶
type NewGoroutineDispatcher struct { }
NewGoroutineDispatcher is an ActivityTaskDispatcher that runs the activity handler in a new goroutine.
func (*NewGoroutineDispatcher) DispatchTask ¶
func (*NewGoroutineDispatcher) DispatchTask(task *swf.PollForActivityTaskOutput, handler func(*swf.PollForActivityTaskOutput))
DispatchTask calls the handler in a new goroutine.
type SWFOps ¶
type SWFOps interface { RecordActivityTaskHeartbeat(req *swf.RecordActivityTaskHeartbeatInput) (*swf.RecordActivityTaskHeartbeatOutput, error) RespondActivityTaskCanceled(req *swf.RespondActivityTaskCanceledInput) (*swf.RespondActivityTaskCanceledOutput, error) RespondActivityTaskCompleted(req *swf.RespondActivityTaskCompletedInput) (*swf.RespondActivityTaskCompletedOutput, error) RespondActivityTaskFailed(req *swf.RespondActivityTaskFailedInput) (*swf.RespondActivityTaskFailedOutput, error) PollForActivityTask(req *swf.PollForActivityTaskInput) (*swf.PollForActivityTaskOutput, error) GetWorkflowExecutionHistory(req *swf.GetWorkflowExecutionHistoryInput) (*swf.GetWorkflowExecutionHistoryOutput, error) SignalWorkflowExecution(req *swf.SignalWorkflowExecutionInput) (*swf.SignalWorkflowExecutionOutput, error) }