machine

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2020 License: Apache-2.0 Imports: 15 Imported by: 5

README

Machine GoDoc

import "github.com/autom8ter/machine"

m := machine.New(context.Background(),
	// functions are added to a FIFO channel that will block when active routines == max routines. 
	machine.WithMaxRoutines(10),
        // every function executed by machine.Go will recover from panics
	machine.WithMiddlewares(machine.PanicRecover()),
	// WithValue passes the value to the root context of the machine- it is available in the context of all child machine's & all Routine's
        machine.WithValue("testing", true),
        // WithTimeout cancels the machine's context after the given timeout
        machine.WithTimeout(30 *time.Second)
)
defer m.Close()

channelName := "acme.com"

// start a goroutine that subscribes to all messages sent to the target channel for 5 seconds
m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n",
				routine.PID(), channelName, obj, m.Stats().String())
		})
	},
	machine.GoWithTags("subscribe"),
	machine.GoWithTimeout(5*time.Second),
)

// start another goroutine that publishes to the target channel every second for 5 seconds
m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	},
	machine.GoWithTags("publish"),
	machine.GoWithTimeout(5*time.Second),
	machine.GoWithMiddlewares(
		// run every second until context cancels
		machine.Cron(time.NewTicker(1*time.Second)),
	),
)

m.Wait()

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • throttled goroutines

  • self-cancellable goroutines with Context

  • global-cancellable goroutines with context (see Cancel)

  • goroutines have IDs and optional tags for easy debugging (see Stats)

  • publish/subscribe to channels for broadcasting messages to active goroutines

  • middlewares for wrapping/decorating functions

  • "sub" machines for creating a dependency tree between groups of goroutines

  • namespaced, concurrency safe cache

  • greater than 80% test coverage

  • goroutine leak prevention

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

Examples

All examples are < 500 lines of code(excluding code generation)

Documentation

Index

Examples

Constants

View Source
const DefaultMaxRoutines = 1000

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache interface {
	// Namespaces returns all of the registered namespaces(so far) in the cache. Namespaces are created automatically with Set().
	Namespaces() []string
	// Get gets the value for the given key in the given namespace.
	Get(namespace string, key interface{}) (interface{}, bool)
	// Set sets a value for the given key in the given namespace.
	Set(namespace string, key interface{}, value interface{})
	// SetAll sets all values from the map in the given namespace
	SetAll(namespace string, m Map)
	// Range calls f sequentially for each key and value present within the given namespace.
	// If f returns false, range stops the iteration.
	Range(namespace string, f func(key, value interface{}) bool)
	// Delete deletes the key and its value from the given namespace.
	Delete(namespace string, key interface{})
	// Len returns total kv pairs within namespace
	Len(namespace string) int
	// Exists returns whether the key exists within the namespace
	Exists(namespace string, key interface{}) bool
	// Copy returns an Map with all of the values in the namespace
	Copy(namespace string) Map
	// Filter iterates over all values in the namespace and returns an Map with all of the values in the namespace that the filter returns true for
	Filter(namespace string, filter func(k, v interface{}) bool) Map
	// Intersection returns a Map of all of the values that are within both namespaces
	Intersection(namespace1, namespace2 string) Map
	// Union returns a Map with a union of the two namespaces
	Union(namespace1, namespace2 string) Map
	// Map returns every value in the namespace
	Map(namespace string) Map
	// Close closes the Cache and frees up resources.
	Close()
}

Cache is a concurrency safe cache that stores arbitrary, namespaced data with optional TTL.

type Func

type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt

type GoOpt func(o *goOpts)

GoOpt is a function that configures GoOpts

func GoWithDeadline added in v0.2.0

func GoWithDeadline(deadline time.Time) GoOpt

GoWithDeadline is a GoOpt that creates the Routine's context with the given deadline.

func GoWithMiddlewares added in v0.0.8

func GoWithMiddlewares(middlewares ...Middleware) GoOpt

GoWithMiddlewares wraps the gived function with the input middlewares.

func GoWithPID added in v0.0.8

func GoWithPID(id string) GoOpt

GoWithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func GoWithTags added in v0.0.8

func GoWithTags(tags ...string) GoOpt

GoWithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func GoWithTimeout added in v0.0.8

func GoWithTimeout(to time.Duration) GoOpt

GoWithTimeout is a GoOpt that creates the Routine's context with the given timeout value

func GoWithValues added in v0.1.1

func GoWithValues(key, val interface{}) GoOpt

GoWithValues adds the k/v to the routine's root context. It can be retrieved with routine.Context().Value()

type Machine

type Machine struct {
	// contains filtered or unexported fields
}

Machine is a zero dependency runtime for managed goroutines. It is inspired by errgroup.Group with extra bells & whistles:

func New

func New(ctx context.Context, options ...Opt) *Machine

New Creates a new machine instance with the given root context & options

Example
package main

import (
	"context"
	"fmt"
	"github.com/autom8ter/machine"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := machine.New(ctx,
		machine.WithMaxRoutines(10),
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "acme.com"

	// start a goroutine that subscribes to all messages sent to the target channel for 5 seconds
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
		})
	}, machine.GoWithTags("subscribe"),
		machine.GoWithTimeout(5*time.Second),
	)

	// start another goroutine that publishes to the target channel every second for 5 seconds
	m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	}, machine.GoWithTags("publish"),
		machine.GoWithTimeout(5*time.Second),
		machine.GoWithMiddlewares(
			// run every second until context cancels
			machine.Cron(time.NewTicker(1*time.Second)),
		),
	)

	m.Wait()
}
Output:

func (*Machine) Active added in v0.0.8

func (p *Machine) Active() int

Active returns current active managed goroutine count

func (*Machine) Cache

func (m *Machine) Cache() Cache

Cache returns the machine's cache implementation. One is automatically set if not provided as an Opt on machine instance creation.

func (*Machine) Cancel

func (p *Machine) Cancel()

Cancel cancels every goroutines context within the machine instance & it's children

func (*Machine) CancelRoutine added in v0.2.0

func (m *Machine) CancelRoutine(id string)

CancelRoutine cancels the context of the active routine with the given id if it exists.

func (*Machine) Close added in v0.0.5

func (m *Machine) Close()

Close completely closes the machine instance & all of it's children

func (*Machine) Go

func (m *Machine) Go(fn Func, opts ...GoOpt)

Go calls the given function in a new goroutine. it is passed information about the goroutine at runtime via the Routine interface

func (*Machine) HasRoutine added in v0.2.0

func (m *Machine) HasRoutine(id string) bool

HasRoutine returns true if the machine has a active routine with the given id

func (*Machine) ID added in v0.2.0

func (m *Machine) ID() string

ID returns the machine instance's unique id.

func (*Machine) Parent added in v0.0.8

func (m *Machine) Parent() *Machine

Parent returns the parent Machine instance if it exists and nil if not.

func (*Machine) Stats

func (m *Machine) Stats() *Stats

Stats returns Goroutine information from the machine and all of it's children

func (*Machine) Sub added in v0.0.8

func (m *Machine) Sub(opts ...Opt) *Machine

Sub returns a nested Machine instance that is dependent on the parent machine's context. It inherits the parent's pubsub/cache implementation & middlewares if none are provided Sub machine's do not inherit their parents max routine setting

func (*Machine) Tags added in v0.0.9

func (p *Machine) Tags() []string

Tags returns the machine's tags

func (*Machine) Total

func (p *Machine) Total() int

Total returns total goroutines that have been fully executed by the machine

func (*Machine) Wait

func (m *Machine) Wait()

Wait blocks until total active goroutine count reaches zero for the instance and all of it's children. At least one goroutine must have finished in order for wait to un-block

type Map added in v0.3.7

type Map map[interface{}]interface{}

Map is a functional map for storing arbitrary data. It is not concurrency safe

func (Map) Copy added in v0.3.7

func (m Map) Copy() Map

Copy creates a replica of the Map

func (Map) Del added in v0.3.7

func (m Map) Del(k interface{})

Del deletes the entry from the map by key

func (Map) Exists added in v0.3.7

func (m Map) Exists(key interface{}) bool

Exists returns true if the key exists in the map

func (Map) Filter added in v0.3.7

func (m Map) Filter(filter func(k, v interface{}) bool) Map

Filter returns a map of the values that return true from the filter function

func (Map) Get added in v0.3.7

func (m Map) Get(k interface{}) (interface{}, bool)

Get gets an entry from the map by key

func (Map) Intersection added in v0.3.7

func (m Map) Intersection(other Map) Map

Intersection returns the values that exist in both maps ref: https://en.wikipedia.org/wiki/Intersection_(set_theory)#:~:text=In%20mathematics%2C%20the%20intersection%20of,that%20also%20belong%20to%20A).

func (Map) Range added in v0.3.7

func (m Map) Range(iterator func(k, v interface{}) bool)

Range iterates over the map with the function. If the function returns false, the iteration exits.

func (Map) Set added in v0.3.7

func (m Map) Set(k, v interface{})

Set set an entry in the map

func (Map) Union added in v0.4.0

func (m Map) Union(other Map) Map

Union returns the all values in both maps ref: https://en.wikipedia.org/wiki/Union_(set_theory)

type Middleware

type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.

func After added in v0.0.4

func After(afterFunc func(routine Routine)) Middleware

After exectues the afterFunc after the main goroutine exits.

func Before added in v0.0.4

func Before(beforeFunc func(routine Routine)) Middleware

Before exectues the beforeFunc before the main goroutine is executed.

func Cron

func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that execute the function every time the ticker ticks until the goroutine's context cancels

func Decider added in v0.0.4

func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider exectues the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.

func PanicRecover added in v0.0.9

func PanicRecover() Middleware

PanicRecover wraps a goroutine with a middleware the recovers from panics.

func While added in v0.2.0

func While(deciderFunc func(routine Routine) bool) Middleware

While is a middleware that will continue to execute the Func while deciderFunc() returns true. The loop breaks the first time deciderFunc() returns false or the routine's context cancels

type Opt

type Opt func(o *option)

Opt is a single option when creating a machine instance with New

func WithCache

func WithCache(cache Cache) Opt

WithCache sets the cache implementation for the machine instance. An inmemory implementation is used if none is provided.

func WithChildren added in v0.0.8

func WithChildren(children ...*Machine) Opt

WithChildren sets the machine instances children

func WithDeadline added in v0.2.0

func WithDeadline(deadline time.Time) Opt

WithDeadline is an Opt that creates the Machine's context with the given deadline.

func WithID added in v0.2.0

func WithID(id string) Opt

WithID sets the machine instances unique id. If one isn't provided, a unique id will be assigned

func WithMaxRoutines

func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if <= zero.

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Opt

WithMiddlewares wraps every goroutine function executed by the machine with the given middlewares. Middlewares can be added to individual goroutines with GoWithMiddlewares

func WithPubSub added in v0.0.5

func WithPubSub(pubsub PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An inmemory implementation is used if none is provided.

func WithTags

func WithTags(tags ...string) Opt

WithTags sets the machine instances tags

func WithTimeout

func WithTimeout(to time.Duration) Opt

WithTimeout is an Opt that creates the Machine's context with the given timeout value

func WithValue added in v0.2.0

func WithValue(key, val interface{}) Opt

WithValue adds the k/v to the Machine's root context. It can be retrieved with context.Value() in all sub routine contexts

type PubSub added in v0.0.5

type PubSub interface {
	// Publish publishes the object to the channel by name
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to the given channel
	Subscribe(ctx context.Context, channel string, handler func(obj interface{})) error
	Close()
}

PubSub is used to asynchronously pass messages between routines.

type Routine

type Routine interface {
	// Context returns the goroutines unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID() is the goroutines unique process id
	PID() string
	// Tags() are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutines context is cancelled.
	Subscribe(channel string, handler func(obj interface{})) error
	// TraceLog logs a message within the goroutine execution tracer. ref: https://golang.org/pkg/runtime/trace/#example_
	TraceLog(message string)
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}

Routine is an interface representing a goroutine

type RoutineStats

type RoutineStats struct {
	PID      string        `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats

type Stats struct {
	ID               string         `json:"id"`
	Tags             []string       `json:"tags"`
	TotalRoutines    int            `json:"totalRoutines"`
	ActiveRoutines   int            `json:"activeRoutines"`
	Routines         []RoutineStats `json:"routines"`
	TotalChildren    int            `json:"totalChildren"`
	HasParent        bool           `json:"hasParent"`
	TotalMiddlewares int            `json:"totalMiddlewares"`
	Timeout          time.Duration  `json:"timeout"`
	Deadline         time.Time      `json:"deadline"`
	Children         []*Stats       `json:"children"`
}

Stats holds information about goroutines

func (Stats) String

func (s Stats) String() string

String prints a pretty json string of the stats

Directories

Path Synopsis
examples module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL