Documentation
Overview
Package taskhawk is a replacement for Celery that works on AWS SQS/SNS, while keeping things simple and straightforward. Any unit of work can be turned into a Taskhawk task by defining a type that implements the ITask interface.
For inter-service messaging, see Hedwig: https://godoc.org/github.com/Automatic/hedwig-go/hedwig.
Provisioning
Taskhawk uses SQS and SNS as backing queues. Before you can publish tasks, you need to provision the required infrastructure. This may be done manually or, preferably, using Terraform. Taskhawk provides tools to make infrastructure configuration easier; see Taskhawk Terraform Generator (https://github.com/Automatic/taskhawk-terraform-generator) for details.
Using Taskhawk
To use Taskhawk, define a task type that embeds taskhawk.Task and implements a Run method:
type SendEmailTask struct {
	taskhawk.Task
}

func (t *SendEmailTask) Run(ctx context.Context, input interface{}) error {
	...
}
Tasks may accept input of arbitrary type as long as it's serializable to JSON.
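As a rough illustration of the JSON requirement, the sketch below encodes an input value the way a publisher might and decodes it into the fresh value returned by the task's input factory, the way a consumer might before calling Run. It uses only encoding/json; the SendEmailTaskInput fields, newInput and roundTrip are hypothetical and not part of the taskhawk API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SendEmailTaskInput is a hypothetical input type; its fields are illustrative.
type SendEmailTaskInput struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// newInput plays the role of a task's Inputer: it returns an empty value of
// the concrete type that the task's Run method expects.
func newInput() interface{} {
	return &SendEmailTaskInput{}
}

// roundTrip serializes input as a publisher might, then decodes the payload
// into a fresh value from newInput, as a consumer might before calling Run.
func roundTrip(input interface{}) (interface{}, error) {
	payload, err := json.Marshal(input)
	if err != nil {
		return nil, err // e.g. channels and funcs cannot be encoded
	}
	decoded := newInput()
	if err := json.Unmarshal(payload, decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

func main() {
	out, err := roundTrip(&SendEmailTaskInput{To: "user@example.com", Subject: "Hi"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *out.(*SendEmailTaskInput))
}
```

This is also why the concrete type passed at dispatch should match what the task's input factory returns: the consumer only knows how to decode into that type.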
Then, define a few required settings:
sessionCache := taskhawk.NewAWSSessionsCache()
settings := &taskhawk.Settings{
	AWSAccessKey: <YOUR AWS ACCESS KEY>,
	AWSAccountID: <YOUR AWS ACCOUNT ID>,
	AWSRegion:    <YOUR AWS REGION>,
	AWSSecretKey: <YOUR AWS SECRET KEY>,
	Queue:        <YOUR TASKHAWK QUEUE>,
}
taskhawk.InitSettings(settings)
Before a task can be dispatched, it must be registered with a task registry:
func NewSendEmailTask() *SendEmailTask {
	return &SendEmailTask{
		Task: taskhawk.Task{
			TaskName: "tasks.SendEmailTask",
			Inputer: func() interface{} {
				return &SendEmailTaskInput{}
			},
		},
	}
}

publisher := taskhawk.NewPublisher(sessionCache, settings)
taskRegistry, err := taskhawk.NewTaskRegistry(publisher)
if err != nil {
	// handle error
}
if err := taskRegistry.RegisterTask(NewSendEmailTask()); err != nil {
	// handle error
}
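To make the registration step concrete, here is a simplified, self-contained model of a name-keyed registry. It is not the taskhawk implementation: the task interface is trimmed to Name and Run, and rejecting duplicate names is an assumption of this sketch. Lookups of unknown names fail with the "Task not found" message that taskhawk defines as ErrStringTaskNotFound.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// task is a trimmed-down, hypothetical stand-in for taskhawk's ITask.
type task interface {
	Name() string
	Run(ctx context.Context, input interface{}) error
}

// registry maps task names to tasks. Names are what travel in messages,
// which is why a task's name should be changed carefully.
type registry struct {
	mu    sync.RWMutex
	tasks map[string]task
}

func newRegistry() *registry {
	return &registry{tasks: make(map[string]task)}
}

// register adds t; duplicate names are rejected (an assumption of this sketch).
func (r *registry) register(t task) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tasks[t.Name()]; exists {
		return fmt.Errorf("task %q already registered", t.Name())
	}
	r.tasks[t.Name()] = t
	return nil
}

// get looks a task up by name, mirroring the "Task not found" error string.
func (r *registry) get(name string) (task, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	t, ok := r.tasks[name]
	if !ok {
		return nil, errors.New("Task not found")
	}
	return t, nil
}

// sendEmailTask is a hypothetical task used only for the demo.
type sendEmailTask struct{}

func (sendEmailTask) Name() string { return "tasks.SendEmailTask" }

func (sendEmailTask) Run(ctx context.Context, input interface{}) error {
	fmt.Println("sending email:", input)
	return nil
}

func main() {
	r := newRegistry()
	if err := r.register(sendEmailTask{}); err != nil {
		panic(err)
	}
	t, err := r.get("tasks.SendEmailTask")
	if err != nil {
		panic(err)
	}
	_ = t.Run(context.Background(), "hello")
}
```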
And finally, dispatch your task asynchronously through the task registry (a *taskhawk.TaskRegistry returned by taskhawk.NewTaskRegistry):
taskRegistry.Dispatch("tasks.SendEmailTask", &SendEmailTaskInput{...})
To pass a context, use:
taskRegistry.DispatchWithContext(ctx, "tasks.SendEmailTask", &SendEmailTaskInput{...})
To include custom headers with the message (for example, a request_id field for cross-application tracing), set them on the input object, which must implement the ITaskHeaders interface.
To customize the priority, use:
taskRegistry.DispatchWithPriority(ctx, "tasks.SendEmailTask", taskhawk.PriorityHigh, &SendEmailTaskInput{...})
Tasks are held in the SQS queue until they're successfully executed, or until they fail a configurable number of times. Failed tasks are moved to a Dead Letter Queue, where they're held for 14 days and may be examined for further debugging.
Priority
Taskhawk provides four priority queues; the priority may be customized per task or per message. For more details, see https://godoc.org/github.com/Automatic/taskhawk-go/taskhawk#Priority.
Metadata and Headers
If your input struct satisfies the `taskhawk.ITaskMetadata` interface, it'll be filled in with the following attributes:
id: task identifier. This represents a run of a task.
priority: the priority this task message was dispatched with.
receipt: SQS receipt for the task. This may be used to extend message visibility if the task is running longer than expected.
timestamp: task dispatch epoch timestamp (milliseconds)
version: message format version.
If your input struct satisfies the ITaskHeaders interface, it'll be filled in with the custom headers that the task was dispatched with.
The taskhawk library provides helper structs that satisfy these interfaces; you may embed them in your input struct like so:
type SendEmailTaskInput struct {
	taskhawk.TaskMetadata
	taskhawk.TaskHeaders
	...
}
For a compile-time type assertion, you may add the following at package scope:
var _ taskhawk.ITaskMetadata = &SendEmailTaskInput{}
var _ taskhawk.ITaskHeaders = &SendEmailTaskInput{}
This snippet won't consume memory or do anything at runtime.
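The sketch below shows the mechanics behind embedding and the compile-time check, using local stand-ins rather than the real taskhawk types: headers is a hypothetical mirror of ITaskHeaders, taskHeaders stands in for the embeddable helper, and fillHeaders is a hypothetical consumer-side helper that populates headers only when the input implements the interface, via a type assertion.

```go
package main

import "fmt"

// headers mirrors the method set of taskhawk's ITaskHeaders (a local stand-in).
type headers interface {
	SetHeaders(map[string]string)
	GetHeaders() map[string]string
}

// taskHeaders is a stand-in for the embeddable helper struct.
type taskHeaders struct {
	h map[string]string
}

func (t *taskHeaders) SetHeaders(h map[string]string) { t.h = h }
func (t *taskHeaders) GetHeaders() map[string]string  { return t.h }

// sendEmailInput embeds taskHeaders, so *sendEmailInput gains its methods.
type sendEmailInput struct {
	taskHeaders
	To string
}

// Compile-time check: the build fails if *sendEmailInput stops satisfying headers.
var _ headers = &sendEmailInput{}

// fillHeaders sets headers only on inputs that opt in by implementing the interface.
func fillHeaders(input interface{}, h map[string]string) bool {
	if hi, ok := input.(headers); ok {
		hi.SetHeaders(h)
		return true
	}
	return false
}

func main() {
	in := &sendEmailInput{To: "user@example.com"}
	fillHeaders(in, map[string]string{"request_id": "abc123"})
	fmt.Println(in.GetHeaders()["request_id"])
}
```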
Consumer
A consumer for SQS-based workers can be created from the task registry and started as follows:
consumer := taskRegistry.NewQueueConsumer(sessionCache, settings)
err := consumer.ListenForMessages(ctx, &taskhawk.ListenRequest{...})
This is a blocking function, so if you want to listen on multiple priority queues, run each listener in its own goroutine.
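A minimal sketch of that pattern, using only the standard library: listen is a hypothetical stand-in for ListenForMessages that loops loopCount times (like ListenRequest.LoopCount) or until the context is canceled, and the priority names are illustrative. A sync.WaitGroup waits for all listeners, and the first error, if any, cancels the others and is returned.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// listen is a hypothetical stand-in for a blocking ListenForMessages call.
// It "processes" one batch per loop and stops after loopCount loops or when
// ctx is canceled.
func listen(ctx context.Context, priority string, loopCount uint, processed func(string)) error {
	for i := uint(0); i < loopCount; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		processed(priority)
	}
	return nil
}

// listenAll runs one blocking listener per priority queue, each in its own
// goroutine, and returns the per-queue loop counts plus the first error seen.
func listenAll(priorities []string, loopCount uint) (map[string]uint, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		counts   = make(map[string]uint)
		firstErr error
	)
	record := func(p string) {
		mu.Lock()
		counts[p]++
		mu.Unlock()
	}
	for _, p := range priorities {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			if err := listen(ctx, p, loopCount, record); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
					cancel() // stop the other listeners on the first failure
				}
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()
	return counts, firstErr
}

func main() {
	counts, err := listenAll([]string{"default", "high", "low", "bulk"}, 3)
	fmt.Println(counts, err)
}
```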
A consumer for Lambda-based workers can be created from the task registry and started as follows:
consumer := taskRegistry.NewLambdaConsumer(sessionCache, settings)
err := consumer.HandleLambdaEvent(ctx, &snsEvent)
where snsEvent is the event that AWS passes to your Lambda function, as described in the AWS documentation: https://docs.aws.amazon.com/lambda/latest/dg/eventsources.html#eventsources-sns.
Index
- Constants
- Variables
- func InitSettings(settings *Settings)
- func NewLambdaHandler(consumer ILambdaConsumer) lambda.Handler
- type AWSSessionsCache
- type DefaultHeaders
- type ILambdaConsumer
- type IPublisher
- type IQueueConsumer
- type ITask
- type ITaskHeaders
- type ITaskMetadata
- type ITaskRegistry
- type JSONTime
- type LambdaHandler
- type LambdaRequest
- type ListenRequest
- type PreProcessHookLambdaApp
- type PreProcessHookQueueApp
- type Priority
- type QueueRequest
- type Settings
- type Task
- type TaskHeaders
- type TaskMetadata
- type TaskRegistry
- func (tr *TaskRegistry) Dispatch(taskName string, input interface{}) error
- func (tr *TaskRegistry) DispatchWithContext(ctx context.Context, taskName string, input interface{}) error
- func (tr *TaskRegistry) DispatchWithPriority(ctx context.Context, taskName string, priority Priority, input interface{}) error
- func (tr *TaskRegistry) GetTask(name string) (ITask, error)
- func (tr *TaskRegistry) NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer
- func (tr *TaskRegistry) NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer
- func (tr *TaskRegistry) RegisterTask(task ITask) error
- type Version
Constants
const (
	ErrStringTaskNotFound = "Task not found"
)
Error strings used within Taskhawk. Error strings are used instead of errors because the error stack trace can only be reliably constructed from within the appropriate execution context.
Variables
var ErrRetry = errors.New("Retry exception")
ErrRetry indicates that a task failed for a known reason and should be retried without logging a failure.
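One way a consumer could act on such a sentinel is sketched below. The outcome rules follow this documentation (a message is deleted only after a successful run; ErrRetry means retry without logging a failure), but the outcome type, decide and sendEmail are hypothetical, and matching wrapped errors with errors.Is is an assumption of the sketch rather than documented taskhawk behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRetry mirrors the sentinel declared by taskhawk.
var ErrRetry = errors.New("Retry exception")

// errBoom is a hypothetical unexpected failure used in the demo.
var errBoom = errors.New("boom")

// outcome is a hypothetical summary of what a consumer does after Run returns.
type outcome struct {
	DeleteMessage bool // remove the message from the queue
	LogFailure    bool // report the error
}

// decide maps a task's return value to an outcome.
func decide(err error) outcome {
	switch {
	case err == nil:
		return outcome{DeleteMessage: true}
	case errors.Is(err, ErrRetry):
		// Known, expected failure: leave the message for redelivery, quietly.
		return outcome{}
	default:
		// Unexpected failure: leave the message for redelivery and log it.
		return outcome{LogFailure: true}
	}
}

// sendEmail is a hypothetical task body that asks for a retry on a known condition.
func sendEmail(rateLimited bool) error {
	if rateLimited {
		return fmt.Errorf("mail provider rate limit: %w", ErrRetry)
	}
	return nil
}

func main() {
	fmt.Printf("%+v\n", decide(sendEmail(true)))
	fmt.Printf("%+v\n", decide(sendEmail(false)))
	fmt.Printf("%+v\n", decide(errBoom))
}
```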
Functions
func InitSettings
func InitSettings(settings *Settings)
InitSettings initializes a settings object after validating all values and filling in defaults. This function must be called before the Settings object is used.
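A rough sketch of the validate-then-default idea, applied to a hypothetical settingsSubset struct that copies only a few fields of Settings. Which fields are treated as required, and returning an error rather than whatever InitSettings does on invalid input, are assumptions; the 10-second ShutdownTimeout default comes from the Settings documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settingsSubset is a hypothetical copy of a few Settings fields.
type settingsSubset struct {
	AWSRegion       string
	AWSAccountID    string
	Queue           string
	ShutdownTimeout time.Duration
}

// initSettings validates required fields and fills in defaults in place.
// The set of required fields here is an assumption for illustration.
func initSettings(s *settingsSubset) error {
	if s.AWSRegion == "" || s.AWSAccountID == "" || s.Queue == "" {
		return errors.New("AWSRegion, AWSAccountID and Queue are required")
	}
	if s.ShutdownTimeout == 0 {
		s.ShutdownTimeout = 10 * time.Second // documented default
	}
	return nil
}

func main() {
	s := &settingsSubset{AWSRegion: "us-east-1", AWSAccountID: "123456789012", Queue: "DEV-MYAPP"}
	if err := initSettings(s); err != nil {
		panic(err)
	}
	fmt.Println(s.ShutdownTimeout)
}
```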
func NewLambdaHandler
func NewLambdaHandler(consumer ILambdaConsumer) lambda.Handler
NewLambdaHandler returns a new lambda Handler that can be started like so:
func main() {
	lambda.StartHandler(taskhawk.NewLambdaHandler(consumer))
}
To add extra error handling (e.g. catching panics), you can use your own Handler and call LambdaHandler.Invoke from it.
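A minimal sketch of such a wrapper. To stay self-contained it declares handler, a local interface with the same Invoke method signature as aws-lambda-go's lambda.Handler, so the value returned by NewLambdaHandler should fit in its place; recoverHandler, echoHandler and panickingHandler are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// handler has the same method set as aws-lambda-go's lambda.Handler.
type handler interface {
	Invoke(ctx context.Context, payload []byte) ([]byte, error)
}

// recoverHandler wraps another handler and converts panics into errors, so a
// single bad message surfaces as a failed invocation instead of a crash.
type recoverHandler struct {
	next handler
}

func (h recoverHandler) Invoke(ctx context.Context, payload []byte) (out []byte, err error) {
	defer func() {
		if r := recover(); r != nil {
			out, err = nil, fmt.Errorf("recovered from panic: %v", r)
		}
	}()
	return h.next.Invoke(ctx, payload)
}

// echoHandler is a hypothetical well-behaved handler.
type echoHandler struct{}

func (echoHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
	return payload, nil
}

// panickingHandler is a hypothetical handler used to exercise the wrapper.
type panickingHandler struct{}

func (panickingHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
	panic("unexpected message shape")
}

func main() {
	// In a real Lambda, next would be taskhawk.NewLambdaHandler(consumer) and
	// the wrapper would be passed to lambda.StartHandler.
	out, err := recoverHandler{next: echoHandler{}}.Invoke(context.Background(), []byte("{}"))
	fmt.Println(string(out), err)
	_, err = recoverHandler{next: panickingHandler{}}.Invoke(context.Background(), []byte("{}"))
	fmt.Println(err)
}
```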
Types
type AWSSessionsCache
type AWSSessionsCache struct {
// contains filtered or unexported fields
}
AWSSessionsCache is a cache that holds AWS sessions
func NewAWSSessionsCache
func NewAWSSessionsCache() *AWSSessionsCache
NewAWSSessionsCache creates a new session cache
func (*AWSSessionsCache) GetSession
func (c *AWSSessionsCache) GetSession(ctx context.Context) *session.Session
GetSession retrieves a session if it is cached, otherwise creates one
type DefaultHeaders
DefaultHeaders is the type of the function used to inject custom headers into every task.
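The type declaration itself is not reproduced on this page, so the sketch below uses a hypothetical zero-argument function type, defaultHeadersFunc, in its place. It illustrates the precedence described under Settings: default headers are computed right before dispatch, and headers specified explicitly at dispatch time win on conflicts. mergeHeaders is likewise hypothetical.

```go
package main

import "fmt"

// defaultHeadersFunc is a hypothetical stand-in; the real DefaultHeaders
// signature is not shown here.
type defaultHeadersFunc func() map[string]string

// mergeHeaders starts from the defaults and lets explicit headers override them.
func mergeHeaders(defaults defaultHeadersFunc, explicit map[string]string) map[string]string {
	merged := make(map[string]string)
	if defaults != nil {
		for k, v := range defaults() {
			merged[k] = v
		}
	}
	for k, v := range explicit {
		merged[k] = v
	}
	return merged
}

func main() {
	defaults := func() map[string]string {
		return map[string]string{"request_id": "generated-id", "source": "web"}
	}
	h := mergeHeaders(defaults, map[string]string{"request_id": "caller-id"})
	fmt.Println(h["request_id"], h["source"])
}
```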
type ILambdaConsumer
type ILambdaConsumer interface {
	// HandleLambdaEvent processes taskhawk messages for Lambda apps and calls the task like so:
	//
	//	task_fn(input).
	//
	// If `input` implements `ITaskMetadata`, metadata will be filled in with the following things: id, version,
	// header, receipt.
	//
	// The error returned by the task is returned to the caller, which should return it from the Lambda handler.
	HandleLambdaEvent(ctx context.Context, snsEvent *events.SNSEvent) error
}
ILambdaConsumer represents a taskhawk consumer for Lambda apps
type IPublisher
type IPublisher interface {
	// Publish publishes a message on Taskhawk broker
	Publish(ctx context.Context, message *message) error
	// Settings returns publisher's settings
	Settings() *Settings
}
IPublisher interface represents all publish related functions
func NewPublisher
func NewPublisher(sessionCache *AWSSessionsCache, settings *Settings) IPublisher
NewPublisher creates a new publisher
type IQueueConsumer
type IQueueConsumer interface {
	// ListenForMessages starts a taskhawk listener for message types provided and calls the task like so:
	//
	//	task_fn(input).
	//
	// If `input` implements `ITaskMetadata`, metadata will be filled in with the following things: id, version,
	// header, receipt.
	//
	// The message is explicitly deleted only if the task function ran successfully. If it returns an error, the
	// message is kept on the queue and processed again. If the task keeps failing, the SQS dead-letter queue
	// mechanism kicks in and the message is moved to the dead-letter queue.
	//
	// This function never returns by default. Possible shutdown methods:
	// 1. Cancel the context - returns immediately.
	// 2. Set a deadline on the context of less than 10 seconds - returns after processing current messages.
	// 3. Run for a limited number of loops by setting LoopCount on the request - returns after running the loop
	// a finite number of times.
	ListenForMessages(ctx context.Context, request *ListenRequest) error
}
IQueueConsumer represents a taskhawk consumer for SQS apps
type ITask
type ITask interface {
	// Name of the task. This is used to serialize/deserialize tasks, and so should be changed carefully
	Name() string
	// Priority of the task by default. A publisher _may_ choose to override.
	Priority() Priority
	// NewInput returns an empty input struct as expected by the Task's Run method. May be `nil`
	// If your task needs to get custom headers set during dispatch, implement interface ITaskHeaders,
	// or embed TaskHeaders
	// If your task needs to get metadata (message id etc), implement interface ITaskMetadata, or embed TaskMetadata
	NewInput() interface{}
	// Run is the main method for a task. The concrete type of the input parameter will be the same as whatever is
	// returned by the NewInput() method.
	Run(context context.Context, input interface{}) error
}
ITask is an interface all TaskHawk tasks are expected to implement
type ITaskHeaders
type ITaskHeaders interface {
	// SetHeaders sets the headers on a task input
	SetHeaders(map[string]string)
	// GetHeaders returns the headers set on a task input
	GetHeaders() map[string]string
}
ITaskHeaders interface needs to be implemented by the input struct if your task needs to get custom headers set during dispatch
type ITaskMetadata
type ITaskMetadata interface {
	// SetID sets the message id
	SetID(string)
	// SetPriority sets the priority a message was dispatched with
	SetPriority(Priority)
	// SetReceipt sets the message receipt from SQS.
	// This may be used to extend visibility timeout for long running tasks
	SetReceipt(string)
	// SetTimestamp sets the message dispatch timestamp
	SetTimestamp(JSONTime)
	// SetVersion sets the message schema version
	SetVersion(Version)
}
ITaskMetadata interface needs to be implemented by the input struct if your task needs to get metadata (message id, etc.)
type ITaskRegistry
type ITaskRegistry interface {
	// DispatchWithPriority dispatches a task asynchronously with custom priority.
	// The concrete type of input is expected to be the same as the concrete type of NewInput()'s return value.
	DispatchWithPriority(ctx context.Context, taskName string, priority Priority, input interface{}) error
	// DispatchWithContext dispatches a task asynchronously with context.
	// The concrete type of input is expected to be the same as the concrete type of NewInput()'s return value.
	DispatchWithContext(ctx context.Context, taskName string, input interface{}) error
	// Dispatch a task asynchronously. The concrete type of input is expected to be the same as the concrete type of
	// NewInput()'s return value.
	Dispatch(taskName string, input interface{}) error
	// GetTask fetches a task from the task registry
	GetTask(name string) (ITask, error)
	// NewLambdaConsumer creates a new taskhawk consumer for lambda apps
	//
	// Cancelable context may be used to cancel processing of messages
	NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer
	// NewQueueConsumer creates a new taskhawk consumer for queue apps
	//
	// Cancelable context may be used to cancel processing of messages
	NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer
	// RegisterTask registers the task to the task registry
	RegisterTask(task ITask) error
}
ITaskRegistry is an interface for the task registry to manage tasks
type JSONTime
JSONTime is a wrapper around time.Time that serializes time as epoch milliseconds
func (JSONTime) MarshalJSON
MarshalJSON changes time to epoch in milliseconds
func (*JSONTime) UnmarshalJSON
UnmarshalJSON changes time from epoch in milliseconds or string (RFC3339) to time.Time.
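A self-contained sketch of a type with this behavior, named jsonTime to make clear it is not the library's own type. Defining it as a named time.Time, and treating any JSON string as RFC3339 while treating bare numbers as epoch milliseconds, are assumptions about details this page does not show.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"
)

// jsonTime is a hypothetical equivalent of JSONTime.
type jsonTime time.Time

// MarshalJSON encodes the time as integer milliseconds since the Unix epoch.
func (t jsonTime) MarshalJSON() ([]byte, error) {
	ms := time.Time(t).UnixNano() / int64(time.Millisecond)
	return []byte(strconv.FormatInt(ms, 10)), nil
}

// UnmarshalJSON accepts either epoch milliseconds or an RFC3339 string.
func (t *jsonTime) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err == nil {
		parsed, err := time.Parse(time.RFC3339, s)
		if err != nil {
			return err
		}
		*t = jsonTime(parsed)
		return nil
	}
	ms, err := strconv.ParseInt(string(b), 10, 64)
	if err != nil {
		return err
	}
	*t = jsonTime(time.Unix(0, ms*int64(time.Millisecond)))
	return nil
}

func main() {
	var t jsonTime
	if err := json.Unmarshal([]byte(`"2018-01-02T03:04:05Z"`), &t); err != nil {
		panic(err)
	}
	out, _ := json.Marshal(t)
	fmt.Println(string(out))
}
```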
type LambdaHandler
type LambdaHandler struct {
	// contains filtered or unexported fields
}
LambdaHandler implements the lambda.Handler interface
type LambdaRequest
type LambdaRequest struct {
	Ctx          context.Context
	Priority     Priority
	Record       *events.SNSEventRecord
	TaskRegistry ITaskRegistry
}
LambdaRequest represents a request for lambda apps
type ListenRequest
type ListenRequest struct {
	// Priority represents the priority queue for fetching messages from. Defaults to PriorityDefault.
	Priority Priority
	// NumMessages represents the number of SQS messages to fetch in one API call. Defaults to 1.
	NumMessages uint
	// VisibilityTimeoutS represents the default visibility timeout in seconds for a message.
	// This is the amount of time you have to process your tasks.
	// It defaults to whatever is set in the queue configuration.
	VisibilityTimeoutS uint
	// LoopCount is the number of loops to run for fetching messages.
	// This may be used to limit processing to a certain number of messages.
	// Defaults to running as an infinite loop until the context is canceled.
	LoopCount uint
}
ListenRequest represents a request to listen for messages
type PreProcessHookLambdaApp
type PreProcessHookLambdaApp func(request *LambdaRequest) error
PreProcessHookLambdaApp is the function type for the pre-process hook for lambda apps
type PreProcessHookQueueApp
type PreProcessHookQueueApp func(request *QueueRequest) error
PreProcessHookQueueApp is the function type for the pre-process hook for SQS apps
type Priority
type Priority int
Priority of a task. This may be used to differentiate batch jobs from other tasks, for example.
High and low priority queues provide independent scaling knobs for your use case.
const (
	// PriorityDefault is the default priority of a task if nothing is specified. In most cases,
	// using just the default queue should work fine.
	PriorityDefault Priority = iota // Keep default first so empty values automatically default
	PriorityLow
	PriorityHigh
	// PriorityBulk queue will typically have different monitoring, and may be used for bulk jobs,
	// such as sending push notifications to all users. This allows you to effectively
	// throttle the tasks.
	PriorityBulk
)
Priority for a task
func (Priority) MarshalJSON
MarshalJSON changes Priority to a JSON string
func (*Priority) UnmarshalJSON
UnmarshalJSON changes priority from a JSON string to Priority
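A sketch of the same technique for an iota-based enum, encoded as a JSON string rather than an integer, so payloads stay readable and do not depend on the order of the constants. The wire names below ("default", "low", "high", "bulk") are assumptions; this page does not list the strings taskhawk actually emits.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Priority int

const (
	PriorityDefault Priority = iota // zero value, so unset fields mean default
	PriorityLow
	PriorityHigh
	PriorityBulk
)

// priorityNames holds hypothetical wire names for each priority.
var priorityNames = map[Priority]string{
	PriorityDefault: "default",
	PriorityLow:     "low",
	PriorityHigh:    "high",
	PriorityBulk:    "bulk",
}

// MarshalJSON encodes a Priority as a JSON string.
func (p Priority) MarshalJSON() ([]byte, error) {
	name, ok := priorityNames[p]
	if !ok {
		return nil, fmt.Errorf("unknown priority %d", int(p))
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a JSON string back into a Priority.
func (p *Priority) UnmarshalJSON(b []byte) error {
	var name string
	if err := json.Unmarshal(b, &name); err != nil {
		return err
	}
	for value, n := range priorityNames {
		if n == name {
			*p = value
			return nil
		}
	}
	return fmt.Errorf("unknown priority %q", name)
}

func main() {
	b, _ := json.Marshal(struct {
		Priority Priority `json:"priority"`
	}{PriorityHigh})
	fmt.Println(string(b))
}
```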
type QueueRequest
type QueueRequest struct {
	Ctx          context.Context
	Priority     Priority
	QueueMessage *sqs.Message
	QueueName    string
	QueueURL     string
	TaskRegistry ITaskRegistry
}
QueueRequest represents a request for queue apps
type Settings
type Settings struct {
	AWSRegion    string
	AWSAccountID string
	AWSAccessKey string
	AWSSecretKey string

	// AWS read timeout for publisher
	AWSReadTimeout time.Duration

	// AWS debug request error logs toggle
	AWSDebugRequestLogEnabled bool

	// AWSSessionToken represents temporary credentials (for example, for Lambda apps)
	AWSSessionToken string // optional;

	// DefaultHeaders is a function that may be used to inject custom headers into every message,
	// for example, request id. This hook is called right before dispatch, and
	// any headers that are explicitly specified when dispatching may override
	// these headers.
	DefaultHeaders DefaultHeaders // optional;

	// IsLambdaApp indicates if this is a lambda app (which uses SNS instead of SQS).
	IsLambdaApp bool

	// PreProcessHookQueueApp is a function which can be used to plug into the message processing pipeline
	// BEFORE any processing happens. This hook may be used to perform
	// initializations such as set up a global request id based on message
	// headers.
	PreProcessHookQueueApp PreProcessHookQueueApp // optional;

	// PreProcessHookLambdaApp is a function which can be used to plug into the message processing pipeline
	// BEFORE any processing happens. This hook may be used to perform
	// initializations such as set up a global request id based on message
	// headers.
	PreProcessHookLambdaApp PreProcessHookLambdaApp // optional;

	// Queue is the name of the taskhawk queue for this project (exclude the TASKHAWK- prefix)
	Queue string

	// ShutdownTimeout is the time the app has to shut down before being brutally killed
	ShutdownTimeout time.Duration // optional; defaults to 10s

	// Sync changes taskhawk dispatch to synchronous mode. This is similar
	// to Celery's Eager mode and is helpful for integration testing
	Sync bool
}
Settings is used to create Taskhawk settings
type Task
type Task struct {
	// TaskName represents the name of the task
	TaskName string
	// Inputer is a function that returns an empty input object as the task expects.
	// This is optional and if not specified, it implies task doesn't require input
	Inputer inputer
	// DefaultPriority is the default priority of a task. This may be overridden at a specific message level.
	DefaultPriority Priority
}
Task is a base struct that should be embedded in all TaskHawk tasks. It provides a partial implementation of the ITask interface by implementing a few methods.
type TaskHeaders
TaskHeaders provides a default implementation for ITaskHeaders and may be embedded in your input struct
func (*TaskHeaders) GetHeaders
func (h *TaskHeaders) GetHeaders() map[string]string
GetHeaders returns the custom headers passed when the task was dispatched
func (*TaskHeaders) SetHeaders
func (h *TaskHeaders) SetHeaders(headers map[string]string)
SetHeaders sets the custom headers passed when the task was dispatched
type TaskMetadata
type TaskMetadata struct {
	ID        string
	Priority  Priority
	Receipt   string
	Timestamp JSONTime
	Version   Version
}
TaskMetadata provides a default implementation for ITaskMetadata and may be embedded in your input struct
func (*TaskMetadata) SetPriority
func (m *TaskMetadata) SetPriority(priority Priority)
SetPriority sets the priority a message was dispatched with
func (*TaskMetadata) SetReceipt
func (m *TaskMetadata) SetReceipt(receipt string)
SetReceipt sets the message receipt from SQS. This may be used to extend visibility timeout for long running tasks
func (*TaskMetadata) SetTimestamp
func (m *TaskMetadata) SetTimestamp(time JSONTime)
SetTimestamp sets the message dispatch timestamp
func (*TaskMetadata) SetVersion
func (m *TaskMetadata) SetVersion(version Version)
SetVersion sets the message schema version
type TaskRegistry
type TaskRegistry struct {
	// contains filtered or unexported fields
}
TaskRegistry manages and dispatches tasks registered to this registry
func NewTaskRegistry
func NewTaskRegistry(publisher IPublisher) (*TaskRegistry, error)
NewTaskRegistry creates a task registry
func (*TaskRegistry) Dispatch
func (tr *TaskRegistry) Dispatch(taskName string, input interface{}) error
Dispatch a task asynchronously. The concrete type of input is expected to be the same as the concrete type of NewInput()'s return value.
func (*TaskRegistry) DispatchWithContext
func (tr *TaskRegistry) DispatchWithContext(ctx context.Context, taskName string, input interface{}) error
DispatchWithContext dispatches a task asynchronously with context. The concrete type of input is expected to be the same as the concrete type of NewInput()'s return value.
func (*TaskRegistry) DispatchWithPriority
func (tr *TaskRegistry) DispatchWithPriority(ctx context.Context, taskName string, priority Priority, input interface{}) error
DispatchWithPriority dispatches a task asynchronously with custom priority. The concrete type of input is expected to be the same as the concrete type of NewInput()'s return value.
func (*TaskRegistry) GetTask
func (tr *TaskRegistry) GetTask(name string) (ITask, error)
GetTask fetches a task from the task registry
func (*TaskRegistry) NewLambdaConsumer
func (tr *TaskRegistry) NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer
NewLambdaConsumer creates a new taskhawk consumer for lambda apps
Cancelable context may be used to cancel processing of messages
func (*TaskRegistry) NewQueueConsumer
func (tr *TaskRegistry) NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer
NewQueueConsumer creates a new taskhawk consumer for queue apps
Cancelable context may be used to cancel processing of messages
func (*TaskRegistry) RegisterTask
func (tr *TaskRegistry) RegisterTask(task ITask) error
RegisterTask registers the task to the task registry
type Version
type Version string
Version represents the message format version
const (
	// Version1_0 represents the first version of the message format schema
	Version1_0 Version = "1.0"
	// CurrentVersion represents the current version of the taskhawk message schema
	CurrentVersion = Version1_0
)