worker

package
v0.0.0-...-96d8389
Warning

This package is not in the latest version of its module.

Published: Nov 16, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package worker helps developers build Gearman workers.

Constants

const (
	Unlimited = iota
	OneByOne
)

Variables

var (
	ErrNoneAgents = errors.New("None active agents")
	ErrNoneFuncs  = errors.New("None functions")
	ErrTimeOut    = errors.New("Executing time out")
	ErrUnknown    = errors.New("Unknown error")
)

Functions

func MemInfo

func MemInfo(job Job) ([]byte, error)

func SysInfo

func SysInfo(job Job) ([]byte, error)

Types

type ErrorHandler

type ErrorHandler func(error)

ErrorHandler is a callback that receives the worker's internal errors.

type Job

type Job interface {
	Err() error
	Data() []byte
	Fn() string
	SendWarning(data []byte)
	SendData(data []byte)
	UpdateStatus(numerator, denominator int)
	Handle() string
	UniqueId() string
}

type JobFunc

type JobFunc func(Job) ([]byte, error)
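
Because Job is an interface, a function with the JobFunc shape can be exercised without a job server by passing it a fake Job. The sketch below copies the Job interface from this page so it compiles on its own; fakeJob and upper are hypothetical names, and the handle and unique id strings are made up for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// Job mirrors the worker.Job interface documented above so the sketch
// compiles on its own. Real code would use worker.Job directly.
type Job interface {
	Err() error
	Data() []byte
	Fn() string
	SendWarning(data []byte)
	SendData(data []byte)
	UpdateStatus(numerator, denominator int)
	Handle() string
	UniqueId() string
}

// fakeJob is a hypothetical in-memory Job used only for testing.
// It records the last status update and ignores warnings and data.
type fakeJob struct {
	payload  []byte
	num, den int
}

func (j *fakeJob) Err() error              { return nil }
func (j *fakeJob) Data() []byte            { return j.payload }
func (j *fakeJob) Fn() string              { return "upper" }
func (j *fakeJob) SendWarning(data []byte) {}
func (j *fakeJob) SendData(data []byte)    {}
func (j *fakeJob) UpdateStatus(n, d int)   { j.num, j.den = n, d }
func (j *fakeJob) Handle() string          { return "H:fake:1" }
func (j *fakeJob) UniqueId() string        { return "fake-1" }

// upper has the JobFunc shape: it uppercases the payload and reports
// completion as 1 out of 1.
func upper(job Job) ([]byte, error) {
	out := bytes.ToUpper(job.Data())
	job.UpdateStatus(1, 1)
	return out, nil
}

func main() {
	j := &fakeJob{payload: []byte("hello")}
	out, err := upper(j)
	fmt.Println(string(out), err, j.num, j.den)
}
```

With the real package, the same function body would be registered through AddFunc after changing its parameter type to worker.Job.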

type JobHandler

type JobHandler func(Job) error

JobHandler is a callback for results other than function jobs, e.g. ECHO replies.

type Worker

type Worker struct {
	sync.Mutex

	Id           string
	ErrorHandler ErrorHandler
	JobHandler   JobHandler
	// contains filtered or unexported fields
}

Worker is the only structure needed for worker-side development. It can connect to multiple job servers and grab jobs from them.

Example
package main

import (
	"fmt"
	"sync"

	rt "github.com/drawks/gearhulk/pkg/runtime"
	"github.com/drawks/gearhulk/worker"
)

func main() {
	// An example of worker
	w := worker.New(worker.Unlimited)
	defer w.Close()
	// Add a gearman job server
	if err := w.AddServer(rt.Network, "127.0.0.1:4730"); err != nil {
		fmt.Println(err)
		return
	}
	// A function for handling jobs
	foobar := func(job worker.Job) ([]byte, error) {
		return nil, nil
	}
	// Add the function to worker
	if err := w.AddFunc("foobar", foobar, 0); err != nil {
		fmt.Println(err)
		return
	}
	var wg sync.WaitGroup
	// A custom handler for other results, e.g. ECHO, dtError.
	w.JobHandler = func(job worker.Job) error {
		if job.Err() == nil {
			fmt.Println(string(job.Data()))
		} else {
			fmt.Println(job.Err())
		}
		wg.Done()
		return nil
	}
	// An error handler for handling worker's internal errors.
	w.ErrorHandler = func(e error) {
		fmt.Println(e)
		// Ignore the error or shutdown the worker
	}
	// Tell Gearman job server: I'm ready!
	if err := w.Ready(); err != nil {
		fmt.Println(err)
		return
	}
	// Running main loop
	go w.Work()
	wg.Add(1)
	// calling Echo
	w.Echo([]byte("Hello"))
	// Waiting results
	wg.Wait()
}
Output:

Hello

func New

func New(limit int) (worker *Worker)

New returns a worker.

If limit is Unlimited (=0), the worker grabs all available jobs and executes them in parallel. If limit is greater than zero, at most limit jobs execute in parallel. If limit is OneByOne (=1), only one job executes at a time.
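
To make the meaning of limit concrete, the sketch below caps concurrency with a buffered-channel semaphore. This is one common technique and only an illustration of the semantics described above; it is not taken from the package's implementation. The names runLimited and sleepTasks are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs every task in its own goroutine, with at most limit tasks
// executing at once. A limit of 0 means no cap, mirroring Unlimited.
// It returns the highest number of tasks observed running simultaneously.
func runLimited(limit int, tasks []func()) int {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit)
	}
	var (
		wg      sync.WaitGroup
		running int32
		peak    int32
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			if sem != nil {
				sem <- struct{}{}        // acquire a slot
				defer func() { <-sem }() // release it when the task ends
			}
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the most seen so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&peak))
}

// sleepTasks returns n tasks that each sleep briefly.
func sleepTasks(n int) []func() {
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	return tasks
}

func main() {
	fmt.Println(runLimited(1, sleepTasks(5)))      // 1: one at a time, like OneByOne
	fmt.Println(runLimited(3, sleepTasks(9)) <= 3) // true: never more than 3 at once
}
```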

func (*Worker) AddFunc

func (worker *Worker) AddFunc(funcname string,
	f JobFunc, timeout uint32) (err error)

AddFunc adds a function. Set timeout to Unlimited (=0) to disable the execution timeout.

func (*Worker) AddServer

func (worker *Worker) AddServer(net, addr string) (err error)

Add a Gearman job server.

addr should be formatted as 'host:port'.

func (*Worker) Agents

func (w *Worker) Agents() int

func (*Worker) Close

func (worker *Worker) Close()

Close closes the connections and exits the main loop.

func (*Worker) Echo

func (worker *Worker) Echo(data []byte)

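Echo sends data to the job server as an ECHO request. In the example above, the reply is delivered to JobHandler.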

func (*Worker) Ready

func (worker *Worker) Ready() (err error)

Ready connects to the Gearman job servers and tells each one which functions this worker can perform.

func (*Worker) Reconnect

func (worker *Worker) Reconnect() error

func (*Worker) RemoveFunc

func (worker *Worker) RemoveFunc(funcname string) (err error)

RemoveFunc removes a function.

func (*Worker) Reset

func (worker *Worker) Reset()

Reset removes all functions, both from the worker and from the job servers.

func (*Worker) Running

func (w *Worker) Running() (string, int)

func (*Worker) SetId

func (worker *Worker) SetId(id string)

SetId sets the worker's unique id.

func (*Worker) Shutdown

func (worker *Worker) Shutdown()

func (*Worker) Work

func (worker *Worker) Work()

Work runs the main loop and blocks. In most cases it should be run in its own goroutine.

type WorkerDisconnectError

type WorkerDisconnectError struct {
	// contains filtered or unexported fields
}

WorkerDisconnectError is the error type passed when a worker connection disconnects.

func (*WorkerDisconnectError) Error

func (e *WorkerDisconnectError) Error() string

func (*WorkerDisconnectError) Reconnect

func (e *WorkerDisconnectError) Reconnect() (err error)

Reconnect responds to the error by asking the worker to reconnect.

func (*WorkerDisconnectError) Server

func (e *WorkerDisconnectError) Server() (net string, addr string)

Server returns the network and address of the server the error refers to.
