machine

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2022 License: MIT Imports: 4 Imported by: 13

README

Go PkgGoDev GoDoc Go Report Card Codacy Badge Codacy Badge Version Badge

Machine

Machine is a library for creating data workflows. These workflows can be either very concise or quite complex, even allowing for cycles for flows that need retry or self healing mechanisms.

Implementations of the EdgeProvider and Edge interfaces can be used for fan-out so that large volumes of data can be processed by multiple nodes.

type EdgeProvider[T Identifiable] interface {
	New(name string, option *Option[T]) Edge[T]
}

type Edge[T Identifiable] interface {
	OutputTo(ctx context.Context, channel chan []T)
	Input(payload ...T)
}

Examples of using both AWS SQS and Google Pub/Sub coming soon!


The main function types are:

// Applicative is a function that is applied on an individual
// basis for each Packet in the payload. The resulting data replaces
// the old data
type Applicative[T Identifiable] func(d T) T

// Combiner is a function used to combine a payload into a single Packet.
type Combiner[T Identifiable] func(payload []T) T

// Filter is a function that can be used to filter the payload.
type Filter[T Identifiable] func(d T) FilterResult

// Comparator is a function to compare 2 items
type Comparator[T Identifiable] func(a T, b T) int

// Window is a function to work on a window of data
type Window[T Identifiable] func(payload []T) []T

// Remover func that is used to remove Data based on a true result
type Remover[T Identifiable] func(index int, d T) bool

These all implement the Component interface, which can be use to provide a Vertex instance. The Vertex type is the main building block of a Stream. The builder method creates these under the covers, and they can be used individually for testing of single operations.

// Component is an interface for providing a Vertex that can be used to run individual components on the payload.
type Component[T Identifiable] interface {
	Component(e Edge[T]) Vertex[T]
}

Installation

Add the primary library to your project

  go get -u github.com/whitaker-io/machine

Example

Basic receive -> process -> send Flow

package main

import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "time"

  "github.com/whitaker-io/machine"
)

type kv struct {
  name  string
  value int
}

func (i *kv) ID() string {
  return i.name
}

func deepcopyKV(k *kv) *kv { return &kv{name: k.name, value: k.value} }

func main() {
  ctx, cancel := context.WithCancel(context.Background())

  stream := machine.NewWithChannels(
    "test_stream",
    &Option[*kv]{
      DeepCopy:   deepcopyKV,
      FIFO:       false,
      BufferSize: 0,
    },
  )

  input := make(chan []*kv)
  out := make(chan []*kv)


  go func() {
    input <- someData //get some data from somewhere
  }()

  // this is a very simple example try experimenting 
  // with more complex flows including loops and filters
  stream.Builder().
    Map(
      func(m *kv) *kv {
        // do some processing
        return m
      },
    ).OutputTo(out)

  if err := streamm.StartWith(ctx, input); err != nil {
    fmt.Println(err)
  }


  go func() {
  Loop:
    for {
      select {
      case <-ctx.Done():
        break Loop
      case data := <-out:
        //handle the processed data
      }
    }
  }()

  // run until SIGTERM
  c := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt)

  <-c
  cancel()

  // give some time for a graceful shutdown
  <-time.After(time.Second * 2)
}

🤝 Contributing

Contributions, issues and feature requests are welcome.
Feel free to check issues page if you want to contribute.
Check the contributing guide.

Author

👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation

Overview

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Applicative added in v0.3.0

type Applicative[T Identifiable] func(d T) T

Applicative is a function that is applied on an individual basis for each Packet in the payload. The resulting data replaces the old data

func (Applicative[T]) Component added in v1.0.0

func (x Applicative[T]) Component(output Edge[T]) Vertex[T]

type Builder

type Builder[T Identifiable] interface {
	Map(a Applicative[T]) Builder[T]
	Window(x Window[T]) Builder[T]
	Sort(x Comparator[T]) Builder[T]
	Remove(x Remover[T]) Builder[T]
	Combine(x Combiner[T]) Builder[T]
	Filter(f Filter[T]) (Builder[T], Builder[T])
	Loop(x Filter[T]) (loop, out Builder[T])
	OutputTo(x chan []T)
}

Builder is the interface provided for creating a data processing stream.

type Combiner added in v1.0.0

type Combiner[T Identifiable] func(payload []T) T

Combiner is a function used to combine a payload into a single Packet.

func (Combiner[T]) Component added in v1.0.0

