Documentation ¶
Index ¶
- Variables
- func NewAPIRequestHandler(tokenHeaderName string, tokenCreator tokens.Creator, client *http.Client) queue.TaskHandler
- func NewDispatchHandler(handlers map[queue.TaskType]queue.TaskHandler) queue.TaskHandler
- func NewJSONAPIHandler(client clients.BaseAPIClient) queue.TaskHandler
- func NewJSONAPIHandlerWithErrorChecker(client clients.BaseAPIClient, checker CheckForErrorFunction) queue.TaskHandler
- func NewSQLTaskHandler(name string, db *sql.DB) queue.TaskHandler
- type APIRequestProgress
- type APIRequestStage
- type APIRequestTaskSpec
- type CheckForErrorFunction
- type SQLExecTaskSpec
- type SQLTaskProgress
Constants ¶
This section is empty.
Variables ¶
var ( // APIRequestTask marks a task as an API request task APIRequestTask queue.TaskType = "api-request" )
var ( // ErrNoHandlerFound occurs when dispatcher can't find a registered handler for a task type ErrNoHandlerFound = errors.New("no handler found") )
var (
ErrSerializingHearbeat = errors.New("failed to serialize progress payload while sending heartbeat")
)
Functions ¶
func NewAPIRequestHandler ¶
func NewAPIRequestHandler(tokenHeaderName string, tokenCreator tokens.Creator, client *http.Client) queue.TaskHandler
NewAPIRequestHandler creates a task handler that makes an HTTP request to a target API. The response from the request must be valid JSON or a stream of newline-separated JSON objects, otherwise the task will fail.
func NewDispatchHandler ¶
func NewDispatchHandler(handlers map[queue.TaskType]queue.TaskHandler) queue.TaskHandler
NewDispatchHandler creates a task handler that will dispatch tasks to other handlers
func NewJSONAPIHandler ¶ added in v4.4.0
func NewJSONAPIHandler(client clients.BaseAPIClient) queue.TaskHandler
NewJSONAPIHandler creates a task handler that makes a JSON HTTP request to a target API using the provided BaseAPIClient.
The response from the request must be valid JSON or a stream of newline-separated JSON objects, otherwise the task will fail.
The BaseAPIClient is responsible for bringing its own TokenProvider.
The NewAPIRequestHandler can be preferred if the request is not a JSON payload.
The NewJSONAPIHandler can be preferred because it is easier to mock the BaseAPIClient for tests.
Example usage:
client := clients.NewBaseAPIClient( "", // use an empty baseURL because the task spec will hold the URL "X-Auth", clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}), http.DefaultClient, false, ) handler := NewJSONAPIHandler(client)
Alternatively, use it within your custom task handler; this is required if the client behavior depends on the task spec:
type customHandler struct { tracing.Tracer } func (h customHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) { span, ctx := h.StartSpan(ctx, "Process") defer func() { close(heartbeats) heartbeats = nil h.FinishSpan(span, err) }() var spec tasks.CustomSpec err = json.Unmarshal(task.Spec, &spec) if err != nil { return err } creator := specSpecificTokenCreator{ projectID: spec.ProjectID, } client := clients.NewBaseAPIClient( "", // use an empty baseURL because the task spec will hold the URL "Auth", clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}), http.DefaultClient, false, ) client = clients.WithRetry(client, maxAttempts, backoff.Exponential()) taskHandler := handlers.NewJSONAPIHandler(client) return taskHandler.Process(ctx, task, heartbeats) }
func NewJSONAPIHandlerWithErrorChecker ¶ added in v4.7.0
func NewJSONAPIHandlerWithErrorChecker(client clients.BaseAPIClient, checker CheckForErrorFunction) queue.TaskHandler
func NewSQLTaskHandler ¶
func NewSQLTaskHandler(name string, db *sql.DB) queue.TaskHandler
NewSQLTaskHandler creates a sqlTaskHandler instance with the given tracing name.
Types ¶
type APIRequestProgress ¶
type APIRequestProgress struct { // Stage is the current stage of the API request task Stage APIRequestStage `json:"stage,omitempty"` // Duration of the HTTP request Duration *time.Duration `json:"duration,omitempty"` // ReturnedStatus is a status returned from the target endpoint ReturnedStatus *int `json:"returnedStatus,omitempty"` // ReturnedBody is a body returned from the target endpoint ReturnedBody *string `json:"returnedBody,omitempty"` // ErrorMessage contains an error message string if it occurs during the update process ErrorMessage *string `json:"errorMessage,omitempty"` }
APIRequestProgress describes the progress of the API request task stored during the heartbeat handling
type APIRequestStage ¶
type APIRequestStage string
var ( // RequestPreparing means the task is preparing the request parameters and the body RequestPreparing APIRequestStage = "preparing" // RequestPending means the request was sent, awaiting the response RequestPending APIRequestStage = "pending" // RequestResponse means the response was received RequestResponse APIRequestStage = "response" )
type APIRequestTaskSpec ¶
type APIRequestTaskSpec struct { // Method to use for the API request Method string `json:"method"` // URL is the target URL for the request. // Must be an absolute URL that contains the scheme and the host components. URL string `json:"url"` // RequestBody to send RequestBody string `json:"requestBody"` // RequestHeaders to send RequestHeaders map[string]string `json:"requestHeaders"` // Authorized if `true` the task will send a header with the // signed JWT token as a part of the request Authorized bool `json:"authorized"` // ExpectedStatus is an HTTP status expected as a response. // If it does not match the actual status the task fails ExpectedStatus int `json:"expectedStatus"` }
APIRequestTaskSpec describes the specification of the API request task
type CheckForErrorFunction ¶ added in v4.7.0
type CheckForErrorFunction func(m json.RawMessage) error
CheckForErrorFunction checks if the response contains an error status
type SQLExecTaskSpec ¶
type SQLExecTaskSpec struct { // SQL is the actual sql that will be run SQL string `json:"sql"` }
SQLExecTaskSpec defines a task that simply executes a single SQL statement. This can be used for simple CRON cleanup tasks, for example.
type SQLTaskProgress ¶
type SQLTaskProgress struct { // Duration of the SQL statement execution in milliseconds Duration *int64 `json:"duration,omitempty"` // RowsAffected RowsAffected *int64 `json:"rowsAffected,omitempty"` // ErrorMessage contains an error message string if it occurs during the update process ErrorMessage *string `json:"errorMessage,omitempty"` }
SQLTaskProgress contains the generic progress information for a sql task