queue

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2021 License: Apache-2.0 Imports: 11 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DelayOnQueueIsEmpty = 250 * time.Millisecond
	DelayOnFailedTask   = 5 * time.Second
	DelayOnRepeat       = 25 * time.Millisecond
)

Functions

This section is empty.

Types

type TaskQueue

type TaskQueue struct {
	Name     string
	Handler  func(task.Task) TaskResult
	HeadLock sync.Mutex
	Status   string
	// contains filtered or unexported fields
}

func NewTasksQueue

func NewTasksQueue() *TaskQueue

func (*TaskQueue) AddAfter

func (q *TaskQueue) AddAfter(id string, newTask task.Task)

AddAfter inserts a task after the task with specified id.

func (*TaskQueue) AddBefore

func (q *TaskQueue) AddBefore(id string, newTask task.Task)

AddBefore inserts a task before the task with specified id.

func (*TaskQueue) AddFirst

func (q *TaskQueue) AddFirst(t task.Task)

AddFirst adds a new head element.

func (*TaskQueue) AddLast

func (q *TaskQueue) AddLast(t task.Task)

AddLast adds a new tail element.

func (*TaskQueue) DoWithHeadLock

func (q *TaskQueue) DoWithHeadLock(fn func(tasksQueue *TaskQueue))

func (*TaskQueue) Filter

func (q *TaskQueue) Filter(filterFn func(task.Task) bool)

Filter runs filterFn on every task and removes each task for which it returns false.

func (*TaskQueue) Get

func (q *TaskQueue) Get(id string) task.Task

Get returns a task by id.

func (*TaskQueue) GetFirst

func (q *TaskQueue) GetFirst() task.Task

GetFirst returns a head element.

func (*TaskQueue) GetLast

func (q *TaskQueue) GetLast() task.Task

GetLast returns a tail element.

func (*TaskQueue) IsEmpty

func (q *TaskQueue) IsEmpty() bool

func (*TaskQueue) Iterate

func (q *TaskQueue) Iterate(doFn func(task.Task))

Iterate runs doFn for every task.

func (*TaskQueue) Length

func (q *TaskQueue) Length() int

func (*TaskQueue) MeasureActionTime

func (q *TaskQueue) MeasureActionTime(action string) func()

MeasureActionTime is a helper to measure the execution time of the queue's actions.

func (*TaskQueue) Remove

func (q *TaskQueue) Remove(id string) (t task.Task)

Remove finds element by id and deletes it.

func (*TaskQueue) RemoveFirst

func (q *TaskQueue) RemoveFirst() (t task.Task)

RemoveFirst deletes a head element, so head is moved.

func (*TaskQueue) RemoveLast

func (q *TaskQueue) RemoveLast() (t task.Task)

RemoveLast deletes a tail element, so tail is moved.

func (*TaskQueue) Start

func (q *TaskQueue) Start()

func (*TaskQueue) Stop

func (q *TaskQueue) Stop()

func (*TaskQueue) String

func (q *TaskQueue) String() string

String dumps the tasks in the queue to one line.

func (*TaskQueue) WithAddHandler

func (tq *TaskQueue) WithAddHandler(fn func(task.Task)) *TaskQueue

func (*TaskQueue) WithContext

func (q *TaskQueue) WithContext(ctx context.Context)

func (*TaskQueue) WithHandler

func (tq *TaskQueue) WithHandler(fn func(task.Task) TaskResult) *TaskQueue

func (*TaskQueue) WithMetricStorage

func (q *TaskQueue) WithMetricStorage(mstor *metric_storage.MetricStorage)

func (*TaskQueue) WithName

func (tq *TaskQueue) WithName(name string) *TaskQueue

func (*TaskQueue) WithRemoveHandler

func (tq *TaskQueue) WithRemoveHandler(fn func(task.Task)) *TaskQueue

type TaskQueueSet

type TaskQueueSet struct {
	Queues   map[string]*TaskQueue
	MainName string
	// contains filtered or unexported fields
}

TaskQueueSet is a manager for a set of named queues.

func NewTaskQueueSet

func NewTaskQueueSet() *TaskQueueSet

func (*TaskQueueSet) Add

func (tqs *TaskQueueSet) Add(queue *TaskQueue)

func (*TaskQueueSet) DoWithLock

func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))

Example:

taskQueueSet.DoWithLock(func(tqs *TaskQueueSet){
   tqs.GetMain().RemoveFirst()
})

func (*TaskQueueSet) GetByName

func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue

func (*TaskQueueSet) GetMain

func (tqs *TaskQueueSet) GetMain() *TaskQueue

func (*TaskQueueSet) Iterate

func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))

Iterate runs doFn for every queue.

func (*TaskQueueSet) NewNamedQueue

func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(task.Task) TaskResult)

func (*TaskQueueSet) Remove

func (tqs *TaskQueueSet) Remove(name string)

func (*TaskQueueSet) Start

func (tqs *TaskQueueSet) Start()

func (*TaskQueueSet) StartMain

func (tqs *TaskQueueSet) StartMain()

func (*TaskQueueSet) Stop

func (tqs *TaskQueueSet) Stop()

func (*TaskQueueSet) WaitStopWithTimeout

func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)

func (*TaskQueueSet) WithContext

func (tqs *TaskQueueSet) WithContext(ctx context.Context)

func (*TaskQueueSet) WithMainName

func (tqs *TaskQueueSet) WithMainName(name string)

func (*TaskQueueSet) WithMetricStorage

func (tqs *TaskQueueSet) WithMetricStorage(mstor *metric_storage.MetricStorage)

type TaskResult

type TaskResult struct {
	Status     string
	HeadTasks  []task.Task
	TailTasks  []task.Task
	AfterTasks []task.Task

	DelayBeforeNextTask time.Duration

	AfterHandle func()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL