Documentation ¶
Overview ¶
Package poller contains TaskPollers and a ShutdownManager that help work around 'phantom' task assignment in SWF by allowing your pollers to wait until any in-flight polls are done before shutting down.
TaskPollers ¶
DecisionTaskPoller and ActivityTaskPoller facilitate proper usage of the PollForDecisionTask and PollForActivityTask endpoints in the SWF API. Decision and activity workers use these endpoints to claim tasks to work on. The endpoints use long polling: SWF keeps the request open for up to 60 seconds before returning an 'empty' response. If a task is generated before that time, a non-empty task is delivered to (and assigned to) the polling client.
There is an unfortunate bug in SWF that occurs when a long-polling request is terminated on the client side instead of waiting for the SWF API to respond. SWF does not recognize this condition, so it can assign a task to a disconnected worker, and the task will subsequently time out. This is not terrible if the task has a short timeout, but it can cause long delays if the task has a long timeout.
Both types of pollers allow you to manage polling yourself by calling Poll() directly. However, it is recommended that you use the PollUntilShutdownBy(...) function, which works in concert with a ShutdownManager to wait for all in-flight polls to complete. This facilitates clean shutdown of end-user processes.
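The idea of letting an in-flight poll finish before shutting down can be sketched with the standard library alone. This is not the package's implementation: fakePoll, pollUntilStopped, and runDemo are hypothetical names, and the 10 ms sleep stands in for an SWF long poll of up to 60 seconds. The loop checks the stop channel only between polls, so a poll that has started always runs to completion.

```go
package main

import (
	"fmt"
	"time"
)

// fakePoll is a hypothetical stand-in for a long poll against SWF.
// It sleeps briefly and returns a task name, or "" for an empty response.
func fakePoll(n int) string {
	time.Sleep(10 * time.Millisecond)
	if n%2 == 0 {
		return ""
	}
	return fmt.Sprintf("task-%d", n)
}

// pollUntilStopped polls in a loop and checks stop only between polls,
// so a poll that is already in flight is never abandoned.
func pollUntilStopped(stop <-chan bool, ack chan<- bool, onTask func(string)) {
	for n := 1; ; n++ {
		select {
		case <-stop:
			ack <- true // tell the caller that no poll is in flight anymore
			return
		default:
		}
		if task := fakePoll(n); task != "" {
			onTask(task)
		}
	}
}

// runDemo starts the loop, requests a stop while a poll is likely in
// flight, waits for the ack, and returns the number of tasks handled.
func runDemo() int {
	stop := make(chan bool, 1)
	ack := make(chan bool, 1)
	handled := 0
	go pollUntilStopped(stop, ack, func(string) { handled++ })
	time.Sleep(25 * time.Millisecond)
	stop <- true
	<-ack // after this receive, reading handled is safe
	return handled
}

func main() {
	fmt.Println("tasks handled before shutdown:", runDemo())
}
```

The exact count printed depends on timing, but every task returned by a completed poll is handled before the ack is sent.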
ShutdownManager ¶
When ShutdownManager.StopPollers() is called, it signals all registered pollers to shut down once their in-flight polls have completed, and blocks until this happens. Shutdown can take up to 60 seconds, because an SWF long poll may stay open that long before returning an empty response.
Types ¶
type ActivityOps ¶
type ActivityOps interface {
	PollForActivityTask(req *swf.PollForActivityTaskInput) (resp *swf.PollForActivityTaskOutput, err error)
}
type ActivityTaskPoller ¶
type ActivityTaskPoller struct {
	Identity string
	Domain   string
	TaskList string
	// contains filtered or unexported fields
}
ActivityTaskPoller polls a given task list in a domain for activity tasks, and sends tasks on its Tasks channel.
func NewActivityTaskPoller ¶
func NewActivityTaskPoller(awc ActivityOps, domain string, identity string, taskList string) *ActivityTaskPoller
NewActivityTaskPoller returns an ActivityTaskPoller.
func (*ActivityTaskPoller) Poll ¶
func (p *ActivityTaskPoller) Poll() (*swf.PollForActivityTaskOutput, error)
Poll polls the task list for a task. If there is no task, nil is returned. If an error is encountered, no task is returned.
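A caller of Poll() therefore has three outcomes to distinguish: an error, an empty poll, and a real task. The sketch below shows one way to branch on them. It assumes a hypothetical local type task in place of *swf.PollForActivityTaskOutput and a hypothetical helper classify, so that it runs without the AWS SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for *swf.PollForActivityTaskOutput.
type task struct{ Token string }

// classify describes the result of a single poll. The error is checked
// first, because no task is returned when an error occurs.
func classify(t *task, err error) string {
	switch {
	case err != nil:
		return "error: " + err.Error()
	case t == nil:
		return "empty poll"
	default:
		return "task " + t.Token
	}
}

func main() {
	fmt.Println(classify(nil, errors.New("throttled")))
	fmt.Println(classify(nil, nil))
	fmt.Println(classify(&task{Token: "abc"}, nil))
}
```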
func (*ActivityTaskPoller) PollUntilShutdownBy ¶
func (p *ActivityTaskPoller) PollUntilShutdownBy(mgr *ShutdownManager, pollerName string, onTask func(*swf.PollForActivityTaskOutput))
PollUntilShutdownBy polls until signaled to shut down by the ShutdownManager. This function blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the onTask callback whenever a valid PollForActivityTaskOutput is received.
type DecisionOps ¶
type DecisionOps interface {
	PollForDecisionTaskPages(*swf.PollForDecisionTaskInput, func(*swf.PollForDecisionTaskOutput, bool) bool) error
}
DecisionOps is the subset of the swf.SWF API used by decision task pollers.
type DecisionTaskPoller ¶
type DecisionTaskPoller struct {
	Identity string
	Domain   string
	TaskList string
	// contains filtered or unexported fields
}
DecisionTaskPoller polls a given task list in a domain for decision tasks.
func NewDecisionTaskPoller ¶
func NewDecisionTaskPoller(dwc DecisionOps, domain string, identity string, taskList string) *DecisionTaskPoller
NewDecisionTaskPoller returns a DecisionTaskPoller which can be used to poll the given task list.
func (*DecisionTaskPoller) Poll ¶
func (p *DecisionTaskPoller) Poll(taskReady func(*swf.PollForDecisionTaskOutput) bool) (*swf.PollForDecisionTaskOutput, error)
Poll polls the task list for a task. If there is no task available, nil is returned. If an error is encountered, no task is returned.
func (*DecisionTaskPoller) PollUntilShutdownBy ¶
func (p *DecisionTaskPoller) PollUntilShutdownBy(mgr *ShutdownManager, pollerName string, onTask func(*swf.PollForDecisionTaskOutput), taskReady func(*swf.PollForDecisionTaskOutput) bool)
PollUntilShutdownBy polls until signaled to shut down by the ShutdownManager. This function blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the onTask callback whenever a valid PollForDecisionTaskOutput is received.
type ShutdownManager ¶
type ShutdownManager struct {
	// contains filtered or unexported fields
}
ShutdownManager facilitates cleanly shutting down pollers when the application decides to exit. When StopPollers() is called, it sends on each registered stopChan, then receives from each registered ackChan. At this point StopPollers() returns.
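The stop-then-ack handshake described above can be illustrated with a simplified stand-in. The types and functions below (pair, manager, register, stopPollers, startWorker) are hypothetical and are not the package's code. In particular, this sketch is not safe for concurrent registration, and the worker goroutine only mimics what a registered poller would do.

```go
package main

import "fmt"

// pair holds the two channels registered for one poller.
type pair struct{ stop, ack chan bool }

// manager is a hypothetical, simplified stand-in for ShutdownManager.
// It has no locking, so register must not race with stopPollers.
type manager struct{ pollers map[string]pair }

func newManager() *manager { return &manager{pollers: map[string]pair{}} }

func (m *manager) register(name string, stop, ack chan bool) {
	m.pollers[name] = pair{stop, ack}
}

// stopPollers sends on every stop channel, then receives from every ack
// channel, then clears the registrations. It returns the number of acks.
func (m *manager) stopPollers() int {
	for _, p := range m.pollers {
		p.stop <- true
	}
	acked := 0
	for _, p := range m.pollers {
		<-p.ack
		acked++
	}
	m.pollers = map[string]pair{}
	return acked
}

// startWorker registers buffered channels and starts a goroutine that
// waits for the stop signal and then acknowledges it.
func startWorker(m *manager, name string) {
	stop, ack := make(chan bool, 1), make(chan bool, 1)
	m.register(name, stop, ack)
	go func() {
		<-stop
		ack <- true
	}()
}

func main() {
	m := newManager()
	startWorker(m, "decider")
	startWorker(m, "activity")
	fmt.Println("acked:", m.stopPollers())
	fmt.Println("remaining:", len(m.pollers))
}
```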
func NewShutdownManager ¶
func NewShutdownManager() *ShutdownManager
NewShutdownManager creates a ShutdownManager.
func (*ShutdownManager) Deregister ¶
func (p *ShutdownManager) Deregister(name string)
Deregister removes a registered pair of channels from the shutdown manager.
func (*ShutdownManager) Register ¶
func (p *ShutdownManager) Register(name string, stopChan chan bool, ackChan chan bool)
Register registers a named pair of channels with the shutdown manager. Both channels should be buffered.
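The documentation does not state why the channels should be buffered. One plausible reason, offered here as an assumption, is that a send on a buffered channel can complete even when the other side is not ready to receive. The hypothetical helper trySend below shows the difference using a non-blocking send.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan bool)
	buffered := make(chan bool, 1)

	// No goroutine is receiving, so the unbuffered send cannot proceed.
	fmt.Println("unbuffered send succeeded:", trySend(unbuffered))
	// The buffered channel has room for one value, so the send proceeds.
	fmt.Println("buffered send succeeded:", trySend(buffered))
}
```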
func (*ShutdownManager) StopPollers ¶
func (p *ShutdownManager) StopPollers()
StopPollers blocks until it is able to stop all the registered pollers, which can take up to 60 seconds. The registered pollers are cleared once all pollers have acked the stop.