routine

package
v1.0.2
Published: Nov 7, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Constants

const (
	CallbackDone callbackType = 1 + iota
	CallbackFail
	CallbackAlways
	CallbackCancel
)
const (
	ResultSuccess resultType = iota
	ResultFailure
	ResultCancelled
)
const (
	ErrTimeout        utils.Error = "operation timed out"
	ErrPoolOverload   utils.Error = "too many goroutines blocked on submit or Nonblocking is set"
	ErrDuplicatedName utils.Error = "duplicated goroutine pool name"
)

Variables

var (
	ErrCancelled error = &CancelledError{}
)

Functions

func AppName

func AppName(name string) utils.OptionFunc[candyOption]

func Args

func Args(args ...any) utils.OptionFunc[candyOption]

func Channel

func Channel(ch chan<- any) utils.OptionFunc[candyOption]

func Construct

func Construct(ctx context.Context, conf Conf, opts ...utils.OptionExtender) func()

func Go

func Go(task any, opts ...utils.OptionExtender)

func Goc

func Goc(ctx context.Context, task any, opts ...utils.OptionExtender)

func Loop

func Loop(task any, opts ...utils.OptionExtender)

func Loopc

func Loopc(ctx context.Context, task any, opts ...utils.OptionExtender)

func NewPromise

func NewPromise() *promise

NewPromise is a factory function for promise.

func WaitGroup

func WaitGroup(wg *sync.WaitGroup) utils.OptionFunc[candyOption]

func WithoutTimeout

func WithoutTimeout() utils.OptionFunc[NewPoolOption]
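
Several functions above return utils.OptionFunc[...], which reads like the functional-options pattern expressed with generics. The sketch below illustrates that pattern only; optionFunc, candyOptions, and apply are hypothetical, and the real utils.OptionFunc and candyOption definitions may differ.

```go
package main

import "fmt"

// optionFunc is a hypothetical generic option: a function that mutates
// an options struct of type T.
type optionFunc[T any] func(*T)

// candyOptions is an illustrative options struct, not the package's candyOption.
type candyOptions struct {
	appName string
	args    []any
}

// appName returns an option that sets the application name.
func appName(name string) optionFunc[candyOptions] {
	return func(o *candyOptions) { o.appName = name }
}

// args returns an option that records the task arguments.
func args(a ...any) optionFunc[candyOptions] {
	return func(o *candyOptions) { o.args = a }
}

// apply builds a zero-valued T and runs every option against it in order.
func apply[T any](opts ...optionFunc[T]) *T {
	o := new(T)
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := apply(appName("svc"), args(1, 2))
	fmt.Println(o.appName, len(o.args))
}
```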

Types

type AggregateError

type AggregateError struct {
	InnerErrs []error
	// contains filtered or unexported fields
}

AggregateError aggregates multiple errors into a single error.

func (*AggregateError) Error

func (e *AggregateError) Error() string

type CancelledError

type CancelledError struct {
}

CancelledError indicates that the Future was cancelled.

func (*CancelledError) Error

func (e *CancelledError) Error() string

type Canceller

type Canceller interface {
	IsCancelled() bool
	Cancel()
}

Canceller is used to check whether the future is cancelled. It is usually passed to the future task function so that the task can check whether the future has been cancelled.
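
A task that receives a Canceller is expected to poll it cooperatively. The sketch below shows that polling loop with a hypothetical flagCanceller implementation; in the real package the Canceller comes from the Future, and the cancelAt parameter here only simulates an external Cancel call so that the example is deterministic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// canceller mirrors the two methods of the Canceller interface.
type canceller interface {
	IsCancelled() bool
	Cancel()
}

// flagCanceller is a hypothetical implementation backed by an atomic flag.
type flagCanceller struct{ cancelled atomic.Bool }

func (c *flagCanceller) IsCancelled() bool { return c.cancelled.Load() }
func (c *flagCanceller) Cancel()           { c.cancelled.Store(true) }

// countUntilCancelled performs up to max units of work, checking the
// canceller before each unit, and returns the number of units completed.
// After cancelAt units it calls Cancel to simulate an external cancellation.
func countUntilCancelled(c canceller, max, cancelAt int) int {
	done := 0
	for i := 0; i < max; i++ {
		if c.IsCancelled() {
			return done
		}
		done++
		if done == cancelAt {
			c.Cancel()
		}
	}
	return done
}

func main() {
	c := &flagCanceller{}
	fmt.Println(countUntilCancelled(c, 10, 3), c.IsCancelled())
}
```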

type Conf

type Conf struct {
	// MaxRoutineAmount is the maximum number of goroutines
	MaxRoutineAmount int `yaml:"max_routine_amount" json:"max_routine_amount" toml:"max_routine_amount" default:"-1"`

	// MaxReleaseTimePerPool is the maximum time to wait for a single pool during graceful shutdown
	MaxReleaseTimePerPool string `yaml:"max_release_time_per_pool" json:"max_release_time_per_pool" toml:"max_release_time_per_pool" default:"30s"`

	// ForceSync will synchronously execute Go, promise function if true
	ForceSync bool `yaml:"force_sync" json:"force_sync" toml:"force_sync" default:"false"`

	// Logger is the customized logger for logging info, if it is not set,
	// default standard logger from log package is used.
	EnabledLogger bool   `yaml:"enabled_logger" json:"enabled_logger" toml:"enabled_logger" default:"false"`
	Logger        string `yaml:"logger" json:"logger" toml:"logger" default:"github.com/wfusion/gofusion/log/customlogger.routineLogger"`
	LogInstance   string `yaml:"log_instance" json:"log_instance" toml:"log_instance" default:"default"`
}

Conf is the routine configuration.

type Future

type Future struct {
	Id      int // ID can be used as identity of Future
	AppName string
	// contains filtered or unexported fields
}

Future provides a read-only view of a promise. Its value is set through the Resolve, Reject, and Cancel methods of the related promise.

func Promise

func Promise(fn any, async bool, opts ...utils.OptionExtender) *Future

func WhenAll

func WhenAll(acts ...any) (fu *Future)

WhenAll receives a slice of functions and returns a Future. If all Futures are resolved, this Future is resolved and returns a slice of results. Otherwise, it is rejected with the slice of results returned by all Futures. Legitimate types of act are the same as for the Start function.

func WhenAny

func WhenAny(acts ...any) *Future

WhenAny returns a Future. If any Future is resolved, this Future is resolved and returns the result of the resolved Future. Otherwise, it is rejected with the slice of results returned by all Futures. Legitimate types of act are the same as for the Start function.

func WhenAnyMatched

func WhenAnyMatched(predicate func(any) bool, acts ...any) *Future

WhenAnyMatched returns a Future. If any Future is resolved and its result matches the predicate, this Future is resolved and returns the result of that Future. If all Futures are cancelled, this Future is cancelled. Otherwise, it is rejected with a NoMatchedError that includes the slice of results returned by all Futures. Legitimate types of act are the same as for the Start function.

func WrapFuture

func WrapFuture(value any, opts ...utils.OptionExtender) *Future

WrapFuture returns a Future that represents the wrapped value.

func (*Future) Cancel

func (f *Future) Cancel() (e error)

Cancel sets the status of the promise to ResultCancelled. If the promise is cancelled, Get() returns nil and the CANCELLED error. Callback functions will not be called if the promise is cancelled.

func (*Future) Canceller

func (f *Future) Canceller() Canceller

Canceller returns a canceller object related to future.

func (*Future) Get

func (f *Future) Get() (val any, err error)

Get blocks the current goroutine until the Future is resolved, rejected, or cancelled. If the Future is resolved, the value and nil are returned. If the Future is rejected, nil and the error are returned. If the Future is cancelled, nil and the CANCELLED error are returned.

func (*Future) GetChan

func (f *Future) GetChan() <-chan *Result

GetChan returns a channel that can be used to receive the result of the promise.

func (*Future) GetOrTimeout

func (f *Future) GetOrTimeout(mm uint) (val any, err error, timout bool)

GetOrTimeout is similar to Get(), but it stops blocking once the timeout elapses. If GetOrTimeout returns because of a timeout, the timeout return value is true. The parameter is in milliseconds.
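
Waiting with a deadline is commonly written as a select over a result channel and a timer. The sketch below shows that idiom with a hypothetical getOrTimeout helper operating on a plain channel; it illustrates the described behaviour and is not the method's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// getOrTimeout waits up to ms milliseconds for a value on ch.
// timedOut is true if no value arrived in time.
func getOrTimeout(ch <-chan any, ms uint) (val any, timedOut bool) {
	timer := time.NewTimer(time.Duration(ms) * time.Millisecond)
	defer timer.Stop()
	select {
	case v := <-ch:
		return v, false
	case <-timer.C:
		return nil, true
	}
}

func main() {
	ready := make(chan any, 1)
	ready <- 42
	fmt.Println(getOrTimeout(ready, 1000))

	never := make(chan any)
	fmt.Println(getOrTimeout(never, 10))
}
```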

func (*Future) IsCancelled

func (f *Future) IsCancelled() bool

IsCancelled returns true if the promise is cancelled, otherwise false

func (*Future) OnCancel

func (f *Future) OnCancel(callback func()) *Future

OnCancel registers a callback function that will be called when promise is cancelled. If promise is already cancelled, the callback will immediately be called.

func (*Future) OnComplete

func (f *Future) OnComplete(callback func(v any)) *Future

OnComplete registers a callback function that is called when the promise is rejected or resolved. If the promise is already rejected or resolved, the callback is called immediately. Depending on the status of the promise, the value or the error is passed to the Always callback function: the value if the promise is resolved, the error if it is rejected. The Always callback is not called if the promise is cancelled.

func (*Future) OnFailure

func (f *Future) OnFailure(callback func(v any)) *Future

OnFailure registers a callback function that will be called when promise is rejected. If promise is already rejected, the callback will immediately be called. The error of promise will be parameter of Fail callback function.

func (*Future) OnSuccess

func (f *Future) OnSuccess(callback func(v any)) *Future

OnSuccess registers a callback function that will be called when promise is resolved. If promise is already resolved, the callback will immediately be called. The value of promise will be parameter of Done callback function.

func (*Future) Pipe

func (f *Future) Pipe(callbacks ...any) (result *Future, ok bool)

Pipe registers one or two functions that each return a Future, and returns a proxy for the pipeline Future. The first function is called when the Future is resolved, and the Future it returns becomes the pipeline Future. The second function is called when the Future is rejected, and the Future it returns becomes the pipeline Future.

func (*Future) SetTimeout

func (f *Future) SetTimeout(mm int) *Future

SetTimeout sets a timeout for the future: the future task is cancelled if the future does not complete before the timeout elapses.

type NewPoolOption

type NewPoolOption struct {
	PoolOption
	// ApplyTimeout is the timeout duration for applying a goroutine pool
	// Default = 0 means non-blocking and directly panic;
	// < 0 means blocking and wait;
	// > 0 means block and panic after timeout
	ApplyTimeout time.Duration
}

type NoMatchedError

type NoMatchedError struct {
	Results []any
}

NoMatchedError indicates that no future returned a matching result in the WhenAnyMatched function.

func (*NoMatchedError) Error

func (e *NoMatchedError) Error() string

func (*NoMatchedError) HasError

func (e *NoMatchedError) HasError() bool

type Pool

type Pool interface {
	Submit(task any, opts ...utils.OptionExtender) error
	Running() int
	Free() int
	Waiting() int
	Cap() int
	IsClosed() bool
	Release(opts ...utils.OptionExtender)
	ReleaseTimeout(timeout time.Duration, opts ...utils.OptionExtender) error
}

func NewPool

func NewPool(name string, size int, opts ...utils.OptionExtender) (p Pool)

type PoolOption

type PoolOption struct {
	// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
	// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
	// used for more than `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
	PreAlloc bool

	// Max number of goroutine blocking on pool.Submit.
	// 0 (default value) means no such limit.
	MaxBlockingTasks int

	// When Nonblocking is true, Pool.Submit will never be blocked.
	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
	// When Nonblocking is true, MaxBlockingTasks is inoperative.
	Nonblocking bool

	// PanicHandler is used to handle panics from each worker goroutine.
	// if nil, panics will be thrown out again from worker goroutines.
	PanicHandler func(any)

	// Logger is the customized logger for logging info, if it is not set,
	// default standard logger from log package is used.
	Logger ants.Logger

	// When DisablePurge is true, workers are not purged and are resident.
	DisablePurge bool
}

type Result

type Result struct {
	Result any        //result of the promise
	Typ    resultType //success, failure, or cancelled?
}

Result represents the result of a promise. If Typ is ResultSuccess, the Result field holds the value returned by the Future task. If Typ is ResultFailure, the Result field holds the related error. If Typ is ResultCancelled, the Result field is nil.
