activity

package
v0.0.0-...-28893bc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2015 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	TaskGone      = "Unknown activity"
	ExecutionGone = "Unknown execution"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityHandler

type ActivityHandler struct {
	Activity    string
	HandlerFunc ActivityHandlerFunc
	Input       interface{}
}

func NewActivityHandler

func NewActivityHandler(activity string, handler interface{}) *ActivityHandler

func (*ActivityHandler) ZeroInput

func (a *ActivityHandler) ZeroInput() interface{}

type ActivityHandlerFunc

type ActivityHandlerFunc func(activityTask *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error)

type ActivityInterceptor

type ActivityInterceptor interface {
	BeforeTask(*swf.PollForActivityTaskOutput)
	AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
	AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
	AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
	AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
}

ActivityInterceptor allows manipulation of the decision task and the outcome at key points in the task lifecycle.

func NewComposedDecisionInterceptor

func NewComposedDecisionInterceptor(interceptors ...ActivityInterceptor) ActivityInterceptor

type ActivityTaskCanceledError

type ActivityTaskCanceledError struct {
	// contains filtered or unexported fields
}

func (ActivityTaskCanceledError) Details

func (e ActivityTaskCanceledError) Details() *string

func (ActivityTaskCanceledError) Error

type ActivityTaskDispatcher

type ActivityTaskDispatcher interface {
	DispatchTask(*swf.PollForActivityTaskOutput, func(*swf.PollForActivityTaskOutput))
}

ActivityTaskDispatcher is used by the ActivityWorker machinery to dispatch the handling of ActivityTasks. Different implementations can provide different concurrency models.

type ActivityWorker

type ActivityWorker struct {
	Serializer fsm.StateSerializer

	// Domain of the workflow associated with the FSM.
	Domain string
	// TaskList that the underlying poller will poll for decision tasks.
	TaskList string
	// Identity used in PollForActivityTaskRequests, can be empty.
	Identity string
	// Client used to make SWF api requests.
	SWF SWFOps

	// ShutdownManager
	ShutdownManager *poller.ShutdownManager
	// ActivityTaskDispatcher
	ActivityTaskDispatcher ActivityTaskDispatcher
	// ActivityInterceptor
	ActivityInterceptor ActivityInterceptor
	// allow panics in activities rather than recovering and failing the activity, useful for testing
	AllowPanics bool
	// reads the EventCorrelator and backs off based on what retry # the activity is.
	BackoffOnFailure bool
	// maximum backoff sleep on retries that fail.
	MaxBackoffSeconds int
	// contains filtered or unexported fields
}
Example
var swfOps SWFOps

taskList := "aTaskListSharedBetweenTaskOneAndTwo"

handleTask1 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) {
	return input, nil
}

handleTask2 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) {
	return input, nil
}

handler1 := &ActivityHandler{Activity: "one", HandlerFunc: handleTask1}

handler2 := &ActivityHandler{Activity: "two", HandlerFunc: handleTask2}

worker := &ActivityWorker{
	Domain:     "swf-domain",
	Serializer: fsm.JSONStateSerializer{},
	TaskList:   taskList,
	SWF:        swfOps,
	Identity:   "test-activity-worker",
}

worker.AddHandler(handler1)

worker.AddHandler(handler2)

go worker.Start()
Output:

func (*ActivityWorker) AddCoordinatedHandler

func (w *ActivityWorker) AddCoordinatedHandler(heartbeatInterval, tickMinInterval time.Duration, handler *CoordinatedActivityHandler)

AddCoordinatedHandler automatically takes care of sending back heartbeats and updating state on workflows for an activity task. tickMinInterval determines the max rate at which the CoordinatedActivityHandler.Tick function will be called.

For example, when the Tick function returns quickly (e.g.: noop), and tickMinInterval is 1 * time.Second, Tick is guaranteed to be called at most once per second. The rate can be slower if Tick takes more than tickMinInterval to complete.

func (*ActivityWorker) AddHandler

func (a *ActivityWorker) AddHandler(handler *ActivityHandler)

func (*ActivityWorker) Init

func (a *ActivityWorker) Init()

func (*ActivityWorker) Start

func (a *ActivityWorker) Start()

type BoundedGoroutineDispatcher

type BoundedGoroutineDispatcher struct {
	NumGoroutines int
	// contains filtered or unexported fields
}

BoundedGoroutineDispatcher is a DecisionTaskDispatcher that uses a bounded number of goroutines to run decision handlers.

func (*BoundedGoroutineDispatcher) DispatchTask

DispatchTask calls sends the task on a channel that NumGoroutines goroutines are selecting on. Goroutines recieving a task run it in the same goroutine. note that this is unsynchronized as DispatchTask will only be called by the single poller goroutine.

type CallingGoroutineDispatcher

type CallingGoroutineDispatcher struct{}

CallingGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in the polling goroutine

func (*CallingGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in the same goroutine.

type ComposedDecisionInterceptor

type ComposedDecisionInterceptor struct {
	// contains filtered or unexported fields
}

func (*ComposedDecisionInterceptor) AfterTask

func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)

func (*ComposedDecisionInterceptor) AfterTaskCanceled

func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)

func (*ComposedDecisionInterceptor) AfterTaskComplete

func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})

func (*ComposedDecisionInterceptor) AfterTaskFailed

func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)

func (*ComposedDecisionInterceptor) BeforeTask

type CoordinatedActivityHandler

type CoordinatedActivityHandler struct {
	Start    CoordinatedActivityHandlerStartFunc
	Tick     CoordinatedActivityHandlerTickFunc
	Cancel   CoordinatedActivityHandlerCancelFunc
	Input    interface{}
	Activity string
}

func NewCoordinatedActivityHandler

func NewCoordinatedActivityHandler(activity string, start interface{}, tick interface{}, cancel interface{}) *CoordinatedActivityHandler

type CoordinatedActivityHandlerCancelFunc

type CoordinatedActivityHandlerCancelFunc func(*swf.PollForActivityTaskOutput, interface{}) error

type CoordinatedActivityHandlerStartFunc

type CoordinatedActivityHandlerStartFunc func(*swf.PollForActivityTaskOutput, interface{}) (interface{}, error)

type CoordinatedActivityHandlerTickFunc

type CoordinatedActivityHandlerTickFunc func(*swf.PollForActivityTaskOutput, interface{}) (bool, interface{}, error)

type FuncInterceptor

type FuncInterceptor struct {
	BeforeTaskFn        func(*swf.PollForActivityTaskOutput)
	AfterTaskFn         func(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
	AfterTaskCompleteFn func(t *swf.PollForActivityTaskOutput, result interface{})
	AfterTaskFailedFn   func(t *swf.PollForActivityTaskOutput, err error)
	AfterTaskCanceledFn func(t *swf.PollForActivityTaskOutput, details string)
}

FuncInterceptor is a ActivityInterceptor that you can set handler funcs on. if any are unset, they are no-ops.

func (*FuncInterceptor) AfterTask

func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)

func (*FuncInterceptor) AfterTaskCanceled

func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)

AfterTaskCanceled runs the AfterTaskCanceledFn if not nil

func (*FuncInterceptor) AfterTaskComplete

func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})

AfterTaskComplete runs the AfterTaskCompleteFn if not nil

func (*FuncInterceptor) AfterTaskFailed

func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)

AfterTaskFailed runs the AfterTaskFailedFn if not nil

func (*FuncInterceptor) BeforeTask

func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)

BeforeTask runs the BeforeTaskFn if not nil

type NewGoroutineDispatcher

type NewGoroutineDispatcher struct {
}

NewGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in a new goroutine.

func (*NewGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in a new goroutine.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL