celery

package module
v0.0.0-...-d4e8c32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

xk6-celery

This is a k6 extension using the xk6 system to generate load on celery worker.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

Then:

  1. Install xk6:
go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the binary:
xk6 build --with github.com/fornfrey/xk6-celery@latest

Example

Celery App

❗ For extension to work ensure --task-events flag is enabled.

Run

celery -A tasks worker --task-events

tasks.py

from celery import Celery

app = Celery('tasks', broker='amqp://localhost:5672')
app.conf.update(task_send_sent_event=True)  # to collect child tasks metrics ensure this flag is enabled

@app.task
def add(x, y):
    print(x + y)

@app.task
def parent():
    print("parent")
    child.apply_async()

@app.task
def child():
    print("child")
k6

Run

k6 run -u 1 -i 1 script.js

script.js

import celery from 'k6/x/celery';

const client = celery.connect("amqp://localhost:5672", "celery");

export function teardown() {
  client.close()
}

export default function () {
    client.runTask({
        name: "tasks.add",
        args: [2, 3]
})  // blocks until task is done
    client.runTask({
        name: "tasks.parent",
        args: []
    })   // blocks until all child tasks are done
}

Result output:


          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations shared among 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)


running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 shared iters

     █ teardown

     celery_task_queue_time...: avg=2.48ms  min=1.15ms   med=3.02ms   max=3.27ms  p(90)=3.22ms  p(95)=3.24ms 
     celery_task_runtime......: avg=5.09ms  min=574.35µs med=813.72µs max=13.89ms p(90)=11.27ms p(95)=12.58ms
     celery_tasks.............: 2       74.081224/s
     celery_tasks_child.......: 1       37.040612/s
     celery_tasks_succeeded...: 100.00% ✓ 3          ✗ 0
     celery_tasks_total.......: 3       111.121836/s
     data_received............: 0 B     0 B/s
     data_sent................: 0 B     0 B/s
     iteration_duration.......: avg=12.44ms min=3.44ms   med=12.44ms  max=21.44ms p(90)=19.64ms p(95)=20.54ms
     iterations...............: 1       37.040612/s

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterSerializer

func RegisterSerializer(name string, serializer Serializer, contentType, encoding string)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Close

func (client *Client) Close()

func (*Client) RunTask

func (client *Client) RunTask(sig *TaskSignature, vu modules.VU, vuMetrics *celeryMetrics) error

type ModuleInstance

type ModuleInstance struct {
	*RootModule
	// contains filtered or unexported fields
}

func (*ModuleInstance) Connect

func (moduleInstance *ModuleInstance) Connect(brokerUrl string, queueName string) *goja.Object

func (*ModuleInstance) Exports

func (mi *ModuleInstance) Exports() modules.Exports

type RootModule

type RootModule struct {
	// contains filtered or unexported fields
}

func (*RootModule) NewModuleInstance

func (root *RootModule) NewModuleInstance(vu modules.VU) modules.Instance

type Serializer

type Serializer func(celeryBody []any) ([]byte, error)

type TaskSignature

type TaskSignature struct {
	Name       string
	Args       []any
	Kwargs     map[string]any
	Serializer string
}

func (*TaskSignature) Publishing

func (sig *TaskSignature) Publishing(taskId string) (amqp091.Publishing, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL