client

package
v1.0.0-2
Published: Apr 28, 2019 License: GPL-3.0, MPL-2.0 Imports: 15 Imported by: 0

Documentation

Constants

const (
	// This is the protocol version supported by this client.
	// The server might be running an older or newer version.
	ExpectedProtocolVersion = 2
)

Variables

var (
	Name      = "Faktory"
	License   = "GPLv3"
	Licensing = "Licensed under the GNU Public License 3.0"
	Version   = "1.0.0"
)
var (
	Everything = JobFilter{
		Regexp: "*",
	}
)
var (
	// Set this to a non-empty value in a consumer process
	// e.g. see how faktory_worker_go sets this.
	RandomProcessWid = ""
)

Functions

This section is empty.

Types

type Client

type Client struct {
	Location string
	Options  *ClientData
	// contains filtered or unexported fields
}

A Client represents a connection to a Faktory server and is not safe for concurrent use. In a multi-threaded process, use a connection pool of Clients; see faktory_worker_go's internal connection pool for an example.

TODO Provide a connection pool as part of this package?
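
Because a Client must not be shared between goroutines, a pool is the usual way to hand each goroutine its own connection. The following is a minimal sketch of a channel-based pool, not the pool used by faktory_worker_go. The names conn, pool, newPool, get, put, and fakeConn are hypothetical; conn stands in for *Client so the example runs without a server. Note that this sketch caps only the number of idle connections, not the total number dialed.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for *client.Client; only Close is needed here.
type conn interface {
	Close() error
}

// pool hands out connections so each one is used by one goroutine at a time.
type pool struct {
	idle chan conn
	dial func() (conn, error)
}

// newPool creates a pool that keeps at most size idle connections.
func newPool(size int, dial func() (conn, error)) *pool {
	return &pool{idle: make(chan conn, size), dial: dial}
}

// get returns an idle connection if one exists, otherwise dials a new one.
func (p *pool) get() (conn, error) {
	select {
	case c := <-p.idle:
		return c, nil
	default:
		return p.dial()
	}
}

// put returns a connection to the pool, closing it if the pool is already full.
func (p *pool) put(c conn) error {
	select {
	case p.idle <- c:
		return nil
	default:
		return c.Close()
	}
}

// fakeConn lets the sketch run without a Faktory server.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error {
	if f.closed {
		return errors.New("already closed")
	}
	f.closed = true
	return nil
}

func main() {
	dialed := 0
	p := newPool(2, func() (conn, error) {
		dialed++
		return &fakeConn{}, nil
	})
	c, _ := p.get()
	_ = p.put(c)
	c, _ = p.get() // reuses the idle connection instead of dialing again
	_ = p.put(c)
	fmt.Println("connections dialed:", dialed)
}
```

With the real package, the dial function would presumably wrap Open or Dial and return the resulting *Client.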

func Dial

func Dial(srv *Server, password string) (*Client, error)

Dial connects to the remote Faktory server.

client.Dial(client.Localhost, "topsecret")

func Open

func Open() (*Client, error)

Open connects to a Faktory server based on environment variable conventions:

• Use FAKTORY_PROVIDER to point to a custom URL variable.
• Use FAKTORY_URL as a catch-all default.

Use the URL to configure any necessary password:

tcp://:mypassword@localhost:7419

By default, Open assumes localhost with no password, which is appropriate for local development.
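
The lookup order described above can be sketched as follows. This is an illustration of the documented convention, not the package's actual implementation. The helpers resolveURL and splitURL are hypothetical, and the default URL tcp://localhost:7419 is an assumption based on the port in the example URL.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// defaultURL is an assumption: localhost, the port from the example URL, no password.
const defaultURL = "tcp://localhost:7419"

// resolveURL picks the server URL following the documented conventions.
// lookup has the same shape as os.LookupEnv so it can be replaced in tests.
func resolveURL(lookup func(string) (string, bool)) (string, error) {
	if name, ok := lookup("FAKTORY_PROVIDER"); ok {
		// FAKTORY_PROVIDER holds the NAME of another variable, not a URL.
		val, ok := lookup(name)
		if !ok {
			return "", fmt.Errorf("FAKTORY_PROVIDER points to unset variable %q", name)
		}
		return val, nil
	}
	if val, ok := lookup("FAKTORY_URL"); ok {
		return val, nil
	}
	return defaultURL, nil
}

// splitURL extracts the network, address, and password from a server URL.
func splitURL(raw string) (network, addr, password string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	if u.User != nil {
		password, _ = u.User.Password()
	}
	return u.Scheme, u.Host, password, nil
}

func main() {
	raw, err := resolveURL(os.LookupEnv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	network, addr, _, err := splitURL(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("would connect to", network, addr)
}
```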

func (*Client) Ack

func (c *Client) Ack(jid string) error

func (*Client) Beat

func (c *Client) Beat() (string, error)

func (*Client) Clear

func (c *Client) Clear(name Structure) error

func (*Client) Close

func (c *Client) Close() error

func (*Client) Discard

func (c *Client) Discard(name Structure, filter JobFilter) error

func (*Client) Fail

func (c *Client) Fail(jid string, err error, backtrace []byte) error

Fail notifies Faktory that a job failed with the given error. If backtrace is non-nil, it is assumed to be the output from runtime/debug.Stack().
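
One way to obtain the two arguments for Fail is to wrap job execution in a recover handler. The sketch below is an assumption about how a worker might do this; runJob is a hypothetical helper, not part of this package. It returns a nil backtrace when the job returns an ordinary error, and the output of runtime/debug.Stack() when the job panics.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// runJob executes fn and converts a panic into an error plus a stack trace.
// An ordinary error from fn is returned with a nil backtrace.
func runJob(fn func() error) (backtrace []byte, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
			backtrace = debug.Stack()
		}
	}()
	return nil, fn()
}

func main() {
	backtrace, err := runJob(func() error {
		panic("boom")
	})
	if err != nil {
		// With a real client this is where c.Fail(jid, err, backtrace) would go.
		fmt.Println("error:", err)
		fmt.Println("captured backtrace:", len(backtrace) > 0)
	}
}
```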

func (*Client) Fetch

func (c *Client) Fetch(q ...string) (*Job, error)

func (*Client) Flush

func (c *Client) Flush() error

func (*Client) Generic

func (c *Client) Generic(cmdline string) (string, error)

func (*Client) Info

func (c *Client) Info() (map[string]interface{}, error)

func (*Client) Kill

func (c *Client) Kill(name Structure, filter JobFilter) error

func (*Client) Push

func (c *Client) Push(job *Job) error

func (*Client) Requeue

func (c *Client) Requeue(name Structure, filter JobFilter) error

type ClientData

type ClientData struct {
	Hostname string   `json:"hostname"`
	Wid      string   `json:"wid"`
	Pid      int      `json:"pid"`
	Labels   []string `json:"labels"`
	// Hash is hex(sha256(password + nonce))
	PasswordHash string `json:"pwdhash"`
	// The protocol version used by this client.
	// The server can reject this connection if the version will not work
	// The server advertises its protocol version in the HI.
	Version int `json:"v"`
}

ClientData is serialized to JSON and sent with the HELLO command. PasswordHash is required if the server is not listening on localhost. The WID (worker id) must be random and unique for each worker process; a UUID is one suitable choice. Non-worker processes should leave WID empty.

The other elements can be useful for debugging and are displayed on the Busy tab.
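
The following sketch shows how the fields might be filled in and serialized. It uses a local copy of the struct so it runs without the package. The helpers hashPassword, randomWid, and encodeHello are hypothetical. hashPassword follows the struct comment literally, hex(sha256(password + nonce)); the nonce value, the label, and the "HELLO" prefix on the printed line are assumptions for illustration only.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
)

// clientData is a local copy of the ClientData struct above.
type clientData struct {
	Hostname     string   `json:"hostname"`
	Wid          string   `json:"wid"`
	Pid          int      `json:"pid"`
	Labels       []string `json:"labels"`
	PasswordHash string   `json:"pwdhash"`
	Version      int      `json:"v"`
}

// hashPassword implements the rule from the struct comment:
// hex(sha256(password + nonce)).
func hashPassword(password, nonce string) string {
	sum := sha256.Sum256([]byte(password + nonce))
	return hex.EncodeToString(sum[:])
}

// randomWid returns a random 16-character hex identifier for a worker process.
func randomWid() (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// encodeHello serializes the client data to JSON.
func encodeHello(d clientData) (string, error) {
	out, err := json.Marshal(d)
	return string(out), err
}

func main() {
	hostname, _ := os.Hostname()
	wid, err := randomWid()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	payload, err := encodeHello(clientData{
		Hostname:     hostname,
		Wid:          wid,
		Pid:          os.Getpid(),
		Labels:       []string{"golang"},
		PasswordHash: hashPassword("topsecret", "nonce-from-server"), // placeholder nonce
		Version:      2, // matches ExpectedProtocolVersion
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("HELLO", payload)
}
```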

type Failure

type Failure struct {
	RetryCount   int      `json:"retry_count"`
	FailedAt     string   `json:"failed_at"`
	NextAt       string   `json:"next_at,omitempty"`
	ErrorMessage string   `json:"message,omitempty"`
	ErrorType    string   `json:"errtype,omitempty"`
	Backtrace    []string `json:"backtrace,omitempty"`
}

type Job

type Job struct {
	// required
	Jid   string        `json:"jid"`
	Queue string        `json:"queue"`
	Type  string        `json:"jobtype"`
	Args  []interface{} `json:"args"`

	// optional
	CreatedAt  string                 `json:"created_at,omitempty"`
	EnqueuedAt string                 `json:"enqueued_at,omitempty"`
	At         string                 `json:"at,omitempty"`
	ReserveFor int                    `json:"reserve_for,omitempty"`
	Retry      int                    `json:"retry,omitempty"`
	Backtrace  int                    `json:"backtrace,omitempty"`
	Failure    *Failure               `json:"failure,omitempty"`
	Custom     map[string]interface{} `json:"custom,omitempty"`
}
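
To illustrate how the struct tags shape the payload, the sketch below marshals a trimmed local copy of Job. The job type, encodeJob helper, and all field values (including the queue name "default") are illustrative assumptions. Optional fields tagged omitempty are left out of the JSON when they hold their zero value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a trimmed local copy of the Job struct above:
// the required fields plus two of the optional ones.
type job struct {
	Jid   string        `json:"jid"`
	Queue string        `json:"queue"`
	Type  string        `json:"jobtype"`
	Args  []interface{} `json:"args"`

	Retry  int                    `json:"retry,omitempty"`
	Custom map[string]interface{} `json:"custom,omitempty"`
}

// encodeJob serializes a job to its JSON payload.
func encodeJob(j job) (string, error) {
	out, err := json.Marshal(j)
	return string(out), err
}

func main() {
	minimal := job{
		Jid:   "abc123",
		Queue: "default",
		Type:  "SendEmail",
		Args:  []interface{}{42, "hello"},
	}
	s, err := encodeJob(minimal)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // optional fields are omitted entirely

	withOptions := minimal
	withOptions.Retry = 3
	withOptions.Custom = map[string]interface{}{"locale": "en"}
	s, err = encodeJob(withOptions)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```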

func NewJob

func NewJob(jobtype string, args ...interface{}) *Job

func (*Job) GetCustom

func (j *Job) GetCustom(name string) (interface{}, bool)

func (*Job) SetCustom

func (j *Job) SetCustom(name string, value interface{})

type JobFilter

type JobFilter struct {
	Jids    []string `json:"jids,omitempty"`
	Regexp  string   `json:"regexp,omitempty"`
	Jobtype string   `json:"jobtype,omitempty"`
}

func Matching

func Matching(pattern string) JobFilter

This is a generic pattern match across the entire job JSON payload. Be very careful that you don't accidentally match some unintended part of the payload.

NB: your pattern should have * on each side. The pattern is passed directly to Redis.

Example: discard any job retries whose payload contains the special word "uid:12345":

client.Discard(faktory.Retries, faktory.Matching("*uid:12345*"))

See the Redis SCAN documentation for pattern matching examples. https://redis.io/commands/scan
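
To see why the pattern needs * on each side, the sketch below uses a simplified glob matcher that supports only * and ?. It is a local stand-in for illustration, not Redis's matcher, and globMatch is a hypothetical helper. Without the surrounding stars, the pattern would have to match the entire payload.

```go
package main

import "fmt"

// globMatch reports whether s matches pattern, where * matches any run of
// bytes and ? matches exactly one byte. Other glob syntax is not supported.
func globMatch(pattern, s string) bool {
	p, i := 0, 0
	star, mark := -1, 0 // position of the last * and the input index it is trying
	for i < len(s) {
		switch {
		case p < len(pattern) && pattern[p] == '*':
			star, mark = p, i
			p++
		case p < len(pattern) && (pattern[p] == '?' || pattern[p] == s[i]):
			p++
			i++
		case star >= 0:
			// Backtrack: let the last * absorb one more byte.
			mark++
			p, i = star+1, mark
		default:
			return false
		}
	}
	// Any trailing stars can match the empty string.
	for p < len(pattern) && pattern[p] == '*' {
		p++
	}
	return p == len(pattern)
}

func main() {
	payload := `{"jid":"1","jobtype":"SyncJob","args":["uid:12345"]}`
	fmt.Println(globMatch("*uid:12345*", payload)) // pattern anchored with stars
	fmt.Println(globMatch("uid:12345", payload))   // must match the whole payload
}
```

The same sketch also shows the risk mentioned above: "*12345*" would match a JID or any other field containing those digits, not only the intended argument.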

func OfType

func OfType(jobtype string) JobFilter

Matches jobs based on the exact Jobtype. This is pretty fast because it devolves to Matching(`"jobtype":"$ARG"`) and matches within Redis.
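
The sketch below illustrates why a jobtype match expressed as a payload pattern is exact: the closing quote is part of the pattern. Both helpers are hypothetical, the surrounding stars are an assumption following the advice for Matching, and the jobtype is assumed to contain no characters that JSON would escape.

```go
package main

import (
	"fmt"
	"strings"
)

// jobtypePattern builds a glob in the form the text above describes.
// Assumption: jobtype contains no characters that need JSON escaping.
func jobtypePattern(jobtype string) string {
	return `*"jobtype":"` + jobtype + `"*`
}

// matchesLiteralGlob evaluates a pattern of the form *literal* by checking
// whether the literal part occurs anywhere in payload.
func matchesLiteralGlob(pattern, payload string) bool {
	literal := strings.TrimSuffix(strings.TrimPrefix(pattern, "*"), "*")
	return strings.Contains(payload, literal)
}

func main() {
	payload := `{"jid":"1","queue":"default","jobtype":"DataSyncJob","args":[]}`
	fmt.Println(matchesLiteralGlob(jobtypePattern("DataSyncJob"), payload))
	// A prefix of the jobtype does not match because of the closing quote.
	fmt.Println(matchesLiteralGlob(jobtypePattern("DataSync"), payload))
}
```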

func WithJids

func WithJids(jids ...string) JobFilter

Match jobs with the given JIDs. Warning: O(m*n), very slow because it has to pull every job into Faktory and check the JID against the list.

If you pass in a single JID, it will devolve to matching within Redis and perform much faster. For that reason, it might be better to handle one JID at a time.
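
Handling one JID at a time could look like the sketch below. It uses local stand-ins (structure, jobFilter, discarder) for the package's Structure, JobFilter, and the Discard method so that it runs without a server; discardEach and recorder are hypothetical. With the real package, each call would presumably be Discard(name, WithJids(jid)).

```go
package main

import "fmt"

// Local stand-ins for the package's Structure and JobFilter types.
type structure string

type jobFilter struct {
	Jids []string
}

// discarder is the one method of the mutate API that this sketch needs.
type discarder interface {
	Discard(name structure, filter jobFilter) error
}

// discardEach issues one single-JID request per job instead of one
// request carrying the full list.
func discardEach(d discarder, name structure, jids ...string) error {
	for _, jid := range jids {
		if err := d.Discard(name, jobFilter{Jids: []string{jid}}); err != nil {
			return fmt.Errorf("discard %s: %w", jid, err)
		}
	}
	return nil
}

// recorder is a fake that records the JIDs of each request.
type recorder struct {
	calls [][]string
}

func (r *recorder) Discard(name structure, filter jobFilter) error {
	r.calls = append(r.calls, filter.Jids)
	return nil
}

func main() {
	r := &recorder{}
	if err := discardEach(r, "retries", "abc", "123"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("requests sent:", len(r.calls))
}
```

The trade-off is one round trip per JID, so this is only worthwhile when the list is short relative to the size of the structure.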

func (JobFilter) Matching

func (jf JobFilter) Matching(pattern string) JobFilter

func (JobFilter) OfType

func (jf JobFilter) OfType(jobtype string) JobFilter

func (JobFilter) WithJids

func (jf JobFilter) WithJids(jids ...string) JobFilter

type MutateClient

type MutateClient interface {

	// Move the given jobs from structure to the Dead set.
	// Faktory will not touch them anymore but you can still see them in the Web UI.
	//
	//   Kill(Retries, OfType("DataSyncJob").WithJids("abc", "123"))
	Kill(name Structure, filter JobFilter) error

	// Move the given jobs to their associated queue so they can be immediately
	// picked up and processed.
	Requeue(name Structure, filter JobFilter) error

	// Throw away the given jobs, e.g. if you want to delete all jobs named "QuickbooksSyncJob"
	//
	//   Discard(Dead, OfType("QuickbooksSyncJob"))
	Discard(name Structure, filter JobFilter) error

	// Empty the entire given structure, e.g. if you want to clear all retries.
	// This is very fast as it is special cased by Faktory.
	Clear(name Structure) error
}

MutateClient defines commands that perform admin tasks on various Faktory structures. These are NOT designed to be used in business logic but rather for maintenance, data repair, migration, etc. They can have poor scalability or performance edge cases.

Generally these operations are O(n) or worse. They will get slower as your data gets bigger.

type Operation

type Operation struct {
	Cmd    string     `json:"cmd"`
	Target Structure  `json:"target"`
	Filter *JobFilter `json:"filter,omitempty"`
}
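
The struct tags suggest how a mutate request is serialized. The sketch below marshals local copies of Operation and JobFilter. The command strings "discard" and "clear" are assumptions chosen to mirror the method names, since this page does not list the valid Cmd values; encodeOperation is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the Structure, JobFilter, and Operation types on this page.
type structure string

type jobFilter struct {
	Jids    []string `json:"jids,omitempty"`
	Regexp  string   `json:"regexp,omitempty"`
	Jobtype string   `json:"jobtype,omitempty"`
}

type operation struct {
	Cmd    string     `json:"cmd"`
	Target structure  `json:"target"`
	Filter *jobFilter `json:"filter,omitempty"`
}

// encodeOperation serializes an operation to JSON.
func encodeOperation(op operation) (string, error) {
	out, err := json.Marshal(op)
	return string(out), err
}

func main() {
	// "discard" is an assumed command name, not confirmed by this page.
	filtered, err := encodeOperation(operation{
		Cmd:    "discard",
		Target: "dead",
		Filter: &jobFilter{Jobtype: "QuickbooksSyncJob"},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(filtered)

	// A nil Filter is omitted from the payload, as with clearing a whole structure.
	unfiltered, err := encodeOperation(operation{Cmd: "clear", Target: "retries"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(unfiltered)
}
```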

type ProtocolError

type ProtocolError struct {
	// contains filtered or unexported fields
}

func (*ProtocolError) Error

func (pe *ProtocolError) Error() string

type Server

type Server struct {
	Network  string
	Address  string
	Password string
	Timeout  time.Duration
	TLS      *tls.Config
}

func DefaultServer

func DefaultServer() *Server

func (*Server) Open

func (s *Server) Open() (*Client, error)

func (*Server) ReadFromEnv

func (s *Server) ReadFromEnv() error

type Structure

type Structure string
const (
	Scheduled Structure = "scheduled"
	Retries   Structure = "retries"
	Dead      Structure = "dead"
)
