Documentation
¶
Index ¶
- Constants
- func ClearAll() *ekaerr.Error
- func Empty() *ekaerr.Error
- func Init(options ...Option) *ekaerr.Error
- func Run() *ekaerr.Error
- func Stop()
- type Bokchoy
- func (b *Bokchoy) ClearAll() *ekaerr.Error
- func (b *Bokchoy) Empty() *ekaerr.Error
- func (b *Bokchoy) Publish(queueName string, payload interface{}, options ...Option) (*Task, *ekaerr.Error)
- func (b *Bokchoy) Queue(name string, options ...Option) *Queue
- func (b *Bokchoy) Run() *ekaerr.Error
- func (b *Bokchoy) Stop()
- func (b *Bokchoy) Use(queueName string, handlers ...HandlerFunc) *Bokchoy
- type Broker
- type BrokerStats
- type Color
- type HandlerFunc
- type Option
- func WithBroker(broker Broker) Option
- func WithConcurrency(concurrency int8) Option
- func WithCountdown(countdown time.Duration) Option
- func WithCustomSerializerJSON(example interface{}) Option
- func WithDisableOutput(disableOutput bool) Option
- func WithLogger(logger *ekalog.Logger) Option
- func WithMaxRetries(maxRetries int8) Option
- func WithQueues(queues ...string) Option
- func WithQueuess(queues []string) Option
- func WithRetryIntervals(retryIntervals []time.Duration) Option
- func WithSerializer(serializer Serializer) Option
- func WithTTL(ttl time.Duration) Option
- func WithTimeout(timeout time.Duration) Option
- type Queue
- func (q *Queue) Cancel(taskID string) (*Task, *ekaerr.Error)
- func (q *Queue) Consume() ([]Task, *ekaerr.Error)
- func (q *Queue) Count() (BrokerStats, *ekaerr.Error)
- func (q *Queue) Empty() *ekaerr.Error
- func (q *Queue) Get(taskID string) (*Task, *ekaerr.Error)
- func (q *Queue) List() ([]Task, *ekaerr.Error)
- func (q *Queue) Name() string
- func (q *Queue) NewTask(payload interface{}, options ...Option) *Task
- func (q *Queue) OnComplete(callback HandlerFunc) *Queue
- func (q *Queue) OnFailure(callback HandlerFunc) *Queue
- func (q *Queue) OnStart(callback HandlerFunc) *Queue
- func (q *Queue) OnSuccess(callback HandlerFunc) *Queue
- func (q *Queue) Publish(payload interface{}, options ...Option) (*Task, *ekaerr.Error)
- func (q *Queue) PublishTask(task *Task) *ekaerr.Error
- func (q *Queue) Use(callback ...HandlerFunc) *Queue
- type Serializer
- type Task
- func (t *Task) Deserialize(data []byte, userPayloadSerializer Serializer) *ekaerr.Error
- func (t *Task) ID() string
- func (t *Task) IsFinished() bool
- func (t *Task) MarkAsCanceled()
- func (t *Task) MarkAsSucceeded()
- func (t *Task) QueueName() string
- func (t *Task) Serialize(userPayloadSerializer Serializer) ([]byte, *ekaerr.Error)
- func (t *Task) Status() TaskStatus
- type TaskStatus
Constants ¶
const (
VERSION = "v1.4.6, 22 September 2021, 15:11 GMT+3"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Bokchoy ¶
type Bokchoy struct {
// contains filtered or unexported fields
}
Bokchoy is the main object which stores all configuration, queues and broker.
func Use ¶ added in v1.0.0
func Use(queueName string, handlers ...HandlerFunc) *Bokchoy
func (*Bokchoy) ClearAll ¶ added in v1.0.0
ClearAll clears all queues in the broker and also removes all metadata. Does nothing (but returns an error) if Bokchoy already running (Run() has called).
func (*Bokchoy) Empty ¶
Empty empties initialized queues. Returns an error of the first queue that can not be emptied. Does nothing (but returns an error) if Bokchoy already running (Run() has called).
func (*Bokchoy) Publish ¶
func (b *Bokchoy) Publish(queueName string, payload interface{}, options ...Option) (*Task, *ekaerr.Error)
Publish publishes a new payload to a queue.
func (*Bokchoy) Queue ¶
Queue gets or creates a new queue.
If Run() has been called already, the new queue's consumers will be start immediately (if it's a new queue, and if Bokchoy has not been stopped yet).
If queue with the given 'name' has already declared, the Queue method just returns it, but if at least one Option is provided (even nil), the queue will be recreated with provided options.
type Broker ¶
type Broker interface { fmt.Stringer // Get returns serialized Task from the broker. // You can call Task.Deserialize then to decode received data. Get(queueName, taskID string) ([]byte, *ekaerr.Error) // Delete deletes raw data in broker based on key. Delete(queueName, taskID string) *ekaerr.Error // List returns raw data stored in broker. List(queueName string) ([][]byte, *ekaerr.Error) // Empty empties a queue. Empty(queueName string) *ekaerr.Error // ClearAll clears all queues in the broker and also removes all metadata. ClearAll() *ekaerr.Error // Count returns number of items from a queue name. Count(queueName string) (BrokerStats, *ekaerr.Error) // Set synchronizes the stored item. Set(queueName, taskID string, data []byte, ttl time.Duration) *ekaerr.Error // Publish publishes raw data. Publish(queueName, taskID string, taskPayload []byte, taskEtaUnixNano int64) *ekaerr.Error // Consume returns an array of raw data. Consume(queueName string, maxETA int64) ([][]byte, *ekaerr.Error) }
Broker is the common interface to define a Broker.
type BrokerStats ¶ added in v1.0.0
BrokerStats is the statistics returned by a Queue.
type HandlerFunc ¶
HandlerFunc is a handler to handle incoming tasks.
type Option ¶
type Option func(opts *options)
Option is an option unit.
func WithBroker ¶ added in v1.0.0
WithBroker registers new broker.
func WithConcurrency ¶
WithConcurrency defines the number of concurrent consumers.
func WithCountdown ¶
WithCountdown defines the countdown to launch a delayed task.
func WithCustomSerializerJSON ¶ added in v1.3.1
func WithCustomSerializerJSON(example interface{}) Option
WithCustomSerializerJSON is an alias for WithSerializer(CustomSerializerJSON(example)).
func WithDisableOutput ¶
WithDisableOutput defines if the output (logo, queues information) should be disabled.
func WithMaxRetries ¶
WithMaxRetries defines the number of maximum retries for a failed task.
func WithQueues ¶
WithQueues allows to override queues to run.
func WithQueuess ¶ added in v1.0.0
WithQueuess allows to override queues to run.
func WithRetryIntervals ¶
WithRetryIntervals defines the retry intervals for a failed task.
func WithSerializer ¶
func WithSerializer(serializer Serializer) Option
WithSerializer defines the Serializer.
func WithTimeout ¶
WithTimeout defines the timeout used to execute a task.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue contains consumers to enqueue.
func (*Queue) Count ¶
func (q *Queue) Count() (BrokerStats, *ekaerr.Error)
Count returns statistics from queue: * direct: number of waiting tasks * delayed: number of waiting delayed tasks * total: number of total tasks
func (*Queue) Get ¶
Get returns a Task instance from the current Broker's Queue with its id. Returns nil as Task if requested task is not found.
func (*Queue) NewTask ¶
NewTask returns a new Task instance from payload and options.
Requirements: - Current Queue is valid. Otherwise nil Task is returned.
func (*Queue) OnComplete ¶
func (q *Queue) OnComplete(callback HandlerFunc) *Queue
OnComplete registers a new handler to be executed when a task is completed.
func (*Queue) OnFailure ¶
func (q *Queue) OnFailure(callback HandlerFunc) *Queue
OnFailure registers a new handler to be executed when a task is failed.
func (*Queue) OnStart ¶
func (q *Queue) OnStart(callback HandlerFunc) *Queue
OnStart registers a new handler to be executed when a task is started.
func (*Queue) OnSuccess ¶
func (q *Queue) OnSuccess(callback HandlerFunc) *Queue
OnSuccess registers a new handler to be executed when a task is succeeded.
func (*Queue) Publish ¶
Publish is a alias for:
task := q.NewTask(ctx, payload, options...) q.PublishTask(ctx, task)
func (*Queue) PublishTask ¶
PublishTask publishes a new task to the current Queue.
func (*Queue) Use ¶
func (q *Queue) Use(callback ...HandlerFunc) *Queue
Use appends a new handler middleware to the queue.
type Serializer ¶
type Serializer interface { Dumps(interface{}) ([]byte, *ekaerr.Error) Loads([]byte, *interface{}) *ekaerr.Error IsHumanReadable() bool Name() string }
Serializer defines an interface to implement a serializer, to encode user's Task payload to be a part of encoded RAW data of tasks, that will be used by Broker.
func CustomSerializerJSON ¶ added in v1.0.3
func CustomSerializerJSON(example interface{}) Serializer
CustomSerializerJSON returns a new JSON Serializer, that expects the same type's values will passed to Serializer.Dumps(), Serializer.Loads() as type of value you pass to this constructor.
Using that constructor builds a special JSON Serializer exactly for you, meaning that even defining destination for Serializer.Loads() as interface{}, the underlying type will be always T, that you pass to this constructor. Look:
var ( ser = CustomSerializerJSON(T{}) dest interface{} ) if err := ser.Loads(<...>, &dest); err.IsNotNil() { _, ok := dest.(T); // ok == true, if no err. }
Passing nil interface{} returns the same Serializer as you may get using DefaultSerializerJSON().
It's OK to use both of T or *T as type. What you pass is what you get. Value is not important, so you can just use T{} or (*T)(nil).
func DefaultSerializerDummy ¶ added in v1.0.0
func DefaultSerializerDummy() Serializer
func DefaultSerializerJSON ¶ added in v1.0.0
func DefaultSerializerJSON() Serializer
DefaultSerializerJSON is the same as CustomSerializerJSON(nil).
type Task ¶
type Task struct { Error *ekaerr.Error Panic interface{} PublishedAt ekatime.Timestamp TTL time.Duration ETA int64 RetryIntervals []time.Duration MaxRetries int8 ExecTime time.Duration Timeout time.Duration Payload interface{} // contains filtered or unexported fields }
Task is the model stored in a Queue.
func (*Task) Deserialize ¶ added in v1.0.0
func (t *Task) Deserialize(data []byte, userPayloadSerializer Serializer) *ekaerr.Error
Deserialize returns a Task instance from raw data.
func (*Task) ID ¶
ID returns an unique ID (ULID) of the current Task. Read more: https://github.com/oklog/ulid .
WARNING! This value not guaranteed to be an unique over queues. Although the chances of collision over queues are slim, keep that in mind. If you need to distinguish two Task s from different queues, use ID() along with QueueName().
Nil safe. Returns an empty string if Task is not initialized properly.
func (*Task) IsFinished ¶ added in v1.0.0
IsFinished reports whether current Task is considered finished, and callbacks onCompleted may be called for that.
func (*Task) MarkAsCanceled ¶
func (t *Task) MarkAsCanceled()
func (*Task) MarkAsSucceeded ¶
func (t *Task) MarkAsSucceeded()
func (*Task) QueueName ¶ added in v1.0.0
QueueName returns a queue name to which this task is published, or retrieved from.
func (*Task) Serialize ¶
func (t *Task) Serialize(userPayloadSerializer Serializer) ([]byte, *ekaerr.Error)
Serialize serializes a Task to raw data.
func (*Task) Status ¶
func (t *Task) Status() TaskStatus
Status returns the Task's status, that:
- Has been sent by you, or
- Task had at the moment when you retrieve the Task from a Bokchoy backend.
Requirements:
- Current Task is valid. Otherwise TASK_STATUS_INVALID is returned.
WARNING! TAKE A LOOK ALSO AT THE IsFinished() METHOD.
type TaskStatus ¶ added in v1.0.0
type TaskStatus int8
const ( TASK_STATUS_INVALID TaskStatus = 0 TASK_STATUS_WAITING TaskStatus = 1 TASK_STATUS_PROCESSING TaskStatus = 2 TASK_STATUS_RETRYING TaskStatus = 5 TASK_STATUS_SUCCEEDED TaskStatus = 10 TASK_STATUS_FAILED TaskStatus = -1 TASK_STATUS_CANCELLED TaskStatus = -2 TASK_STATUS_TIMED_OUT TaskStatus = -3 )
func (TaskStatus) String ¶ added in v1.0.0
func (ts TaskStatus) String() string
Source Files
¶
- bokchoy.go
- bokchoy_package_client.go
- bokchoy_package_client_private.go
- bokchoy_private.go
- broker.go
- constants.go
- consumer_private.go
- handler.go
- init.go
- options.go
- options_private.go
- queue.go
- queue_private.go
- serializer.go
- serializer_dummy.go
- serializer_json.go
- serializer_json_private.go
- task.go
- task_msgpack_view.go
- task_msgpack_view_gen.go
- task_private.go
- task_status.go
- terminal_private.go