queue

package
v0.0.1
Published: Jun 7, 2024 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Constants

const (
	// ErrQueueTimeout is returned when an item has not been updated within the configured timeout.
	ErrQueueTimeout = Error("queue entry timed-out")

	// ErrQueueNoAvailableItem is returned when a queue entry has no items left to push.
	ErrQueueNoAvailableItem = Error("no available items")

	// ErrQueueRestart is used to restart the currently active item.
	ErrQueueRestart = Error("queue entry restart")

	// ErrQueueCanceled is used when a user sends an empty struct{}{} to `value.Cancel`.
	ErrQueueCanceled = Error("queue item canceled")

	// ErrValueDisappeared is used when the value for a key unexpectedly becomes <nil>.
	// This can only happen if someone sets the pointer to <nil>; in general it is
	// used to prevent deadlocks within the popAwait() routine.
	ErrValueDisappeared = Error("value disappeared")
)

const (
	// DefaultTick defines the Pop goroutine ticker's duration.
	DefaultTick = 30 * time.Second
)

Variables

var (
	// AcquireTTL is how long we wait in Acquire() for a semaphore release.
	AcquireTTL = 1 * time.Second

	// ManagedTTL is how long a managed entry may go without updates before it times out.
	ManagedTTL = 30 * time.Second

	// UnmanagedTTL is how long an unmanaged entry may go without updates before it times out.
	UnmanagedTTL = 10 * time.Minute
)
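
To illustrate what a bounded wait such as `AcquireTTL` implies, here is a minimal sketch that uses a buffered channel as a counting semaphore. The `tryAcquire` helper, the channel-based semaphore and the shortened `demoTTL` are assumptions for illustration only; this documentation does not show how the package implements its semaphore.

```go
package main

import (
	"fmt"
	"time"
)

// demoTTL is a shortened stand-in for AcquireTTL so the example runs quickly.
const demoTTL = 50 * time.Millisecond

// tryAcquire is a hypothetical helper: it waits up to ttl for a free slot
// in sem, a buffered channel used as a counting semaphore.
func tryAcquire(sem chan struct{}, ttl time.Duration) bool {
	timer := time.NewTimer(ttl)
	defer timer.Stop()

	select {
	case sem <- struct{}{}: // a slot was free; the caller now holds it
		return true
	case <-timer.C: // no slot was released within ttl
		return false
	}
}

func main() {
	sem := make(chan struct{}, 1) // capacity 1 stands in for a limit of one

	fmt.Println(tryAcquire(sem, demoTTL)) // the first acquire succeeds
	fmt.Println(tryAcquire(sem, demoTTL)) // the slot is taken, so this times out

	<-sem // release the slot
	fmt.Println(tryAcquire(sem, demoTTL)) // succeeds again after the release
}
```

Because the TTL values are declared with `var` rather than `const`, a caller could presumably shorten them in tests, although the documentation does not state that this is their purpose.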

Functions

func ErrQueueEntryTimedOut

func ErrQueueEntryTimedOut(key Key) error

ErrQueueEntryTimedOut returns ErrQueueTimeout for the specified Key.

func ErrQueueNoAvailableItems

func ErrQueueNoAvailableItems(key Key) error

ErrQueueNoAvailableItems returns ErrQueueNoAvailableItem for the specified Key.

func ErrQueueValueDisappeared

func ErrQueueValueDisappeared(key Key) error

ErrQueueValueDisappeared returns ErrValueDisappeared for the specified Key.

Types

type Detail

type Detail struct {
	ID   int64
	Pack int64
	Bot  string
}

func (Detail) Clone

func (d Detail) Clone() Detail

Clone will create a clone of the detail.

type Error

type Error string

Error is a string type that implements the error interface, so error values can be declared as constants.

func (Error) Error

func (e Error) Error() string
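
The constant-error pattern behind `Error` can be sketched as follows. The type and the `ErrQueueTimeout` constant are redeclared here so the example is self-contained; `wrap` and `isTimeout` are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Error mirrors the documented type: a string with an Error method,
// which allows error values to be declared with const.
type Error string

func (e Error) Error() string { return string(e) }

// ErrQueueTimeout is redeclared here with the documented message.
const ErrQueueTimeout = Error("queue entry timed-out")

// wrap is a hypothetical helper that adds context while keeping the cause.
func wrap(err error) error {
	return fmt.Errorf("pop failed: %w", err)
}

// isTimeout reports whether err is, or wraps, ErrQueueTimeout.
func isTimeout(err error) bool {
	return errors.Is(err, ErrQueueTimeout)
}

func main() {
	fmt.Println(isTimeout(wrap(ErrQueueTimeout)))     // true
	fmt.Println(isTimeout(errors.New("other error"))) // false
}
```

Since `Error` is a comparable string type, `errors.Is` matches it by value, so the constant behaves like a conventional sentinel error.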

type Items

type Items map[domain.AnimeBase]*State

type Key

type Key any

type KeyError

type KeyError struct {
	Key Key
	// contains filtered or unexported fields
}

KeyError targets a 'queue.Key' with an error message.

func (*KeyError) Error

func (e *KeyError) Error() string

func (*KeyError) Is

func (e *KeyError) Is(target error) bool

func (*KeyError) Unwrap

func (e *KeyError) Unwrap() (err error)
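
The following sketch shows one way a key-scoped error with `Error`, `Is` and `Unwrap` methods can be used with the standard `errors` package. The unexported `err` field, the message format and the `keyOf` and `isTimeout` helpers are assumptions; the real fields of `KeyError` are hidden in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

type Error string

func (e Error) Error() string { return string(e) }

const ErrQueueTimeout = Error("queue entry timed-out")

type Key any

// KeyError pairs a key with an underlying error.
// The err field is an assumption; the documented type hides its fields.
type KeyError struct {
	Key Key
	err error
}

func (e *KeyError) Error() string        { return fmt.Sprintf("%v: %v", e.Key, e.err) }
func (e *KeyError) Is(target error) bool { return errors.Is(e.err, target) }
func (e *KeyError) Unwrap() error        { return e.err }

// ErrQueueEntryTimedOut is a stand-in with the documented signature.
func ErrQueueEntryTimedOut(key Key) error {
	return &KeyError{Key: key, err: ErrQueueTimeout}
}

// isTimeout reports whether err carries ErrQueueTimeout.
func isTimeout(err error) bool { return errors.Is(err, ErrQueueTimeout) }

// keyOf is a hypothetical helper that extracts the key from a KeyError.
func keyOf(err error) (Key, bool) {
	var ke *KeyError
	if errors.As(err, &ke) {
		return ke.Key, true
	}
	return nil, false
}

func main() {
	err := ErrQueueEntryTimedOut("episode-1")
	fmt.Println(isTimeout(err)) // true
	key, ok := keyOf(err)
	fmt.Println(key, ok) // episode-1 true
}
```

With this shape, callers can test for the sentinel with `errors.Is` and still recover the affected key with `errors.As`.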

type Mapper

type Mapper[V any] interface {
	Map(active, inactive Items) (V, error)
}

Mapper defines an interface for mapping queue items into any other value.
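
As an illustration, a `Mapper` implementation that reduces both item sets to simple counts might look like the sketch below. `AnimeBase` and `State` are simplified stand-ins for the real types, and `Counts`, `countMapper` and `summarize` are hypothetical names introduced for this example.

```go
package main

import "fmt"

// AnimeBase and State are simplified stand-ins for domain.AnimeBase
// and the package's State type.
type AnimeBase struct{ Title string }
type State struct{}

type Items map[AnimeBase]*State

// Mapper is redeclared with the documented signature.
type Mapper[V any] interface {
	Map(active, inactive Items) (V, error)
}

// Counts is a hypothetical result type.
type Counts struct{ Active, Inactive int }

// countMapper is a hypothetical Mapper that only counts the items.
type countMapper struct{}

func (countMapper) Map(active, inactive Items) (Counts, error) {
	return Counts{Active: len(active), Inactive: len(inactive)}, nil
}

// Compile-time check that countMapper satisfies Mapper[Counts].
var _ Mapper[Counts] = countMapper{}

// summarize applies any Mapper to the two item sets.
func summarize[V any](m Mapper[V], active, inactive Items) (V, error) {
	return m.Map(active, inactive)
}

func main() {
	active := Items{AnimeBase{Title: "a"}: &State{}}
	inactive := Items{AnimeBase{Title: "b"}: &State{}, AnimeBase{Title: "c"}: &State{}}

	counts, err := summarize[Counts](countMapper{}, active, inactive)
	fmt.Println(counts.Active, counts.Inactive, err) // 1 2 <nil>
}
```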

type Progress

type Progress struct {
	Type string // @refer to 'pkg/websocket/protocol' ProgressType

	Percentage float64
	Speed      float64

	CurrentFilesize *float64
	Filesize        *float64

	CurrentDuration *float64
	Duration        *float64
}

Progress defines the progress for a value whilst it's in queue.

func (Progress) Equal

func (p Progress) Equal(other Progress) bool
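
`Progress` contains `*float64` fields, so comparing two values with `==` compares pointer addresses rather than the numbers they point to. The sketch below shows one plausible way an `Equal` method can compare by value instead; the implementation, the `equalPtr` and `ptr` helpers, and the `"download"` type label are assumptions, not the package's actual code.

```go
package main

import "fmt"

// Progress is redeclared with the documented fields.
type Progress struct {
	Type string

	Percentage float64
	Speed      float64

	CurrentFilesize *float64
	Filesize        *float64

	CurrentDuration *float64
	Duration        *float64
}

// equalPtr compares two optional floats by value; two nils are equal.
func equalPtr(a, b *float64) bool {
	if a == nil || b == nil {
		return a == b
	}
	return *a == *b
}

// Equal is an assumed implementation that compares pointees, not addresses.
func (p Progress) Equal(other Progress) bool {
	return p.Type == other.Type &&
		p.Percentage == other.Percentage &&
		p.Speed == other.Speed &&
		equalPtr(p.CurrentFilesize, other.CurrentFilesize) &&
		equalPtr(p.Filesize, other.Filesize) &&
		equalPtr(p.CurrentDuration, other.CurrentDuration) &&
		equalPtr(p.Duration, other.Duration)
}

// ptr is a hypothetical helper that returns a pointer to v.
func ptr(v float64) *float64 { return &v }

func main() {
	a := Progress{Type: "download", Percentage: 50, Filesize: ptr(1024)}
	b := Progress{Type: "download", Percentage: 50, Filesize: ptr(1024)}

	fmt.Println(a == b)     // false: the Filesize pointers differ
	fmt.Println(a.Equal(b)) // true: the pointed-to values match
}
```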

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewCustomQueue

func NewCustomQueue(tick time.Duration) *Queue

func NewQueue

func NewQueue() *Queue

func (*Queue) ActiveItems

func (q *Queue) ActiveItems() Items

func (*Queue) AddLimit

func (q *Queue) AddLimit(limit int)

func (*Queue) Cancel

func (q *Queue) Cancel(anime domain.AnimeBase)

Cancel attempts to cancel the currently active item. If the item is not canceled, it calls Delete() for the inactive item.

func (*Queue) Close

func (q *Queue) Close() (err error)

Close clears all active and inactive items. Ongoing active items are canceled.

func (*Queue) Collector

func (q *Queue) Collector() prometheus.Collector

Collector creates a `prometheus.Collector` with metrics such as `active`, `inactive` and `limit` gauge statistics.

func (*Queue) Contains

func (q *Queue) Contains(anime domain.AnimeBase) (found bool)

Contains will return true if the key is stored in the active or inactive queue.

func (*Queue) Delete

func (q *Queue) Delete(anime domain.AnimeBase)

Delete removes the queue entry. It does not send an empty struct{}{} to `.Cancel`.

func (*Queue) ErrC

func (q *Queue) ErrC() <-chan error

ErrC returns any errors that occurred during the Pop background routine.

func (*Queue) Get

func (q *Queue) Get(anime domain.AnimeBase) (value *State, ok bool)

Get returns a pointer to the value. If it returns nil, the item probably does not exist.

func (*Queue) GetLimit

func (q *Queue) GetLimit() int

func (*Queue) InactiveItems

func (q *Queue) InactiveItems() Items

func (*Queue) IsActive

func (q *Queue) IsActive(anime domain.AnimeBase) (isActive bool)

IsActive will return true if the key is stored in the active sub-queue.

func (*Queue) IsBusy

func (q *Queue) IsBusy() bool

IsBusy returns true if there are items in the queue and there is a semaphore allocation (someone is ready to take the work).

func (*Queue) L

func (q *Queue) L(ctx context.Context) *zap.Logger

func (*Queue) Len

func (q *Queue) Len() int64

Len returns the number of active and inactive items.

func (*Queue) Notifier

func (q *Queue) Notifier() <-chan struct{}

Notifier triggers when an item is added to the queue.
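
A common way to build a notification channel of this kind is a buffered channel with a non-blocking send, so that producers never stall and bursts of pushes coalesce into one wake-up. The sketch below shows that pattern only; the `notifier` type and the `pending` helper are hypothetical, and the package may implement `Notifier` differently.

```go
package main

import "fmt"

// notifier is a hypothetical type showing a coalescing notification channel.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	// A buffer of one lets a single notification wait for a slow consumer.
	return &notifier{ch: make(chan struct{}, 1)}
}

// notify signals that an item was added; it never blocks the producer.
func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default: // a notification is already pending, so this one is dropped
	}
}

// Notifier exposes the receive-only side, matching the documented signature.
func (n *notifier) Notifier() <-chan struct{} { return n.ch }

// pending reports whether a notification is waiting, consuming it if so.
func pending(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify() // coalesced with the first notification

	fmt.Println(pending(n.Notifier())) // true
	fmt.Println(pending(n.Notifier())) // false
}
```

Under this design a consumer should treat a notification as "check the queue", not as a count of added items.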

func (*Queue) Pause

func (q *Queue) Pause()

func (*Queue) Paused

func (q *Queue) Paused() bool

func (*Queue) Pop

func (q *Queue) Pop() (anime domain.AnimeBase, state *State, ok bool)

func (*Queue) PushBack

func (q *Queue) PushBack(anime domain.AnimeBase, state *State) (pushed bool)

func (*Queue) PushFront

func (q *Queue) PushFront(anime domain.AnimeBase, state *State) (pushed bool)

func (*Queue) Restart

func (q *Queue) Restart(anime domain.AnimeBase)

Restart attempts to restart the queue item.

func (*Queue) Resume

func (q *Queue) Resume()

func (*Queue) Update

func (q *Queue) Update(anime domain.AnimeBase, state *State)

func (*Queue) UpdateProgress

func (q *Queue) UpdateProgress(anime domain.AnimeBase, progress Progress)

func (*Queue) UpdateUnmanaged

func (q *Queue) UpdateUnmanaged(cb func(Key, *State) bool)

type StartedFrom

type StartedFrom int

const (
	StartedFrom_AutoDownload StartedFrom = iota << 1
	StartedFrom_Web
)

func (StartedFrom) String

func (s StartedFrom) String() string

StartedFrom implements fmt.Stringer
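
Note that the constants are declared with `iota << 1`, not `1 << iota`, so their values are 0 and 2. The sketch below redeclares the constants to show this; the `String` implementation and its labels are assumptions, since the documentation does not show the real output.

```go
package main

import "fmt"

type StartedFrom int

const (
	StartedFrom_AutoDownload StartedFrom = iota << 1 // 0 << 1 == 0
	StartedFrom_Web                                  // 1 << 1 == 2
)

// String is a hypothetical implementation; the labels are assumptions.
func (s StartedFrom) String() string {
	switch s {
	case StartedFrom_AutoDownload:
		return "auto-download"
	case StartedFrom_Web:
		return "web"
	default:
		return fmt.Sprintf("StartedFrom(%d)", int(s))
	}
}

func main() {
	fmt.Println(int(StartedFrom_AutoDownload), StartedFrom_AutoDownload) // 0 auto-download
	fmt.Println(int(StartedFrom_Web), StartedFrom_Web)                   // 2 web
}
```

Because `StartedFrom_AutoDownload` equals the zero value, a `State` whose `StartedFrom` field was never set is indistinguishable from one started by auto-download.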

type State

type State struct {
	Progress    Progress
	Details     []Detail
	StartedFrom StartedFrom
	// contains filtered or unexported fields
}

State is the queue struct that holds the available items along with the progress. Details contains all the possible iterations.

func (*State) Active

func (v *State) Active() bool

func (*State) Cancel

func (v *State) Cancel()

Cancel will attempt to cancel the active value.

func (*State) Clone

func (v *State) Clone() (value *State)

Clone will copy the queue value, but only the details are copied.
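
One reading of "only the details are copied" is sketched below: the clone receives its own `Details` slice, while progress and any internal state start from zero values. This is an assumed implementation on simplified types (`Progress` is reduced to a single field), not the package's actual code.

```go
package main

import "fmt"

// Detail is redeclared with the documented fields.
type Detail struct {
	ID   int64
	Pack int64
	Bot  string
}

// Clone returns a copy; Detail holds no pointers, so a value copy suffices.
func (d Detail) Clone() Detail { return d }

// Progress is reduced to one field for this sketch.
type Progress struct{ Percentage float64 }

// State is a simplified stand-in without the unexported fields.
type State struct {
	Progress Progress
	Details  []Detail
}

// Clone is an assumed implementation: it copies Details into a new slice
// and leaves every other field at its zero value.
func (v *State) Clone() *State {
	details := make([]Detail, len(v.Details))
	for i, d := range v.Details {
		details[i] = d.Clone()
	}
	return &State{Details: details}
}

func main() {
	orig := &State{
		Progress: Progress{Percentage: 42},
		Details:  []Detail{{ID: 1, Pack: 7, Bot: "bot-a"}},
	}
	clone := orig.Clone()
	clone.Details[0].Bot = "bot-b" // does not affect the original

	fmt.Println(orig.Details[0].Bot, clone.Details[0].Bot) // bot-a bot-b
	fmt.Println(clone.Progress.Percentage)                 // 0
}
```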

func (*State) Close

func (v *State) Close() (err error)

func (*State) Current

func (v *State) Current() (*Detail, bool)

Current returns the current detail.

func (*State) Done

func (v *State) Done(err error)

Done attempts to close off any progress on the value with the given error.

func (*State) Init

func (v *State) Init()

func (*State) IsUnmanaged

func (v *State) IsUnmanaged() bool

IsUnmanaged reports whether the queue value is unmanaged, that is, whether we are awaiting something rather than managing the value ourselves.

func (*State) OnUpdate

func (v *State) OnUpdate()

func (*State) Restart

func (v *State) Restart()

Restart will attempt to restart the queue.

func (*State) TimedOut

func (v *State) TimedOut(from time.Time) bool

TimedOut checks whether the value is native; if it is not, it checks whether the `updatedAt` entry is earlier than the provided time minus 30 seconds.
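
The time comparison described above can be sketched as a small pure function. The `timedOut` and `stale` helpers and the fixed reference time are assumptions for illustration; the "native" check is omitted because the documentation does not define it.

```go
package main

import (
	"fmt"
	"time"
)

// ttl mirrors the 30 seconds mentioned in the description.
const ttl = 30 * time.Second

// timedOut is a hypothetical helper: it reports whether updatedAt is
// earlier than from minus ttl.
func timedOut(updatedAt, from time.Time, ttl time.Duration) bool {
	return updatedAt.Before(from.Add(-ttl))
}

// stale builds a fixed scenario: an entry last updated ageSeconds before
// the reference time.
func stale(ageSeconds int) bool {
	from := time.Date(2024, time.June, 7, 12, 0, 0, 0, time.UTC)
	updatedAt := from.Add(-time.Duration(ageSeconds) * time.Second)
	return timedOut(updatedAt, from, ttl)
}

func main() {
	fmt.Println(stale(10)) // false: updated 10s ago, within the TTL
	fmt.Println(stale(30)) // false: exactly at the boundary
	fmt.Println(stale(31)) // true: older than the TTL
}
```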

func (*State) ToggleUnmanaged

func (v *State) ToggleUnmanaged()

func (*State) UpdateProgress

func (v *State) UpdateProgress(progress Progress)

func (*State) UpdatedAt

func (v *State) UpdatedAt() time.Time
