machine

v1.4.0
Published: Feb 19, 2021 License: Apache-2.0 Imports: 16 Imported by: 5

README

Machine

import "github.com/autom8ter/machine"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := machine.New(ctx,
		machine.WithMaxRoutines(10),
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "acme.com"
	const publisherID = "publisher"
	// start another goroutine that publishes to the target channel every second for 5 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	}, machine.GoWithTags("publish"),
		machine.GoWithPID(publisherID),
		machine.GoWithTimeout(5*time.Second),
		machine.GoWithMiddlewares(
			// run every second until context cancels
			machine.Cron(time.NewTicker(1*time.Second)),
		),
	)
	// start a goroutine that subscribes to all messages sent to the target channel for 3 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) bool {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
			return true
		})
	}, machine.GoWithTags("subscribe"),
		machine.GoWithTimeout(3*time.Second),
	)
	m.Wait()

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • throttled goroutines

  • self-cancellable goroutines with Context

  • global-cancellable goroutines with context (see Cancel)

  • goroutines have IDs and optional tags for easy debugging (see Stats)

  • native publish/subscribe implementation for broadcasting messages to active goroutines

  • middlewares for wrapping/decorating functions

  • "sub" machines for creating a dependency tree between groups of goroutines

  • goroutine leak prevention

  • native pprof & golang execution tracer integration

Use Cases

Machine is meant to be completely agnostic and dependency-free; its use cases are expected to be emergent. It can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

Examples

All examples are under 500 lines of code (excluding code generation).

Documentation

Constants

const DefaultMaxRoutines = 1000

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt

type GoOpt func(o *goOpts)

GoOpt is a function that configures a single call to Machine.Go.
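
GoOpt has the shape of Go's functional-options pattern. The following is a minimal sketch of that pattern, not the package's implementation: the names goOptions, goOpt, withPID, withTags and apply are hypothetical stand-ins, and the "random-id" default is a placeholder, since the fields of the unexported goOpts struct are not documented here.

```go
package main

import "fmt"

// goOptions is a hypothetical stand-in for the package's unexported options struct.
type goOptions struct {
	pid  string
	tags []string
}

// goOpt mirrors the shape of machine.GoOpt: a function that mutates the options.
type goOpt func(o *goOptions)

// withPID is an illustrative option that overrides the id.
func withPID(id string) goOpt {
	return func(o *goOptions) { o.pid = id }
}

// withTags is an illustrative option that appends tags.
func withTags(tags ...string) goOpt {
	return func(o *goOptions) { o.tags = append(o.tags, tags...) }
}

// apply starts from defaults and runs each option in order.
func apply(opts ...goOpt) goOptions {
	o := goOptions{pid: "random-id"} // placeholder default for this sketch
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := apply(withPID("publisher"), withTags("publish"))
	fmt.Println(o.pid, o.tags) // prints "publisher [publish]"
}
```

Because every option is just a function, callers can pass any number of them in any order, and new options can be added without changing the signature of the function that accepts them.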

func GoWithDeadline added in v0.2.0

func GoWithDeadline(deadline time.Time) GoOpt

GoWithDeadline is a GoOpt that creates the Routine's context with the given deadline.

func GoWithMiddlewares added in v0.0.8

func GoWithMiddlewares(middlewares ...Middleware) GoOpt

GoWithMiddlewares wraps the given function with the input middlewares.

func GoWithPID added in v0.0.8

func GoWithPID(id string) GoOpt

GoWithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func GoWithTags added in v0.0.8

func GoWithTags(tags ...string) GoOpt

GoWithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func GoWithTimeout added in v0.0.8

func GoWithTimeout(to time.Duration) GoOpt

GoWithTimeout is a GoOpt that creates the Routine's context with the given timeout value.
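
To illustrate what a context created with a timeout looks like from inside a goroutine, here is a small standard-library sketch. It assumes the behaviour of context.WithTimeout; whether the package uses that exact call internally is not stated here, and waitForTimeout and timedOut are hypothetical helper names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForTimeout derives a context that expires after to, blocks until the
// context is done, and returns the reason it ended.
func waitForTimeout(parent context.Context, to time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, to)
	defer cancel() // always release the timer
	<-ctx.Done()
	return ctx.Err()
}

// timedOut reports whether a context with a timeout of ms milliseconds ended
// because its deadline passed.
func timedOut(ms int) bool {
	err := waitForTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(20)) // prints "true"
}
```

A routine that selects on its context's Done channel, as waitForTimeout does, stops on its own once the timeout elapses.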

func GoWithValues added in v0.1.1

func GoWithValues(key, val interface{}) GoOpt

GoWithValues adds the k/v to the routine's root context. It can be retrieved with routine.Context().Value()
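
The retrieval side can be sketched with the standard library alone. This example assumes the value is stored the way context.WithValue stores it; ctxKey and lookup are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type, which avoids collisions with keys defined
// by other packages.
type ctxKey string

// lookup stores val under key in a derived context and reads it back, the
// same way a routine would call routine.Context().Value(key).
func lookup(key, val string) interface{} {
	ctx := context.WithValue(context.Background(), ctxKey(key), val)
	return ctx.Value(ctxKey(key))
}

func main() {
	fmt.Println(lookup("tenant", "acme")) // prints "acme"
}
```

The key used for retrieval must be equal in both type and value to the key used for storage, so a plain string "tenant" would not find a value stored under ctxKey("tenant").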

type Machine

type Machine struct {
	// contains filtered or unexported fields
}

Machine is a zero dependency runtime for managed goroutines. It is inspired by errgroup.Group with extra bells & whistles.

func New

func New(ctx context.Context, options ...Opt) *Machine

New creates a new Machine instance with the given root context and options.

Example
package main

import (
	"context"
	"fmt"
	"github.com/autom8ter/machine"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := machine.New(ctx,
		machine.WithMaxRoutines(10),
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "acme.com"
	const publisherID = "publisher"
	// start another goroutine that publishes to the target channel every second for 5 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		routine.Publish(channelName, "hey there bud!")
	}, machine.GoWithTags("publish"),
		machine.GoWithPID(publisherID),
		machine.GoWithTimeout(5*time.Second),
		machine.GoWithMiddlewares(
			// run every second until context cancels
			machine.Cron(time.NewTicker(1*time.Second)),
		),
	)
	// start a goroutine that subscribes to all messages sent to the target channel for 3 seconds OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) bool {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
			return true
		})
	}, machine.GoWithTags("subscribe"),
		machine.GoWithTimeout(3*time.Second),
	)

	// start a goroutine that subscribes to the channel until the publishing goroutine exits OR the routine's context cancels
	m.Go(func(routine machine.Routine) {
		routine.Subscribe(channelName, func(obj interface{}) bool {
			fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
			return m.HasRoutine(publisherID)
		})
	}, machine.GoWithTags("subscribeUntil"))

	m.Wait()
}

func (*Machine) Active added in v0.0.8

func (p *Machine) Active() int

Active returns current active managed goroutine count

func (*Machine) Cancel

func (p *Machine) Cancel()

Cancel cancels every goroutine's context within the machine instance and its children.

func (*Machine) CancelRoutine added in v0.2.0

func (m *Machine) CancelRoutine(id string)

CancelRoutine cancels the context of the active routine with the given id if it exists.
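
One common way to support cancellation by id is to keep a map from id to cancel function. The sketch below shows that idea under stated assumptions: registry, start, cancel and cancelByID are hypothetical names, and nothing here is taken from the package's source.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in that maps routine ids to cancel functions.
type registry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newRegistry() *registry {
	return &registry{cancels: make(map[string]context.CancelFunc)}
}

// start derives a cancellable context for id and remembers its cancel function.
func (r *registry) start(parent context.Context, id string) context.Context {
	ctx, cancel := context.WithCancel(parent)
	r.mu.Lock()
	r.cancels[id] = cancel
	r.mu.Unlock()
	return ctx
}

// cancel cancels the context registered under id, if any, and reports
// whether such an id existed.
func (r *registry) cancel(id string) bool {
	r.mu.Lock()
	cancel, ok := r.cancels[id]
	delete(r.cancels, id)
	r.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

// cancelByID registers one context, cancels it by id, then tries an unknown id.
func cancelByID() (found, cancelled, unknownIgnored bool) {
	r := newRegistry()
	ctx := r.start(context.Background(), "publisher")
	found = r.cancel("publisher")
	cancelled = ctx.Err() == context.Canceled
	unknownIgnored = !r.cancel("unknown")
	return
}

func main() {
	fmt.Println(cancelByID()) // prints "true true true"
}
```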

func (*Machine) Close added in v0.0.5

func (m *Machine) Close()

Close completely closes the machine's pubsub instance and all of its closer functions. It also closes all of its child machines (if they exist).

func (*Machine) Go

func (m *Machine) Go(fn Func, opts ...GoOpt) string

Go calls the given function in a new goroutine and returns the goroutine's unique id. The function is passed information about the goroutine at runtime via the Routine interface.

func (*Machine) HasRoutine added in v0.2.0

func (m *Machine) HasRoutine(id string) bool

HasRoutine returns true if the machine has an active routine with the given id.

func (*Machine) ID added in v0.2.0

func (m *Machine) ID() string

ID returns the machine instance's unique id.

func (*Machine) Parent added in v0.0.8

func (m *Machine) Parent() *Machine

Parent returns the parent Machine instance if it exists and nil if not.

func (*Machine) PubSub added in v1.0.1

func (m *Machine) PubSub() pubsub.PubSub

PubSub returns the machine's underlying pubsub implementation

func (*Machine) Stats

func (m *Machine) Stats() *Stats

Stats returns goroutine information from the machine and all of its children.

func (*Machine) Sub added in v0.0.8

func (m *Machine) Sub(opts ...Opt) *Machine

Sub returns a nested Machine instance that is dependent on the parent machine's context. It inherits the parent's pubsub implementation and middlewares if none are provided. Sub machines do not inherit their parent's max routine setting.
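
The dependency on the parent's context can be pictured with plain derived contexts. This is a standard-library sketch of context propagation only, under the assumption that a sub machine's context is derived from its parent's; parentCancelsChild and childCancelsParent are hypothetical helper names.

```go
package main

import (
	"context"
	"fmt"
)

// parentCancelsChild reports whether cancelling a parent context also
// cancels a context derived from it.
func parentCancelsChild() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild()
	cancelParent()
	<-child.Done() // returns once the cancellation has reached the child
	return child.Err() == context.Canceled
}

// childCancelsParent reports whether cancelling a derived context affects
// the context it was derived from.
func childCancelsParent() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()
	_, cancelChild := context.WithCancel(parent)
	cancelChild()
	return parent.Err() != nil
}

func main() {
	fmt.Println(parentCancelsChild(), childCancelsParent()) // prints "true false"
}
```

Cancellation flows down the tree only: stopping a parent stops everything beneath it, while stopping a child leaves the parent running.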

func (*Machine) Tags added in v0.0.9

func (p *Machine) Tags() []string

Tags returns the machine's tags

func (*Machine) Total

func (p *Machine) Total() int

Total returns total goroutines that have been fully executed by the machine

func (*Machine) Wait

func (m *Machine) Wait()

Wait blocks until the total active goroutine count reaches zero for the instance and all of its children. At least one goroutine must have finished in order for Wait to unblock.

type Middleware

type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.
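
A middleware of this shape composes by wrapping. The sketch below uses local stand-in types (step instead of machine.Func, middleware instead of machine.Middleware) so that it runs without the package; before, chain and order are hypothetical names, and the order in which the package itself applies several middlewares is not specified here.

```go
package main

import "fmt"

// step is a stand-in for machine.Func; it records what ran into a shared log.
type step func(log *[]string)

// middleware mirrors the shape of machine.Middleware: it takes a function
// and returns a wrapped one.
type middleware func(next step) step

// before returns a middleware that records label, then calls the wrapped function.
func before(label string) middleware {
	return func(next step) step {
		return func(log *[]string) {
			*log = append(*log, label)
			next(log)
		}
	}
}

// chain wraps fn so that the first middleware listed runs outermost.
func chain(fn step, mws ...middleware) step {
	for i := len(mws) - 1; i >= 0; i-- {
		fn = mws[i](fn)
	}
	return fn
}

// order runs a function wrapped by two middlewares and returns the call order.
func order() []string {
	var log []string
	body := func(log *[]string) { *log = append(*log, "main") }
	chain(body, before("a"), before("b"))(&log)
	return log
}

func main() {
	fmt.Println(order()) // prints "[a b main]"
}
```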

func After added in v0.0.4

func After(afterFunc func(routine Routine)) Middleware

After executes the afterFunc after the main goroutine exits.

func Before added in v0.0.4

func Before(beforeFunc func(routine Routine)) Middleware

Before executes the beforeFunc before the main goroutine is executed.

func Cron

func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that executes the function every time the ticker ticks until the goroutine's context cancels.
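
The described behaviour corresponds to a select loop over a ticker and a context. The following is a sketch of such a loop using only the standard library; runEvery and countTicks are hypothetical names, and stopping the ticker on exit is a choice made for this sketch rather than documented behaviour of Cron.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn on every tick until ctx is cancelled and returns how
// many times fn ran.
func runEvery(ctx context.Context, ticker *time.Ticker, fn func()) int {
	defer ticker.Stop() // release the ticker's resources when the loop ends
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case <-ticker.C:
			fn()
			n++
		}
	}
}

// countTicks ticks every intervalMS milliseconds for roughly totalMS
// milliseconds and returns the number of executions.
func countTicks(intervalMS, totalMS int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalMS)*time.Millisecond)
	defer cancel()
	ticker := time.NewTicker(time.Duration(intervalMS) * time.Millisecond)
	return runEvery(ctx, ticker, func() {})
}

func main() {
	// The exact count depends on scheduling, so it is printed rather than assumed.
	fmt.Println("executions:", countTicks(10, 105))
}
```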

func Decider added in v0.0.4

func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider executes the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.

func PanicRecover added in v0.0.9

func PanicRecover() Middleware

PanicRecover wraps a goroutine with a middleware that recovers from panics.
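
Recovering from a panic in Go relies on a deferred call to recover inside the wrapper. Here is a minimal sketch of that mechanism; recoverWrap is a hypothetical name, and turning the panic into an error is an assumption of this sketch, since what PanicRecover does with the recovered value is not described here.

```go
package main

import "fmt"

// recoverWrap returns a function that runs fn and converts a panic into an
// error instead of letting it crash the program.
func recoverWrap(fn func()) func() error {
	return func() (err error) {
		defer func() {
			// recover only has an effect inside a deferred function.
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered: %v", r)
			}
		}()
		fn()
		return nil
	}
}

func main() {
	safe := recoverWrap(func() { panic("boom") })
	fmt.Println(safe()) // prints "recovered: boom"
}
```

Without such a wrapper, a panic in any goroutine terminates the whole process, which is why it is useful as a machine-wide middleware.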

func While added in v0.2.0

func While(deciderFunc func(routine Routine) bool) Middleware

While is a middleware that executes the Func repeatedly while deciderFunc() returns true, or until the context cancels.
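
A loop with both exit conditions can be sketched as follows. This is an illustration with the standard library, not the package's code; while, countUntil and countAfterCancel are hypothetical names, and checking the context before each iteration is a choice made for this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// while runs fn repeatedly for as long as decide returns true and ctx has
// not been cancelled.
func while(ctx context.Context, decide func() bool, fn func()) {
	for decide() {
		select {
		case <-ctx.Done():
			return
		default:
			fn()
		}
	}
}

// countUntil increments a counter until it reaches n and returns it.
func countUntil(n int) int {
	count := 0
	while(context.Background(), func() bool { return count < n }, func() { count++ })
	return count
}

// countAfterCancel runs the loop with an already cancelled context.
func countAfterCancel() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	count := 0
	while(ctx, func() bool { return true }, func() { count++ })
	return count
}

func main() {
	fmt.Println(countUntil(3), countAfterCancel()) // prints "3 0"
}
```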

type Opt

type Opt func(o *option)

Opt is a single option when creating a machine instance with New

func WithChildren added in v0.0.8

func WithChildren(children ...*Machine) Opt

WithChildren sets the machine instance's children.

func WithClosers added in v0.7.2

func WithClosers(closers ...func()) Opt

WithClosers makes the Machine instance execute the given closers before it closes

func WithDeadline added in v0.2.0

func WithDeadline(deadline time.Time) Opt

WithDeadline is an Opt that creates the Machine's context with the given deadline.

func WithID added in v0.2.0

func WithID(id string) Opt

WithID sets the machine instance's unique id. If one isn't provided, a unique id will be assigned.

func WithMaxRoutines

func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if <= zero.
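
A common way to throttle goroutines in Go is a buffered channel used as a counting semaphore. The sketch below demonstrates that technique under the assumption that it is comparable to what the option achieves; it is not taken from the package, and maxConcurrent is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrent starts jobs goroutines, allows at most limit of them to run
// at once, and returns the highest concurrency it observed.
func maxConcurrent(limit, jobs int) int {
	if limit <= 0 {
		panic("limit must be greater than zero")
	}
	sem := make(chan struct{}, limit) // buffered channel as a counting semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		active  int
		highest int
	)
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the work is done
			mu.Lock()
			active++
			if active > highest {
				highest = active
			}
			mu.Unlock()
			time.Sleep(5 * time.Millisecond) // simulated work
			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return highest
}

func main() {
	fmt.Println(maxConcurrent(3, 12) <= 3) // prints "true"
}
```

The caller blocks on the send to sem once the limit is reached, so new goroutines are only started as running ones finish.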

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Opt

WithMiddlewares wraps every goroutine function executed by the machine with the given middlewares. Middlewares can be added to individual goroutines with GoWithMiddlewares

func WithPubSub added in v0.0.5

func WithPubSub(pubsub pubsub.PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An in-memory implementation is used if none is provided.
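
To give an idea of what an in-memory publish/subscribe layer involves, here is a simplified sketch. It is not the package's pubsub implementation and does not implement the pubsub.PubSub interface; broker, handler, publish, listen and firstTwo are hypothetical names. It assumes, as in the package example, that a handler returning false ends the subscription.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// handler processes one message and returns false to stop the subscription.
type handler func(msg interface{}) bool

// broker is a hypothetical in-memory fan-out of messages per channel name.
type broker struct {
	mu   sync.RWMutex
	subs map[string][]chan interface{}
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan interface{})}
}

// publish delivers msg to every subscriber of channel without blocking.
func (b *broker) publish(channel string, msg interface{}) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[channel] {
		select {
		case ch <- msg:
		default: // subscriber buffer is full; this sketch drops the message
		}
	}
}

// listen registers h on channel and returns a channel that is closed once
// the handler loop exits. For brevity the subscription is not removed from
// the broker afterwards.
func (b *broker) listen(ctx context.Context, channel string, h handler) <-chan struct{} {
	ch := make(chan interface{}, 16)
	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], ch)
	b.mu.Unlock()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-ch:
				if !h(msg) {
					return
				}
			}
		}
	}()
	return done
}

// firstTwo publishes three messages to a handler that stops after two.
func firstTwo() []string {
	b := newBroker()
	var got []string
	done := b.listen(context.Background(), "acme.com", func(msg interface{}) bool {
		got = append(got, msg.(string))
		return len(got) < 2
	})
	for _, m := range []string{"one", "two", "three"} {
		b.publish("acme.com", m)
	}
	<-done
	return got
}

func main() {
	fmt.Println(firstTwo()) // prints "[one two]"
}
```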

func WithTags

func WithTags(tags ...string) Opt

WithTags sets the machine instance's tags.

func WithTimeout

func WithTimeout(to time.Duration) Opt

WithTimeout is an Opt that creates the Machine's context with the given timeout value.

func WithValue added in v0.2.0

func WithValue(key, val interface{}) Opt

WithValue adds the k/v to the Machine's root context. It can be retrieved with context.Value() in all sub routine contexts

type Routine

type Routine interface {
	// Context returns the goroutine's unique context that may be used for cancellation
	Context() context.Context
	// Cancel cancels the context returned from Context()
	Cancel()
	// PID() is the goroutine's unique process id
	PID() string
	// Tags() are the tags associated with the goroutine
	Tags() []string
	// Start is when the goroutine started
	Start() time.Time
	// Duration is the duration since the goroutine started
	Duration() time.Duration
	// Publish publishes the object to the given channel
	Publish(channel string, obj interface{}) error
	// Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutine's context is cancelled.
	Subscribe(channel string, handler pubsub.Handler, options ...pubsub.SubOpt) error
	// TraceLog logs a message within the goroutine execution tracer. ref: https://golang.org/pkg/runtime/trace/#example_
	TraceLog(message string)
	// Machine returns the underlying routine's machine instance
	Machine() *Machine
}

Routine is an interface representing a goroutine

type RoutineStats

type RoutineStats struct {
	PID      string        `json:"pid"`
	Start    time.Time     `json:"start"`
	Duration time.Duration `json:"duration"`
	Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats

type Stats struct {
	ID               string         `json:"id"`
	Tags             []string       `json:"tags"`
	TotalRoutines    int            `json:"totalRoutines"`
	ActiveRoutines   int            `json:"activeRoutines"`
	Routines         []RoutineStats `json:"routines"`
	TotalChildren    int            `json:"totalChildren"`
	HasParent        bool           `json:"hasParent"`
	TotalMiddlewares int            `json:"totalMiddlewares"`
	Timeout          time.Duration  `json:"timeout"`
	Deadline         time.Time      `json:"deadline"`
	Children         []*Stats       `json:"children"`
}

Stats holds information about goroutines

func (Stats) String

func (s Stats) String() string

String prints a pretty json string of the stats
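
A String method of this kind is typically a thin wrapper around json.MarshalIndent. The sketch below shows the idea with a trimmed stand-in struct; stats and decode are hypothetical names, only three of the documented fields are reproduced, and the four-space indent is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stats is a trimmed, hypothetical stand-in for machine.Stats.
type stats struct {
	ID             string   `json:"id"`
	Tags           []string `json:"tags"`
	ActiveRoutines int      `json:"activeRoutines"`
}

// String renders the stats as indented JSON, or an empty string on failure.
func (s stats) String() string {
	b, err := json.MarshalIndent(s, "", "    ")
	if err != nil {
		return ""
	}
	return string(b)
}

// decode parses a JSON string produced by String back into a stats value.
func decode(text string) (stats, error) {
	var s stats
	err := json.Unmarshal([]byte(text), &s)
	return s, err
}

func main() {
	s := stats{ID: "m1", Tags: []string{"demo"}, ActiveRoutines: 2}
	fmt.Println(s) // fmt uses the String method because stats implements fmt.Stringer
}
```

Implementing String on the value receiver lets the struct be passed directly to fmt verbs such as %s, which is how the package example prints its stats.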

Directories

Path Synopsis
examples module
