celeriac

package module
v0.0.0-...-79515a2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2015 License: MIT Imports: 9 Imported by: 0

README

Celeriac

Go client library for interacting with and monitoring Celery workers and tasks.

It provides functionality to place tasks on the task queue, as well as monitor task and worker events.

Dependencies

This library depends upon the following packages:

  • github.com/streadway/amqp
  • github.com/Sirupsen/logrus
  • github.com/nu7hatch/gouuid
  • github.com/pquerna/ffjson

Usage

Installation: go get github.com/svcavallar/celeriac.v1

This imports a new namespace called celeriac

package main

import (
	"log"
	"os"

	"github.com/svcavallar/celeriac.v1"
)

// main connects to the task broker and then logs every worker and task event
// delivered on the monitor's events channel.
func main() {
	taskBrokerURI := "amqp://user:pass@localhost:5672/vhost"

	// Connect to RabbitMQ task queue
	taskQueueMgr, err := celeriac.NewTaskQueueMgr(taskBrokerURI)
	if err != nil {
		log.Printf("Failed to connect to task queue: %v", err)
		os.Exit(-1)
	}

	log.Printf("Service connected to task queue - (URL: %s)", taskBrokerURI)

	// Ranging over the channel blocks until an event arrives and ends the loop
	// if the channel is ever closed, instead of spinning on zero values.
	for ev := range taskQueueMgr.Monitor.EventsChannel {
		// A nil event matches neither case and is silently skipped.
		switch x := ev.(type) {
		case *celeriac.WorkerEvent:
			log.Printf("Task monitor: Worker event - %s", x.Type)
		case *celeriac.TaskEvent:
			log.Printf("Task monitor: Task event - %s [ID]: %s", x.Type, x.UUID)
		}
	}
}

Documentation

Overview

Package celeriac is a package for interacting with Celery.

It provides functionality to place tasks on the task queue, as well as monitor task and worker events.

Index

Constants

View Source
// Settings used when publishing tasks and task control commands.
const (
	// ConstPublishTaskContentType is the content type set on published task data.
	ConstPublishTaskContentType = "application/json"

	// ConstPublishTaskContentEncoding is the content encoding set on published task data.
	ConstPublishTaskContentEncoding = "utf-8"

	// ConstTaskDefaultExchangeName is the exchange a task is published to by default.
	ConstTaskDefaultExchangeName = ""

	// ConstTaskDefaultRoutingKey is the routing key a task is published with by default.
	ConstTaskDefaultRoutingKey = "celery"

	// ConstTaskControlExchangeName is the exchange that task control commands are dispatched to.
	ConstTaskControlExchangeName = "celery.pidbox"
)

// Settings used by the events monitor.
const (
	// ConstEventsMonitorExchangeName is the name of the exchange carrying Celery events.
	ConstEventsMonitorExchangeName = "celeryev"

	// ConstEventsMonitorExchangeType is the type of the events monitor exchange.
	ConstEventsMonitorExchangeType = "topic"

	// ConstEventsMonitorQueueName is the name of the events monitor queue.
	ConstEventsMonitorQueueName = "celeriac-events-monitor-queue"

	// ConstEventsMonitorBindingKey is the binding key of the events monitor.
	ConstEventsMonitorBindingKey = "*.*"

	// ConstEventsMonitorConsumerTag is the consumer tag of the events monitor.
	ConstEventsMonitorConsumerTag = "celeriac-events-monitor"
)

// ConstTimeFormat is the layout used for all timestamps.
const ConstTimeFormat = "2006-01-02T15:04:05.999999"

// Worker event types.
const (
	// ConstEventTypeWorkerOnline is the type of the event sent when a Celery worker comes online.
	ConstEventTypeWorkerOnline string = "worker-online"

	// ConstEventTypeWorkerOffline is the type of the event sent when a Celery worker goes offline.
	ConstEventTypeWorkerOffline string = "worker-offline"

	// ConstEventTypeWorkerHeartbeat is the type of the event sent while a Celery worker is online and "alive".
	ConstEventTypeWorkerHeartbeat string = "worker-heartbeat"
)

// Task event types.
const (
	// ConstEventTypeTaskSent is the type of the event sent when a Celery task is sent.
	ConstEventTypeTaskSent string = "task-sent"

	// ConstEventTypeTaskReceived is the type of the event sent when a Celery worker receives a task.
	ConstEventTypeTaskReceived string = "task-received"

	// ConstEventTypeTaskStarted is the type of the event sent when a Celery worker starts a task.
	ConstEventTypeTaskStarted string = "task-started"

	// ConstEventTypeTaskSucceeded is the type of the event sent when a Celery worker completes a task.
	ConstEventTypeTaskSucceeded string = "task-succeeded"

	// ConstEventTypeTaskFailed is the type of the event sent when a Celery worker fails to complete a task.
	ConstEventTypeTaskFailed string = "task-failed"

	// ConstEventTypeTaskRevoked is the type of the event sent when a Celery worker has its task revoked.
	ConstEventTypeTaskRevoked string = "task-revoked"

	// ConstEventTypeTaskRetried is the type of the event sent when a Celery worker retries a task.
	ConstEventTypeTaskRetried string = "task-retried"
)

Variables

View Source
// ErrInvalidTaskID is the error reported when an invalid task ID has been detected.
var ErrInvalidTaskID = errors.New("Invalid task ID specified")

// ErrInvalidTaskName is the error reported when an invalid task name has been detected.
var ErrInvalidTaskName = errors.New("Invalid task name specified")

Global Errors

Functions

func Fail

func Fail(err error, msg string)

Fail logs the error and exits the program. Only use this to handle critical errors.

func Log

func Log(err error, msg string)

Log only logs the error and does not exit the program. Use this to log errors that should not exit the program.

Types

type Event

// Event holds the fields common to the events emitted by Celery workers.
// Each field is mapped to its JSON key through the struct tag.
type Event struct {
	// Type is the Celery event type. See supported events listed in "constants.go"
	Type string `json:"type"`

	// Hostname is the name of the host on which the Celery worker is operating
	Hostname string `json:"hostname"`

	// Timestamp is the time of the event.
	// NOTE(review): presumably Unix epoch seconds with a fractional part. float32
	// has a 24-bit significand, so values of that magnitude cannot keep sub-second
	// precision; float64 would, but that changes the exported field type.
	Timestamp float32 `json:"timestamp"`

	// PID is the process ID
	PID int `json:"pid"`

	// Clock is the clock value reported with the event.
	// NOTE(review): presumably Celery's logical clock counter rather than a
	// wall-clock time — confirm.
	Clock int `json:"clock"`

	// UTCOffset is the offset from UTC for the time when this event is valid.
	// NOTE(review): unit is presumably hours — confirm.
	UTCOffset int `json:"utcoffset"`
}

Event defines a base event emitted by Celery workers.

func NewEvent

func NewEvent() *Event

NewEvent is a factory function to create a new Event object

func (*Event) MarshalJSON

func (mj *Event) MarshalJSON() ([]byte, error)

func (*Event) MarshalJSONBuf

func (mj *Event) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*Event) TimestampFormatted

func (event *Event) TimestampFormatted() string

TimestampFormatted returns a formatted string representation of the task event timestamp

func (*Event) UnmarshalJSON

func (uj *Event) UnmarshalJSON(input []byte) error

func (*Event) UnmarshalJSONFFLexer

func (uj *Event) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type PingCmd

type PingCmd struct {
	// contains filtered or unexported fields
}

PingCmd is a wrapper to a command

func NewPingCmd

func NewPingCmd() *PingCmd

NewPingCmd creates a new command for pinging workers

type RateLimitTaskCmd

type RateLimitTaskCmd struct {
	Arguments rateLimitTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

RateLimitTaskCmd is a wrapper to a command

func NewRateLimitTaskCmd

func NewRateLimitTaskCmd(taskName string, rateLimit string) *RateLimitTaskCmd

NewRateLimitTaskCmd creates a new command for rate limiting a task

taskName: Name of the task to change the rate limit for.

rateLimit: The rate limit as tasks per second, or a rate limit string (`"100/m"`, etc.; see :attr:`celery.task.base.Task.rate_limit` for more information).

type RevokeTaskCmd

type RevokeTaskCmd struct {
	Arguments revokeTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

RevokeTaskCmd is a wrapper to a command

func NewRevokeTaskCmd

func NewRevokeTaskCmd(taskID string, terminateProcess bool) *RevokeTaskCmd

NewRevokeTaskCmd creates a new command for revoking a task by given id

If a task is revoked, the workers will ignore the task and not execute it after all.

type Task

// Task describes a Celery task: its name and ID, its arguments, and the
// optional retry and scheduling settings.
type Task struct {
	// TaskName is the name of the task
	TaskName string

	// ID is the task UUID
	ID string

	// Args are the positional task arguments (optional)
	Args []string

	// KWArgs are the keyword arguments, keyed by argument name (optional)
	KWArgs map[string]interface{}

	// Retries is a number of retries to perform if an error occurs (optional)
	Retries int

	// ETA is the estimated completion time (optional).
	// NOTE(review): in Celery, "eta" normally denotes the earliest time at which
	// the task may be executed, not when it completes — confirm the intended meaning.
	ETA time.Time

	// Expires is the time when this task will expire (optional)
	Expires time.Time
}

Task is a representation of a Celery task

func NewTask

func NewTask(taskName string, args []string, kwargs map[string]interface{}) (*Task, error)

NewTask is a factory function that creates and returns a pointer to a new task object

func (*Task) MarshalJSON

func (task *Task) MarshalJSON() ([]byte, error)

MarshalJSON marshals a Task object into a json bytes array

Time properties are converted to UTC and formatted in ISO8601

type TaskEvent

// TaskEvent is the JSON schema for Celery task events.
//
// The first six fields mirror the base Event fields. The remaining optional
// fields are tagged with omitempty; the option must follow the comma without
// a space, otherwise encoding/json does not recognise it.
type TaskEvent struct {
	Type      string  `json:"type"`
	Hostname  string  `json:"hostname"`
	Timestamp float32 `json:"timestamp"`
	PID       int     `json:"pid"`
	Clock     int     `json:"clock"`
	UTCOffset int     `json:"utcoffset"`

	// UUID is the id of the task
	UUID string `json:"uuid"`

	// Name is the textual name of the task executed
	Name string `json:"name,omitempty"`

	// Args is a string of the arguments passed to the task
	Args string `json:"args,omitempty"`

	// Kwargs is a string of the key-word arguments passed to the task
	Kwargs string `json:"kwargs,omitempty"`

	// Result is a string containing the result of a completed task
	Result string `json:"result,omitempty"`

	// Runtime is the task run time reported with the event.
	// NOTE(review): presumably the execution time in seconds, as in Celery's
	// task-succeeded event — confirm.
	Runtime float32 `json:"runtime,omitempty"`

	// Retries is the number of re-tries this task has performed
	Retries int `json:"retries,omitempty"`

	// ETA is the estimated time of completion
	ETA int `json:"eta,omitempty"`

	// Exception is a string containing error/exception information
	Exception string `json:"exception,omitempty"`

	// Traceback is a string containing extended error information
	Traceback string `json:"traceback,omitempty"`

	// Terminated is a flag indicating whether the task has been terminated
	Terminated bool `json:"terminated,omitempty"`

	// Signum is the signal number
	Signum interface{} `json:"signum,omitempty"`

	// Expired is a flag indicating whether the task has expired
	Expired bool `json:"expired,omitempty"`
}

TaskEvent is the JSON schema for Celery task events

func NewTaskEvent

func NewTaskEvent() *TaskEvent

NewTaskEvent is a factory function to create a new TaskEvent object

func (*TaskEvent) MarshalJSON

func (mj *TaskEvent) MarshalJSON() ([]byte, error)

func (*TaskEvent) MarshalJSONBuf

func (mj *TaskEvent) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*TaskEvent) UnmarshalJSON

func (uj *TaskEvent) UnmarshalJSON(input []byte) error

func (*TaskEvent) UnmarshalJSONFFLexer

func (uj *TaskEvent) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type TaskMonitor

type TaskMonitor struct {

	// Public channel on which events are piped
	EventsChannel chan interface{}
	// contains filtered or unexported fields
}

TaskMonitor is a Celery task event consumer

func NewTaskMonitor

func NewTaskMonitor(connection *amqp.Connection, channel *amqp.Channel,
	exchangeName string, exchangeType string, queueName string, bindingKey string, ctag string) (*TaskMonitor, error)

NewTaskMonitor is a factory function that creates a new Celery consumer

func (*TaskMonitor) SetMonitorWorkerHeartbeatEvents

func (monitor *TaskMonitor) SetMonitorWorkerHeartbeatEvents(processHeartbeatEvents bool)

SetMonitorWorkerHeartbeatEvents sets the property whether to process heartbeat events emitted by workers.

NOTE: By default this is set to 'false' so as to minimize unnecessary "noisy heartbeat" events.

func (*TaskMonitor) Shutdown

func (monitor *TaskMonitor) Shutdown() error

Shutdown stops all monitoring, cleaning up any open connections

type TaskQueueMgr

type TaskQueueMgr struct {
	Monitor *TaskMonitor
	// contains filtered or unexported fields
}

TaskQueueMgr defines a manager for interoperating with a Celery task queue

func NewTaskQueueMgr

func NewTaskQueueMgr(brokerURI string) (*TaskQueueMgr, error)

NewTaskQueueMgr is a factory function that creates a new instance of the TaskQueueMgr

func (*TaskQueueMgr) Close

func (taskQueueMgr *TaskQueueMgr) Close()

Close performs appropriate cleanup of any open task queue connections

func (*TaskQueueMgr) DispatchTask

func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)

DispatchTask places a new task on the Celery task queue. It creates a new Task based on the supplied task name and data.

func (*TaskQueueMgr) Ping

func (taskQueueMgr *TaskQueueMgr) Ping() error

Ping attempts to ping Celery workers

func (*TaskQueueMgr) RateLimitTask

func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error

RateLimitTask attempts to set the rate limit for tasks by type

func (*TaskQueueMgr) RevokeTask

func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error

RevokeTask attempts to notify Celery workers that the specified task needs revoking

func (*TaskQueueMgr) TimeLimitTask

func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error

TimeLimitTask attempts to set time limits for tasks by type

type TimeLimitTaskCmd

type TimeLimitTaskCmd struct {
	Arguments timeLimitTaskArgs `json:"arguments"`
	// contains filtered or unexported fields
}

TimeLimitTaskCmd is a wrapper to a command

func NewTimeLimitTaskCmd

func NewTimeLimitTaskCmd(taskName string, hardLimit string, softLimit string) *TimeLimitTaskCmd

NewTimeLimitTaskCmd creates a new command for setting the time limits of a task

taskName: Name of the task to change time limits for.

hardLimit: New hard time limit (in seconds).

softLimit: New soft time limit (in seconds).

type WorkerEvent

// WorkerEvent defines an event emitted by workers, specific to its operation.
//
// The first six fields mirror the base Event fields. Processed and Active are
// tagged with omitempty; the option must follow the comma without a space and
// be spelled exactly, otherwise encoding/json does not recognise it.
type WorkerEvent struct {
	Type      string  `json:"type"`
	Hostname  string  `json:"hostname"`
	Timestamp float32 `json:"timestamp"`
	PID       int     `json:"pid"`
	Clock     int     `json:"clock"`
	UTCOffset int     `json:"utcoffset"`

	// SWSystem is the software system being used
	SWSystem string `json:"sw_sys"`

	// SWVersion is the software version being used
	SWVersion string `json:"sw_ver"`

	// LoadAverage is an array of average CPU loadings for the worker
	LoadAverage []float32 `json:"loadavg"`

	// Freq is the worker's heartbeat frequency.
	// NOTE(review): in seconds according to Celery's worker event fields — confirm.
	Freq float32 `json:"freq"`

	// SWIdentity is the software identity
	SWIdentity string `json:"sw_ident"`

	// Processed is the number of items processed
	Processed int `json:"processed,omitempty"`

	// Active is the "active" count reported by the worker.
	// NOTE(review): Celery documents this as the number of currently executing
	// tasks, not a number of workers — confirm.
	Active int `json:"active,omitempty"`
}

WorkerEvent defines an event emitted by workers, specific to its operation. Event "types" emitted are:

  • "worker-online"
  • "worker-offline"
  • "worker-heartbeat"

Example worker event json:

{
	"sw_sys": "Darwin",
	"clock": 74,
	"timestamp": 1843965659.580637,
	"hostname": "celery@Stefans-Mac.local",
	"pid": 10837,
	"sw_ver": "3.1.18",
	"utcoffset": -11,
	"loadavg": [2.0, 2.41, 2.54],
	"processed": 6,
	"active": 0,
	"freq": 2.0,
	"type": "worker-offline",
	"sw_ident": "py-celery"
}

func NewWorkerEvent

func NewWorkerEvent() *WorkerEvent

NewWorkerEvent is a factory function to create a new WorkerEvent object

func (*WorkerEvent) MarshalJSON

func (mj *WorkerEvent) MarshalJSON() ([]byte, error)

func (*WorkerEvent) MarshalJSONBuf

func (mj *WorkerEvent) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*WorkerEvent) UnmarshalJSON

func (uj *WorkerEvent) UnmarshalJSON(input []byte) error

func (*WorkerEvent) UnmarshalJSONFFLexer

func (uj *WorkerEvent) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL