handlers

package
v4.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// APIRequestTask marks a task as an API request task
	APIRequestTask queue.TaskType = "api-request"
)
View Source
var (
	// ErrNoHandlerFound occurs when dispatcher can'f find a registered handler for a task type
	ErrNoHandlerFound = errors.New("no handler found")
)
View Source
var (
	ErrSerializingHearbeat = errors.New("failed to serialize progress payload while sending heartbeat")
)

Functions

func NewAPIRequestHandler

func NewAPIRequestHandler(tokenHeaderName string, tokenCreator tokens.Creator, client *http.Client) queue.TaskHandler

NewAPIRequestHandler creates a task handler that makes an HTTP request to a target API. The response from the request must be valid JSON or a stream of new line-separated JSON objects, otherwise the task will fail.

func NewDispatchHandler

func NewDispatchHandler(handlers map[queue.TaskType]queue.TaskHandler) queue.TaskHandler

NewDispatchHandler creates a task handler that will dispatch tasks to other handlers

func NewJSONAPIHandler added in v4.4.0

func NewJSONAPIHandler(client clients.BaseAPIClient) queue.TaskHandler

NewJSONAPIHandler creates a task handler that makes an JSON HTTP request to a target API using the provided BaseAPIClient.

The response from the request must be valid JSON or a stream of new line-separated JSON objects, otherwise the task will fail

The BaseAPIClient is responsible for bringing its own TokenProvider.

The NewAPIRequestHandler can be preferred if the request is not a JSON payload.

The NewJSONAPIHandler can be preferred because it is easier to mock the BaseAPIClient for tests.

Example usage:

client := clients.NewBaseAPIClient(
	"", // use an empty baseURL because the task spec will hold the URL
	"X-Auth",
	clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}),
	http.DefaultClient,
	false,
)
handler := NewJSONAPIHandler(client)

Alternatively, use it within your custom task handler, this is required if the client behavior is dependent on the task spec:

type customHandler struct {
	tracing.Tracer
}
func (h customHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) {
	span, ctx := h.StartSpan(ctx, "Process")
	defer func() {
		close(heartbeats)
		heartbeats = nil
		h.FinishSpan(span, err)
	}()

	var spec tasks.CustomSpec
	err = json.Unmarshal(task.Spec, &spec)
	if err != nil {
		return err
	}

	creator := specSpecificTokenCreator{
		projectID: spec.ProjectID,
	}
	client := clients.NewBaseAPIClient(
		"", // use an empty baseURL because the task spec will hold the URL
		"Auth",
		clients.TokenProviderFromCreator(&creator, "apiRequestTask", tokens.Options{}),
		http.DefaultClient,
		false,
	)
	client = clients.WithRetry(client, maxAttempts, backoff.Exponential())
	taskHandler := handlers.NewJSONAPIHandler(client)

	return taskHandler.Process(ctx, task, heartbeats)
}

func NewSQLTaskHandler

func NewSQLTaskHandler(name string, db *sql.DB) queue.TaskHandler

NewSQLTaskHandler creates a sqlTaskHandler handler instance with the given tracing name

Types

type APIRequestProgress

type APIRequestProgress struct {
	// Stage is the current stage of the API request task
	Stage APIRequestStage `json:"stage,omitempty"`
	// Duration of the HTTP request
	Duration *time.Duration `json:"duration,omitempty"`
	// ReturnedStatus is a status returned from the target endpoint
	ReturnedStatus *int `json:"returnedStatus,omitempty"`
	// ReturnedBody is a body returned from the target endpoint
	ReturnedBody *string `json:"returnedBody,omitempty"`
	// ErrorMessage contains an error message string if it occurs during the update process
	ErrorMessage *string `json:"errorMessage,omitempty"`
}

APIRequestProgress describes the progress of the API request task stored during the heartbeat handling

type APIRequestStage

type APIRequestStage string
var (
	// RequestPreparing means the task is preparing the request parameters and the body
	RequestPreparing APIRequestStage = "preparing"
	// RequestPending means the request was sent, awaiting the response
	RequestPending APIRequestStage = "pending"
	// RequestResponse means the response was received
	RequestResponse APIRequestStage = "response"
)

type APIRequestTaskSpec

type APIRequestTaskSpec struct {
	// Method to use for the API request
	Method string `json:"method"`
	// URL is the target URL for the request.
	// Must be an absolute URL that contains the scheme and the host components.
	URL string `json:"url"`
	// RequestBody to send
	RequestBody string `json:"requestBody"`
	// RequestHeaders to send
	RequestHeaders map[string]string `json:"requestHeaders"`
	// Authorized if `true` the task will send a header with the
	// signed JWT token as a part of the request
	Authorized bool `json:"authorized"`
	// ExpectedStatus is an HTTP status expected as a response.
	// If it does not match the actual status the task fails
	ExpectedStatus int `json:"expectedStatus"`
}

APIRequestTaskSpec describes the specification of the API request task

type SQLExecTaskSpec

type SQLExecTaskSpec struct {
	// SQL is the actual sql that will be run
	SQL string `json:"sql"`
}

SQLExecTaskSpec defines a task that simply executes a single SQL statement. This can be used for simple CRON cleanup tasks, for example.

type SQLTaskProgress

type SQLTaskProgress struct {
	// Duration of the HTTP request in milliseconds
	Duration *int64 `json:"duration,omitempty"`
	// RowsAffected
	RowsAffected *int64 `json:"rowsAffected,omitempty"`
	// ErrorMessage contains an error message string if it occurs during the update process
	ErrorMessage *string `json:"errorMessage,omitempty"`
}

SQLTaskProgress contains the generic progress information for a sql task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL