machine

package module
v0.0.7 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2020 License: Apache-2.0 Imports: 8 Imported by: 5

README

Machine

 import "github.com/autom8ter/machine"

Machine is a zero-dependency runtime for managed goroutines. It is inspired by errgroup.Group, with extra bells & whistles:

  • throttled goroutines

  • self-cancellable goroutines with context

  • global-cancellable goroutines with context (see Cancel)

  • goroutines have IDs and optional tags for easy debugging (see Stats)

  • publish/subscribe to channels for passing messages between goroutines

  • middlewares for wrapping/decorating functions

  • panic recovery

  • global concurrency-safe cache
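The panic recovery item in the list above can be illustrated with the standard library alone. The sketch below is not the package's implementation; `safeGo` and `runAll` are hypothetical helpers that show one common way to keep a panicking goroutine from crashing the whole process.

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo (hypothetical helper) runs fn in a new goroutine and turns a panic
// into an error sent on errs instead of letting it crash the process.
func safeGo(wg *sync.WaitGroup, errs chan<- error, fn func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				errs <- fmt.Errorf("recovered: %v", r)
			}
		}()
		fn()
	}()
}

// runAll (hypothetical helper) runs every function concurrently and
// returns how many of them panicked.
func runAll(fns ...func()) int {
	var wg sync.WaitGroup
	errs := make(chan error, len(fns)) // buffered so recoveries never block
	for _, fn := range fns {
		safeGo(&wg, errs, fn)
	}
	wg.Wait()
	close(errs)
	panics := 0
	for range errs {
		panics++
	}
	return panics
}

func main() {
	n := runAll(
		func() {},
		func() { panic("boom") },
	)
	fmt.Println("panics recovered:", n)
}
```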

Usage

var ErrNoExist = errors.New("machine: does not exit")

ErrNoExist is returned by the default Cache implementation if a record is not found

type Cache
type Cache interface {
	// Get gets a value by key and returns an error if one occurs
	Get(key string) (interface{}, error)
	// Range executes the given function on the cache. If the function returns false, the iteration stops.
	Range(fn func(k string, val interface{}) bool) error
	// Set sets the key and value in the cache
	Set(key string, val interface{}) error
	// Del deletes the value by key from the map
	Del(key string) error
	// Close closes the cache
	Close() error
}

Cache is the concurrency-safe cache interface used by Machine. A default sync.Map implementation is used if one isn't provided via WithCache.
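To make the interface concrete, here is a minimal sketch of a sync.Map-backed type that satisfies it. The interface is copied from the declaration above; `mapCache` and `errNoExist` are hypothetical names standing in for the package's unexported default and its ErrNoExist value, so this may differ from what the library actually does.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoExist is a hypothetical stand-in for the package's ErrNoExist.
var errNoExist = errors.New("cache: key does not exist")

// Cache is copied from the interface documented above.
type Cache interface {
	Get(key string) (interface{}, error)
	Range(fn func(k string, val interface{}) bool) error
	Set(key string, val interface{}) error
	Del(key string) error
	Close() error
}

// mapCache is a hypothetical Cache backed by sync.Map.
type mapCache struct {
	data sync.Map
}

// Get returns the stored value or errNoExist when the key is missing.
func (c *mapCache) Get(key string) (interface{}, error) {
	val, ok := c.data.Load(key)
	if !ok {
		return nil, errNoExist
	}
	return val, nil
}

// Range visits entries until fn returns false.
func (c *mapCache) Range(fn func(k string, val interface{}) bool) error {
	c.data.Range(func(k, v interface{}) bool {
		return fn(k.(string), v)
	})
	return nil
}

func (c *mapCache) Set(key string, val interface{}) error {
	c.data.Store(key, val)
	return nil
}

func (c *mapCache) Del(key string) error {
	c.data.Delete(key)
	return nil
}

// Close drops every entry; a sync.Map holds no external resources.
func (c *mapCache) Close() error {
	c.data.Range(func(k, _ interface{}) bool {
		c.data.Delete(k)
		return true
	})
	return nil
}

func main() {
	var cache Cache = &mapCache{}
	_ = cache.Set("answer", 42)
	val, err := cache.Get("answer")
	fmt.Println(val, err)
	_, err = cache.Get("missing")
	fmt.Println(errors.Is(err, errNoExist))
}
```

A type like this could be handed to WithCache when the default is not suitable, for example to add expiry or metrics.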

type Func
type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt
type GoOpt func(o *goOpts)

GoOpt is a function that configures the options (goOpts) of a single machine.Go call

func WithMiddlewares
func WithMiddlewares(middlewares ...Middleware) GoOpt

WithMiddlewares wraps the given function with the input middlewares.

func WithPID
func WithPID(id int) GoOpt

WithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func WithTags
func WithTags(tags ...string) GoOpt

WithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func WithTimeout
func WithTimeout(to time.Duration) GoOpt

WithTimeout is a GoOpt that creates the Routine's context with the given timeout value
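The GoOpt functions above follow the functional options pattern. The sketch below shows how such options are typically applied; the `goOpts` fields and the `applyOpts` helper are assumptions made for illustration, since the real struct is unexported and its fields are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// goOpts is a hypothetical stand-in for the package's unexported options
// struct; the field names are assumptions.
type goOpts struct {
	id      int
	tags    []string
	timeout time.Duration
}

// GoOpt mutates the options of a single Go call.
type GoOpt func(o *goOpts)

// WithPID overrides the routine ID.
func WithPID(id int) GoOpt {
	return func(o *goOpts) { o.id = id }
}

// WithTags appends tags to the routine.
func WithTags(tags ...string) GoOpt {
	return func(o *goOpts) { o.tags = append(o.tags, tags...) }
}

// WithTimeout records the timeout for the routine's context.
func WithTimeout(to time.Duration) GoOpt {
	return func(o *goOpts) { o.timeout = to }
}

// applyOpts (hypothetical helper) starts from zero values and applies each
// option in order, so later options win over earlier ones.
func applyOpts(opts ...GoOpt) *goOpts {
	o := &goOpts{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := applyOpts(WithPID(7), WithTags("worker", "io"), WithTimeout(2*time.Second))
	fmt.Println(o.id, o.tags, o.timeout)
}
```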

type Machine
type Machine struct {
}

Machine is a zero-dependency runtime for managed goroutines. It is inspired by errgroup.Group, with extra bells & whistles.

func New
func New(ctx context.Context, options ...Opt) *Machine

New creates a new machine instance with the given root context & options

func (*Machine) Cache
func (m *Machine) Cache() Cache

Cache returns the machine's Cache implementation

func (*Machine) Cancel
func (p *Machine) Cancel()

Cancel cancels every goroutine's context

func (*Machine) Close
func (m *Machine) Close() error

func (*Machine) Current
func (p *Machine) Current() int

Current returns current managed goroutine count

func (*Machine) Go
func (m *Machine) Go(fn Func, opts ...GoOpt)

Go calls the given function in a new goroutine.

The first call to return a non-nil error whose cause is machine.Cancel cancels the context of every job. All errors that are not of type machine.Cancel will be returned by Wait.

func (*Machine) Stats
func (m *Machine) Stats() Stats

Stats returns goroutine information from the machine

func (*Machine) Total
func (p *Machine) Total() int

Total returns total goroutines that have been executed by the machine

func (*Machine) Wait
func (m *Machine) Wait()

Wait blocks until all goroutines exit. This MUST be called after all routines are added via machine.Go in order for a machine instance to work as intended.

type Middleware
type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.

func After
func After(afterFunc func(routine Routine)) Middleware

After executes the afterFunc after the main goroutine exits.

func Before
func Before(beforeFunc func(routine Routine)) Middleware

Before executes the beforeFunc before the main goroutine is executed.

func Cron
func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that executes the function every time the ticker ticks, until the goroutine's context is cancelled
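The ticker-driven loop that Cron describes can be sketched with only the standard library. `runOnTicks` and `countTicks` are hypothetical helpers, not part of the package; they show the usual select over a ticker and a context.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runOnTicks (hypothetical helper) calls fn on every tick until ctx is
// cancelled.
func runOnTicks(ctx context.Context, ticker *time.Ticker, fn func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Both cases can be ready at once, so re-check before running.
			if ctx.Err() != nil {
				return
			}
			fn()
		}
	}
}

// countTicks (hypothetical helper) lets fn fire limit times, then cancels
// the context and reports how many times fn ran.
func countTicks(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	count := 0
	runOnTicks(ctx, ticker, func() {
		count++
		if count == limit {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println(countTicks(3)) // prints 3
}
```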

func Decider
func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider executes the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.
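Because a Middleware is just a function from Func to Func, Before, After and Decider can each be written in a few lines. The sketch below reimplements them against a trimmed, hypothetical `Routine` interface so that it stays self-contained; `chain`, `fakeRoutine` and `trace` are illustrative helpers, and the ordering chosen in `chain` is an assumption rather than documented library behavior.

```go
package main

import "fmt"

// Routine is a trimmed, hypothetical stand-in for the package's Routine
// interface; only PID is kept.
type Routine interface{ PID() int }

type Func func(routine Routine)
type Middleware func(fn Func) Func

type fakeRoutine struct{ pid int }

func (r fakeRoutine) PID() int { return r.pid }

// Before runs beforeFunc, then the wrapped function.
func Before(beforeFunc func(routine Routine)) Middleware {
	return func(fn Func) Func {
		return func(r Routine) { beforeFunc(r); fn(r) }
	}
}

// After runs the wrapped function, then afterFunc.
func After(afterFunc func(routine Routine)) Middleware {
	return func(fn Func) Func {
		return func(r Routine) { fn(r); afterFunc(r) }
	}
}

// Decider skips the wrapped function when deciderFunc returns false.
func Decider(deciderFunc func(routine Routine) bool) Middleware {
	return func(fn Func) Func {
		return func(r Routine) {
			if deciderFunc(r) {
				fn(r)
			}
		}
	}
}

// chain (hypothetical helper) wraps fn so the first middleware listed
// ends up outermost.
func chain(fn Func, mws ...Middleware) Func {
	for i := len(mws) - 1; i >= 0; i-- {
		fn = mws[i](fn)
	}
	return fn
}

// trace runs a decorated function and records the order of calls.
func trace(allow bool) []string {
	var calls []string
	fn := chain(
		func(Routine) { calls = append(calls, "main") },
		Decider(func(Routine) bool { return allow }),
		Before(func(Routine) { calls = append(calls, "before") }),
		After(func(Routine) { calls = append(calls, "after") }),
	)
	fn(fakeRoutine{pid: 1})
	return calls
}

func main() {
	fmt.Println(trace(true))  // [before main after]
	fmt.Println(trace(false)) // []
}
```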

type Opt
type Opt func(o *option)

Opt is a single option when creating a machine instance with New

func WithCache
func WithCache(cache Cache) Opt

WithCache sets the in-memory, concurrency-safe cache. If not set, a default sync.Map implementation is used.

func WithMaxRoutines
func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if max is <= zero.
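The documentation does not say how throttling is implemented. One common technique, shown here purely as an assumption, is a buffered channel used as a counting semaphore. `runThrottled` is a hypothetical helper that starts a number of jobs and reports the highest concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runThrottled (hypothetical helper) starts jobs goroutines but lets at most
// max of them run at the same time. It returns the peak concurrency seen.
func runThrottled(max, jobs int) int32 {
	if max <= 0 {
		panic("max must be greater than zero")
	}
	sem := make(chan struct{}, max) // one slot per allowed goroutine
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while max goroutines are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot on exit

			n := atomic.AddInt32(&running, 1)
			// Record a new peak if this goroutine raised the count.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	peak := runThrottled(2, 20)
	fmt.Println("peak concurrency never exceeds 2:", peak <= 2)
}
```

Acquiring the slot before starting the goroutine means the caller itself blocks when the limit is reached, which applies back-pressure to whoever is scheduling work.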

func WithPubSub
func WithPubSub(pubsub PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An in-memory implementation is used if none is provided.

func WithSubscribeChannelBuffer
func WithSubscribeChannelBuffer(length int) Opt

WithSubscribeChannelBuffer sets the buffer length of the channel that backs a Routine's subscription

type PubSub
type PubSub interface {
	// Publish publishes the object to the channel by name
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to the given channel
	Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error
	Close() error
}

PubSub is used to asynchronously pass messages between routines.
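A custom implementation passed to WithPubSub only has to satisfy the three methods above. The following in-memory sketch is one possible shape, not the library's default. `memPubSub`, `errClosed` and `firstMessage` are hypothetical names, and the decisions to drop messages when a subscriber's buffer is full and to block in Subscribe until the context is cancelled are assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// PubSub is copied from the interface documented above.
type PubSub interface {
	Publish(channel string, obj interface{}) error
	Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error
	Close() error
}

var errClosed = errors.New("pubsub: closed") // hypothetical error value

// memPubSub is a hypothetical in-memory PubSub.
type memPubSub struct {
	mu     sync.Mutex
	subs   map[string]map[chan interface{}]struct{}
	closed bool
}

var _ PubSub = (*memPubSub)(nil)

func newMemPubSub() *memPubSub {
	return &memPubSub{subs: map[string]map[chan interface{}]struct{}{}}
}

// Publish offers obj to every subscriber; a full buffer drops the message.
func (p *memPubSub) Publish(channel string, obj interface{}) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errClosed
	}
	for ch := range p.subs[channel] {
		select {
		case ch <- obj:
		default:
		}
	}
	return nil
}

// Subscribe blocks, calling handler per message, until ctx is cancelled.
func (p *memPubSub) Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error {
	ch := make(chan interface{}, 16)
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return errClosed
	}
	if p.subs[channel] == nil {
		p.subs[channel] = map[chan interface{}]struct{}{}
	}
	p.subs[channel][ch] = struct{}{}
	p.mu.Unlock()
	defer func() { // unregister on exit
		p.mu.Lock()
		delete(p.subs[channel], ch)
		p.mu.Unlock()
	}()
	for {
		select {
		case <-ctx.Done():
			return nil
		case obj := <-ch:
			if ctx.Err() != nil {
				return nil
			}
			handler(obj)
		}
	}
}

// Close rejects further calls; running subscribers stop via their contexts.
func (p *memPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	return nil
}

// firstMessage publishes in the background and returns the first message
// the subscriber receives.
func firstMessage() interface{} {
	ps := newMemPubSub()
	defer ps.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		for ctx.Err() == nil {
			_ = ps.Publish("greetings", "hello")
			time.Sleep(time.Millisecond)
		}
	}()
	var got interface{}
	_ = ps.Subscribe(ctx, "greetings", func(obj interface{}) {
		got = obj
		cancel()
	})
	return got
}

func main() {
	fmt.Println(firstMessage()) // prints hello
}
```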

type Routine
type Routine interface {
	// Context returns the goroutine's unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID is the goroutine's unique process id
	PID() int
	// Tags are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutine's context is cancelled.
	Subscribe(channel string, handler func(obj interface{})) error
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}

Routine is an interface representing a goroutine
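The difference between a routine cancelling its own context and the machine cancelling every context can be shown with plain derived contexts. This sketch assumes each routine's context is a child of the machine's root context, which the documentation implies but does not spell out; `cancelStates` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// cancelStates (hypothetical helper) derives two child contexts from one
// root. It reports whether [first, second] are done after cancelling only
// the first child, and again after cancelling the root.
func cancelStates() (afterChild, afterRoot [2]bool) {
	root, cancelRoot := context.WithCancel(context.Background())
	defer cancelRoot()
	first, cancelFirst := context.WithCancel(root)
	defer cancelFirst()
	second, cancelSecond := context.WithCancel(root)
	defer cancelSecond()

	// Self-cancellation: only the first routine's context is affected.
	cancelFirst()
	afterChild = [2]bool{first.Err() != nil, second.Err() != nil}

	// Global cancellation: cancelling the root reaches every child.
	cancelRoot()
	afterRoot = [2]bool{first.Err() != nil, second.Err() != nil}
	return afterChild, afterRoot
}

func main() {
	afterChild, afterRoot := cancelStates()
	fmt.Println(afterChild) // [true false]
	fmt.Println(afterRoot)  // [true true]
}
```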

type RoutineStats
type RoutineStats struct {
	PID      int           `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats
type Stats struct {
	Count    int            `json:"count"`
	Routines []RoutineStats `json:"routines"`
}

Stats holds information about goroutines

func (Stats) String
func (s Stats) String() string

String returns a pretty-printed JSON string of the stats
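A String method like this is usually a thin wrapper around json.MarshalIndent. The sketch below copies the two struct declarations from above and assumes a two-space indent and an empty string on marshalling errors; the package's real formatting may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// RoutineStats and Stats are copied from the declarations above.
type RoutineStats struct {
	PID      int           `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

type Stats struct {
	Count    int            `json:"count"`
	Routines []RoutineStats `json:"routines"`
}

// String renders the stats as indented JSON. The indent width and the
// empty-string fallback are assumptions of this sketch.
func (s Stats) String() string {
	bits, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return ""
	}
	return string(bits)
}

func main() {
	stats := Stats{
		Count: 1,
		Routines: []RoutineStats{{
			PID:      7,
			Start:    time.Now(),
			Duration: 1500 * time.Millisecond, // encoded as nanoseconds
			Tags:     []string{"worker"},
		}},
	}
	fmt.Println(stats)
}
```

Note that time.Duration has no custom JSON encoding, so the duration field appears as an integer number of nanoseconds.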

Documentation

Index

Constants

This section is empty.

Variables

var ErrNoExist = errors.New("machine: does not exit")

ErrNoExist is returned by the default Cache implementation if a record is not found

Functions

This section is empty.

Types

type Cache

type Cache interface {
	// Get gets a value by key and returns an error if one occurs
	Get(key string) (interface{}, error)
	// Range executes the given function on the cache. If the function returns false, the iteration stops.
	Range(fn func(k string, val interface{}) bool) error
	// Set sets the key and value in the cache
	Set(key string, val interface{}) error
	// Del deletes the value by key from the map
	Del(key string) error
	// Close closes the cache
	Close() error
}

Cache is the concurrency-safe cache interface used by Machine. A default sync.Map implementation is used if one isn't provided via WithCache.

type Func

type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt

type GoOpt func(o *goOpts)

GoOpt is a function that configures the options (goOpts) of a single machine.Go call

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) GoOpt

WithMiddlewares wraps the given function with the input middlewares.

func WithPID

func WithPID(id int) GoOpt

WithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func WithTags

func WithTags(tags ...string) GoOpt

WithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func WithTimeout

func WithTimeout(to time.Duration) GoOpt

WithTimeout is a GoOpt that creates the Routine's context with the given timeout value

type Machine

type Machine struct {
	// contains filtered or unexported fields
}

Machine is a zero-dependency runtime for managed goroutines. It is inspired by errgroup.Group, with extra bells & whistles.

func New

func New(ctx context.Context, options ...Opt) *Machine

New creates a new machine instance with the given root context & options

func (*Machine) Cache

func (m *Machine) Cache() Cache

Cache returns the machine's Cache implementation

func (*Machine) Cancel

func (p *Machine) Cancel()

Cancel cancels every goroutine's context

func (*Machine) Close (added in v0.0.5)

func (m *Machine) Close() error

func (*Machine) Current

func (p *Machine) Current() int

Current returns current managed goroutine count

func (*Machine) Go

func (m *Machine) Go(fn Func, opts ...GoOpt)

Go calls the given function in a new goroutine.

The first call to return a non-nil error whose cause is machine.Cancel cancels the context of every job. All errors that are not of type machine.Cancel will be returned by Wait.

func (*Machine) Stats

func (m *Machine) Stats() Stats

Stats returns goroutine information from the machine

func (*Machine) Total

func (p *Machine) Total() int

Total returns total goroutines that have been executed by the machine

func (*Machine) Wait

func (m *Machine) Wait()

Wait blocks until all goroutines exit. This MUST be called after all routines are added via machine.Go in order for a machine instance to work as intended.

type Middleware

type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.

func After (added in v0.0.4)

func After(afterFunc func(routine Routine)) Middleware

After executes the afterFunc after the main goroutine exits.

func Before (added in v0.0.4)

func Before(beforeFunc func(routine Routine)) Middleware

Before executes the beforeFunc before the main goroutine is executed.

func Cron

func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that executes the function every time the ticker ticks, until the goroutine's context is cancelled

func Decider (added in v0.0.4)

func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider executes the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.

type Opt

type Opt func(o *option)

Opt is a single option when creating a machine instance with New

func WithCache

func WithCache(cache Cache) Opt

WithCache sets the in-memory, concurrency-safe cache. If not set, a default sync.Map implementation is used.

func WithMaxRoutines

func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if max is <= zero.

func WithPubSub (added in v0.0.5)

func WithPubSub(pubsub PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An in-memory implementation is used if none is provided.

func WithSubscribeChannelBuffer

func WithSubscribeChannelBuffer(length int) Opt

WithSubscribeChannelBuffer sets the buffer length of the channel that backs a Routine's subscription

type PubSub (added in v0.0.5)

type PubSub interface {
	// Publish publishes the object to the channel by name
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to the given channel
	Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error
	Close() error
}

PubSub is used to asynchronously pass messages between routines.

type Routine

type Routine interface {
	// Context returns the goroutine's unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID is the goroutine's unique process id
	PID() int
	// Tags are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutine's context is cancelled.
	Subscribe(channel string, handler func(obj interface{})) error
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}

Routine is an interface representing a goroutine

type RoutineStats

type RoutineStats struct {
	PID      int           `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats

type Stats struct {
	Count    int            `json:"count"`
	Routines []RoutineStats `json:"routines"`
}

Stats holds information about goroutines

func (Stats) String

func (s Stats) String() string

String returns a pretty-printed JSON string of the stats

Directories

Path Synopsis
examples module