func (x Combiner[T]) Component(output Edge[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Comparator added in v0.16.5

type Comparator[T Identifiable] func(a T, b T) int

Comparator is a function to compare 2 items

func (Comparator[T]) Component added in v1.0.0

func (x Comparator[T]) Component(output Edge[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Component added in v1.0.0

type Component[T Identifiable] interface {
	Component(e Edge[T]) Vertex[T]
}

Component is an interface for providing a vertex that can be used to run individual components on the payload.

type Edge added in v0.17.0

type Edge[T Identifiable] interface {
	OutputTo(ctx context.Context, channel chan []T)
	Input(payload ...T)
}

Edge is an interface that is used for transferring data between vertices

func AsEdge added in v1.0.0

func AsEdge[T Identifiable](c chan []T) Edge[T]

AsEdge is a helper function to create an edge from a channel.

type EdgeProvider added in v0.17.0

type EdgeProvider[T Identifiable] interface {
	New(name string, option *Option[T]) Edge[T]
}

EdgeProvider is an interface for providing an edge that will be used to communicate between vertices.

type Filter added in v1.0.0

type Filter[T Identifiable] func(d T) FilterResult

Filter is a function that can be used to filter the payload.

func (Filter[T]) Component added in v1.0.0

func (x Filter[T]) Component(left, right Edge[T], option *Option[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type FilterResult added in v1.0.0

type FilterResult uint8

FilterResult is a type that is used to indicate the result of a filter.

const (
	// FilterLeft is a filter result that indicates that the item should move to the left branch.
	FilterLeft FilterResult = iota
	// FilterRight is a filter result that indicates that the item should move to the right branch.
	FilterRight
	// FilterBoth is a filter result that indicates that the item should go down both paths using the option.Deepcopy fn if provided.
	FilterBoth
)

func Duplicate added in v1.0.0

func Duplicate[T Identifiable](d T) FilterResult

Duplicate shorthand for FilterBoth.

type Identifiable added in v1.0.0

type Identifiable interface {
	ID() string
}

Identifiable is an interface that is used for providing an ID for a packet

type Option added in v0.2.0

type Option[T Identifiable] struct {
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	// Default: false
	FIFO bool `json:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	// Default: 0
	BufferSize int `json:"buffer_size,omitempty"`
	// MaxParallel sets the maximum number of parallel goroutines is only applicable when FIFO is turned off.
	// Default: 0
	MaxParallel int `json:"max_parallel,omitempty"`
	// Telemetry provides the ability to enable and configure telemetry
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`
	// PanicHandler is a function that is called when a panic occurs
	// Default: log the panic and no-op
	PanicHandler func(err error, payload ...T) `json:"-"`
	// DeepCopy is a function to preform a deep copy of the Payload
	DeepCopy func(T) T `json:"-"`
}

Option type for holding machine settings.

type Remover added in v0.16.5

type Remover[T Identifiable] func(index int, d T) bool

Remover func that is used to remove Data based on a true result

func (Remover[T]) Component added in v1.0.0

func (x Remover[T]) Component(output Edge[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Span added in v1.0.0

type Span[T Identifiable] interface {
	RecordPayload(payload ...T)
	RecordError(error)
	SpanEnd()
}

Span type for holding telemetry settings.

type Stream added in v0.5.0

type Stream[T Identifiable] interface {
	Start(ctx context.Context) error
	StartWith(ctx context.Context, input ...chan []T) error
	Consume(input ...chan []T)
	Clear()
	Builder() Builder[T]
}

Stream is a representation of a data stream and its associated logic.

The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in an OutputTo call.

func New

func New[T Identifiable](name string, provider EdgeProvider[T], options *Option[T]) Stream[T]

New is a function for creating a new Stream.

name string provider EdgeProvider[T] option *Option[T]

func NewWithChannels added in v1.0.0

func NewWithChannels[T Identifiable](name string, options *Option[T]) Stream[T]

NewWithChannels is a function for creating a new Stream using channels to pass the data.

name string option *Option[T]

type Telemetry added in v1.0.0

type Telemetry[T Identifiable] interface {
	PayloadSize(string, int64)
	IncrementPayloadCount(string)
	IncrementErrorCount(string)
	Duration(string, time.Duration)
	StartSpan(string) Span[T]
}

Telemetry type for holding telemetry settings.

type Vertex added in v0.3.0

type Vertex[T Identifiable] func(payload []T)

Vertex is a type used to process data for a stream.

func (Vertex[T]) Run added in v1.0.0

func (x Vertex[T]) Run(ctx context.Context, name string, channel chan []T, option *Option[T])

Run creates a go func to process the data in the channel until the context is canceled.

type Window added in v1.0.0

type Window[T Identifiable] func(payload []T) []T

Window is a function to work on a window of data

func (Window[T]) Component added in v1.0.0

func (x Window[T]) Component(output Edge[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

Directories

Path Synopsis
cmd module
common module
edge
http Module
pubsub Module
telemetry module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL