machine

package module
v0.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2020 License: Apache-2.0 Imports: 14 Imported by: 5

README

Machine GoDoc

import "github.com/autom8ter/machine"

m := machine.New(context.Background(),
	// functions are added to a FIFO channel that will block when active routines == max routines. 
	machine.WithMaxRoutines(10),
        // every function executed by machine.Go will recover from panics
	machine.WithMiddlewares(machine.PanicRecover()),
	// WithValue passes the value to the root context of the machine- it is available in the context of all child machine's & all Routine's
        machine.WithValue("testing", true),
        // WithTimeout cancels the machine's context after the given timeout
        machine.WithTimeout(30 *time.Second)
)
defer m.Close()

channelName := "acme.com"

// start a goroutine that subscribes to all messages sent to the target channel for 5 seconds
m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n",
				routine.PID(), channelName, obj, m.Stats().String())
		})
	},
	machine.GoWithTags("subscribe"),
	machine.GoWithTimeout(5*time.Second),
)

// start another goroutine that publishes to the target channel every second for 5 seconds
m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	},
	machine.GoWithTags("publish"),
	machine.GoWithTimeout(5*time.Second),
	machine.GoWithMiddlewares(
		// run every second until context cancels
		machine.Cron(time.NewTicker(1*time.Second)),
	),
)

m.Wait()

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • throttled goroutines

  • self-cancellable goroutines with Context

  • global-cancellable goroutines with context (see Cancel)

  • goroutines have IDs and optional tags for easy debugging (see Stats)

  • publish/subscribe to channels for broadcasting messages to active goroutines

  • middlewares for wrapping/decorating functions

  • "sub" machines for creating a dependency tree between groups of goroutines

  • greater than 80% test coverage

  • goroutine leak prevention

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

Examples

All examples are < 500 lines of code(excluding code generation)

Documentation

Index

Examples

Constants

View Source
const DefaultMaxRoutines = 1000

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache interface {
	// Get gets the value for the given key in the given namespace.
	Get(namespace string, key interface{}) (interface{}, bool)
	// Set sets a value for the given key in the given namespace with an expiration duration.
	// If the duration is 0 or less, it will be stored forever.
	Set(namespace string, key interface{}, value interface{}, duration time.Duration)
	// Range calls f sequentially for each key and value present within the given namespace.
	// If f returns false, range stops the iteration.
	Range(namespace string, f func(key, value interface{}) bool)
	// Delete deletes the key and its value from the given namespace.
	Delete(namespace string, key interface{})
	// Len returns total kv pairs within namespace
	Len(namespace string) int
	// Close closes the Cache and frees up resources.
	Close()
}

Cache is a concurrency safe cache that stores arbitrary, namespaced data with optional TTL.

type Func

type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt

type GoOpt func(o *goOpts)

GoOpt is a function that configures GoOpts

func GoWithDeadline added in v0.2.0

func GoWithDeadline(deadline time.Time) GoOpt

GoWithDeadline is a GoOpt that creates the Routine's context with the given deadline.

func GoWithMiddlewares added in v0.0.8

func GoWithMiddlewares(middlewares ...Middleware) GoOpt

GoWithMiddlewares wraps the gived function with the input middlewares.

func GoWithPID added in v0.0.8

func GoWithPID(id string) GoOpt

GoWithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func GoWithTags added in v0.0.8

func GoWithTags(tags ...string) GoOpt

GoWithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func GoWithTimeout added in v0.0.8

func GoWithTimeout(to time.Duration) GoOpt

GoWithTimeout is a GoOpt that creates the Routine's context with the given timeout value

func GoWithValues added in v0.1.1

func GoWithValues(key, val interface{}) GoOpt

GoWithValues adds the k/v to the routine's root context. It can be retrieved with routine.Context().Value()

type Machine

type Machine struct {
	// contains filtered or unexported fields
}

Machine is a zero dependency runtime for managed goroutines. It is inspired by errgroup.Group with extra bells & whistles:

func New

func New(ctx context.Context, options ...Opt) *Machine

New Creates a new machine instance with the given root context & options

Example
package main

import (
	"context"
	"fmt"
	"github.com/autom8ter/machine"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := machine.New(ctx,
		machine.WithMaxRoutines(10),
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "acme.com"

	// start a goroutine that subscribes to all messages sent to the target channel for 5 seconds
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
		})
	}, machine.GoWithTags("subscribe"),
		machine.GoWithTimeout(5*time.Second),
	)

	// start another goroutine that publishes to the target channel every second for 5 seconds
	m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	}, machine.GoWithTags("publish"),
		machine.GoWithTimeout(5*time.Second),
		machine.GoWithMiddlewares(
			// run every second until context cancels
			machine.Cron(time.NewTicker(1*time.Second)),
		),
	)

	m.Wait()
}
Output:

func (*Machine) Active added in v0.0.8

func (p *Machine) Active() int

Active returns current active managed goroutine count

func (*Machine) Cache

func (m *Machine) Cache() Cache

Cache returns the machine's cache implementation. One is automatically set if not provided as an Opt on machine instance creation.

func (*Machine) Cancel

func (p *Machine) Cancel()

Cancel cancels every goroutines context within the machine instance & it's children

func (*Machine) CancelRoutine added in v0.2.0

func (m *Machine) CancelRoutine(id string)

CancelRoutine cancels the context of the active routine with the given id if it exists.

func (*Machine) Close added in v0.0.5

func (m *Machine) Close()

Close completely closes the machine instance & all of it's children

func (*Machine) Go

func (m *Machine) Go(fn Func, opts ...GoOpt)

Go calls the given function in a new goroutine. it is passed information about the goroutine at runtime via the Routine interface

func (*Machine) HasRoutine added in v0.2.0

func (m *Machine) HasRoutine(id string) bool

HasRoutine returns true if the machine has a active routine with the given id

func (*Machine) ID added in v0.2.0

func (m *Machine) ID() string

ID returns the machine instance's unique id.

func (*Machine) Parent added in v0.0.8

func (m *Machine) Parent() *Machine

Parent returns the parent Machine instance if it exists and nil if not.

func (*Machine) Stats

func (m *Machine) Stats() *Stats

Stats returns Goroutine information from the machine and all of it's children

func (*Machine) Sub added in v0.0.8

func (m *Machine) Sub(opts ...Opt) *Machine

Sub returns a nested Machine instance that is dependent on the parent machine's context. It inherits the parent's pubsub/cache implementation & middlewares if none are provided Sub machine's do not inherit their parents max routine setting

func (*Machine) Tags added in v0.0.9

func (p *Machine) Tags() []string

Tags returns the machine's tags

func (*Machine) Total

func (p *Machine) Total() int

Total returns total goroutines that have been executed by the machine

func (*Machine) Wait

func (m *Machine) Wait()

Wait blocks until total active goroutine count reaches zero for the instance and all of it's children. At least one goroutine must have finished in order for wait to un-block

type Middleware

type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.

func After added in v0.0.4

func After(afterFunc func(routine Routine)) Middleware

After exectues the afterFunc after the main goroutine exits.

func Before added in v0.0.4

func Before(beforeFunc func(routine Routine)) Middleware

Before exectues the beforeFunc before the main goroutine is executed.

func Cron

func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that execute the function every time the ticker ticks until the goroutine's context cancels

func Decider added in v0.0.4

func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider exectues the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.

func PanicRecover added in v0.0.9

func PanicRecover() Middleware

PanicRecover wraps a goroutine with a middleware the recovers from panics.

func While added in v0.2.0

func While(deciderFunc func(routine Routine) bool) Middleware

While is a middleware that will continue to execute the Func while deciderFunc() returns true. The loop breaks the first time deciderFunc() returns false or the routine's context cancels

type Opt

type Opt func(o *option)

Opt is a single option when creating a machine instance with New

func WithCache

func WithCache(cache Cache) Opt

WithCache sets the cache implementation for the machine instance. An inmemory implementation is used if none is provided.

func WithChildren added in v0.0.8

func WithChildren(children ...*Machine) Opt

WithChildren sets the machine instances children

func WithDeadline added in v0.2.0

func WithDeadline(deadline time.Time) Opt

WithDeadline is an Opt that creates the Machine's context with the given deadline.

func WithID added in v0.2.0

func WithID(id string) Opt

WithID sets the machine instances unique id. If one isn't provided, a unique id will be assigned

func WithMaxRoutines

func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if <= zero.

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Opt

WithMiddlewares wraps every goroutine function executed by the machine with the given middlewares. Middlewares can be added to individual goroutines with GoWithMiddlewares

func WithPubSub added in v0.0.5

func WithPubSub(pubsub PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An inmemory implementation is used if none is provided.

func WithTags

func WithTags(tags ...string) Opt

WithTags sets the machine instances tags

func WithTimeout

func WithTimeout(to time.Duration) Opt

WithTimeout is an Opt that creates the Machine's context with the given timeout value

func WithValue added in v0.2.0

func WithValue(key, val interface{}) Opt

WithValue adds the k/v to the Machine's root context. It can be retrieved with context.Value() in all sub routine contexts

type PubSub added in v0.0.5

type PubSub interface {
	// Publish publishes the object to the channel by name
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to the given channel
	Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error
	Close()
}

PubSub is used to asynchronously pass messages between routines.

type Routine

type Routine interface {
	// Context returns the goroutines unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID() is the goroutines unique process id
	PID() string
	// Tags() are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutines context is cancelled.
	Subscribe(channel string, handler func(obj interface{})) error
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}

Routine is an interface representing a goroutine

type RoutineStats

type RoutineStats struct {
	PID      string        `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats

type Stats struct {
	ID               string         `json:"id"`
	Tags             []string       `json:"tags"`
	TotalRoutines    int            `json:"totalRoutines"`
	ActiveRoutines   int            `json:"activeRoutines"`
	Routines         []RoutineStats `json:"routines"`
	TotalChildren    int            `json:"totalChildren"`
	HasParent        bool           `json:"hasParent"`
	TotalMiddlewares int            `json:"totalMiddlewares"`
	Timeout          time.Duration  `json:"timeout"`
	Deadline         time.Time      `json:"deadline"`
	Children         []*Stats       `json:"children"`
}

Stats holds information about goroutines

func (Stats) String

func (s Stats) String() string

String prints a pretty json string of the stats

Directories

Path Synopsis
examples module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL