Documentation ¶
Index ¶
- Constants
- type Func
- type GoOpt
- type Machine
- func (p *Machine) Active() int
- func (p *Machine) Cancel()
- func (m *Machine) CancelRoutine(id string)
- func (m *Machine) Close()
- func (m *Machine) Go(fn Func, opts ...GoOpt) string
- func (m *Machine) HasRoutine(id string) bool
- func (m *Machine) ID() string
- func (m *Machine) Parent() *Machine
- func (m *Machine) Stats() *Stats
- func (m *Machine) Sub(opts ...Opt) *Machine
- func (p *Machine) Tags() []string
- func (p *Machine) Total() int
- func (m *Machine) Wait()
- type Middleware
- func After(afterFunc func(routine Routine)) Middleware
- func Before(beforeFunc func(routine Routine)) Middleware
- func Cron(ticker *time.Ticker) Middleware
- func Decider(deciderFunc func(routine Routine) bool) Middleware
- func PanicRecover() Middleware
- func While(deciderFunc func(routine Routine) bool) Middleware
- type Opt
- func WithChildren(children ...*Machine) Opt
- func WithClosers(closers ...func()) Opt
- func WithDeadline(deadline time.Time) Opt
- func WithID(id string) Opt
- func WithMaxRoutines(max int) Opt
- func WithMiddlewares(middlewares ...Middleware) Opt
- func WithPubSub(pubsub pubsub.PubSub) Opt
- func WithTags(tags ...string) Opt
- func WithTimeout(to time.Duration) Opt
- func WithValue(key, val interface{}) Opt
- type Routine
- type RoutineStats
- type Stats
Examples ¶
Constants ¶
const DefaultMaxRoutines = 1000
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Func ¶
type Func func(routine Routine)
Func is the function passed into machine.Go. The Routine is passed into this function at runtime.
type GoOpt ¶
type GoOpt func(o *goOpts)
GoOpt is a function that configures GoOpts
func GoWithDeadline ¶ added in v0.2.0
GoWithDeadline is a GoOpt that creates the Routine's context with the given deadline.
func GoWithMiddlewares ¶ added in v0.0.8
func GoWithMiddlewares(middlewares ...Middleware) GoOpt
GoWithMiddlewares wraps the given function with the input middlewares.
func GoWithPID ¶ added in v0.0.8
GoWithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.
func GoWithTags ¶ added in v0.0.8
GoWithTags is a GoOpt that adds an array of strings as "tags" to the Routine.
func GoWithTimeout ¶ added in v0.0.8
GoWithTimeout is a GoOpt that creates the Routine's context with the given timeout value.
func GoWithValues ¶ added in v0.1.1
func GoWithValues(key, val interface{}) GoOpt
GoWithValues adds the k/v to the routine's root context. It can be retrieved with routine.Context().Value()
type Machine ¶
type Machine struct {
// contains filtered or unexported fields
}
Machine is a zero-dependency runtime for managed goroutines. It is inspired by errgroup.Group, with extra bells & whistles.
func New ¶
New creates a new Machine instance with the given root context and options.
Example ¶
package main

import (
	"context"
	"fmt"
	"github.com/autom8ter/machine"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := machine.New(ctx,
		machine.WithMaxRoutines(10),
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "acme.com"
	const publisherID = "publisher"

	// start another goroutine that publishes to the target channel every second for 5 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	},
		machine.GoWithTags("publish"),
		machine.GoWithPID(publisherID),
		machine.GoWithTimeout(5*time.Second),
		machine.GoWithMiddlewares(
			// run every second until context cancels
			machine.Cron(time.NewTicker(1*time.Second)),
		),
	)

	// start a goroutine that subscribes to all messages sent to the target channel for 3 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
		})
	},
		machine.GoWithTags("subscribe"),
		machine.GoWithTimeout(3*time.Second),
	)

	// start a goroutine that subscribes to just the first two messages it receives on the channel OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.SubscribeN(channelName, 2, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
		})
	}, machine.GoWithTags("subscribeN"))

	// check if the machine has the publishing routine
	exitAfterPublisher := func() bool {
		return m.HasRoutine(publisherID)
	}

	// start a goroutine that subscribes to the channel until the publishing goroutine exits OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.SubscribeUntil(channelName, exitAfterPublisher, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
		})
	}, machine.GoWithTags("subscribeUntil"))

	m.Wait()
}
Output:
func (*Machine) Cancel ¶
func (p *Machine) Cancel()
Cancel cancels the context of every goroutine within the machine instance and its children.
func (*Machine) CancelRoutine ¶ added in v0.2.0
CancelRoutine cancels the context of the active routine with the given id if it exists.
func (*Machine) Close ¶ added in v0.0.5
func (m *Machine) Close()
Close completely closes the machine's pubsub instance and all of its closer functions. It also closes all of its child machines (if they exist).
func (*Machine) Go ¶
Go calls the given function in a new goroutine and returns the goroutine's unique id. The function is passed information about the goroutine at runtime via the Routine interface.
func (*Machine) HasRoutine ¶ added in v0.2.0
HasRoutine returns true if the machine has an active routine with the given id.
func (*Machine) Parent ¶ added in v0.0.8
Parent returns the parent Machine instance if it exists and nil if not.
func (*Machine) Stats ¶
Stats returns goroutine information from the machine and all of its children.
func (*Machine) Sub ¶ added in v0.0.8
Sub returns a nested Machine instance that is dependent on the parent machine's context. It inherits the parent's pubsub implementation and middlewares if none are provided. Sub machines do not inherit their parent's max routine setting.
type Middleware ¶
Middleware is a function that wraps/modifies the behavior of a machine.Func.
func After ¶ added in v0.0.4
func After(afterFunc func(routine Routine)) Middleware
After executes the afterFunc after the main goroutine exits.
func Before ¶ added in v0.0.4
func Before(beforeFunc func(routine Routine)) Middleware
Before executes the beforeFunc before the main goroutine is executed.
func Cron ¶
func Cron(ticker *time.Ticker) Middleware
Cron is a middleware that executes the function every time the ticker ticks, until the goroutine's context cancels.
func Decider ¶ added in v0.0.4
func Decider(deciderFunc func(routine Routine) bool) Middleware
Decider executes the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.
func PanicRecover ¶ added in v0.0.9
func PanicRecover() Middleware
PanicRecover wraps a goroutine with a middleware that recovers from panics.
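Recovering from a panic in Go relies on a deferred call to recover inside the goroutine that panicked. A minimal sketch of that mechanism follows; `safely` is a hypothetical helper, and turning the panic into an error is this sketch's choice, since what the library does with the recovered value is not stated here.

```go
package main

import "fmt"

// safely runs fn and converts a panic into an error instead of crashing.
func safely(fn func()) (err error) {
	defer func() {
		// recover only has an effect inside a deferred function.
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	fn()
	return nil
}

func main() {
	fmt.Println(safely(func() { panic("boom") })) // prints "recovered: boom"
	fmt.Println(safely(func() {}))                // prints "<nil>"
}
```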
func While ¶ added in v0.2.0
func While(deciderFunc func(routine Routine) bool) Middleware
While is a middleware that keeps executing the Func for as long as deciderFunc() returns true, or until the context cancels.
type Opt ¶
type Opt func(o *option)
Opt is a single option when creating a machine instance with New
func WithChildren ¶ added in v0.0.8
WithChildren sets the machine instance's children.
func WithClosers ¶ added in v0.7.2
func WithClosers(closers ...func()) Opt
func WithDeadline ¶ added in v0.2.0
WithDeadline is an Opt that creates the Machine's context with the given deadline.
func WithID ¶ added in v0.2.0
WithID sets the machine instance's unique id. If one isn't provided, a unique id will be assigned.
func WithMaxRoutines ¶
WithMaxRoutines throttles goroutines at the input number. It will panic if the number is <= zero.
func WithMiddlewares ¶
func WithMiddlewares(middlewares ...Middleware) Opt
WithMiddlewares wraps every goroutine function executed by the machine with the given middlewares. Middlewares can be added to individual goroutines with GoWithMiddlewares.
func WithPubSub ¶ added in v0.0.5
WithPubSub sets the pubsub implementation for the machine instance. An in-memory implementation is used if none is provided.
func WithTimeout ¶
WithTimeout is an Opt that creates the Machine's context with the given timeout value.
type Routine ¶
type Routine interface {
	// Context returns the goroutines unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID() is the goroutines unique process id
	PID() string
	// Tags() are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// PublishN publishes the object to the channel by name to the first N subscribers of the channel
	PublishN(channel string, obj interface{}, n int) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutines context is cancelled.
	Subscribe(channel string, handler func(obj interface{})) error
	// SubscribeN subscribes to the given channel until it receives N messages or its context is cancelled
	SubscribeN(channel string, n int, handler func(msg interface{})) error
	// SubscribeUntil subscribes to the given channel until the decider returns false for the first time. The subscription breaks when the routine's context is cancelled or the decider returns false.
	SubscribeUntil(channel string, decider func() bool, handler func(msg interface{})) error
	// SubscribeWhile subscribes to the given channel while the decider returns true. The subscription breaks when the routine's context is cancelled.
	SubscribeWhile(channel string, decider func() bool, handler func(msg interface{})) error
	// SubscribeFilter subscribes to the given channel with the given filter. The subscription breaks when the routine's context is cancelled.
	SubscribeFilter(channel string, filter func(msg interface{}) bool, handler func(msg interface{})) error
	// TraceLog logs a message within the goroutine execution tracer. ref: https://golang.org/pkg/runtime/trace/#example_
	TraceLog(message string)
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}
Routine is an interface representing a goroutine.
type RoutineStats ¶
type RoutineStats struct {
	PID      string        `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}
RoutineStats holds information about a single goroutine.
type Stats ¶
type Stats struct {
	ID               string         `json:"id"`
	Tags             []string       `json:"tags"`
	TotalRoutines    int            `json:"totalRoutines"`
	ActiveRoutines   int            `json:"activeRoutines"`
	Routines         []RoutineStats `json:"routines"`
	TotalChildren    int            `json:"totalChildren"`
	HasParent        bool           `json:"hasParent"`
	TotalMiddlewares int            `json:"totalMiddlewares"`
	Timeout          time.Duration  `json:"timeout"`
	Deadline         time.Time      `json:"deadline"`
	Children         []*Stats       `json:"children"`
}
Stats holds information about goroutines.