nerv

Version: v0.2.1
Published: Jun 17, 2024 License: Apache-2.0 Imports: 7 Imported by: 2

README

nerv-go

Nerv

Nerv is a simple pub/sub eventing engine for applications that want to process events in parallel. It operates on a standard topic/consumer/producer model where topics can be added and consumers can subscribe to one or more topics. When an event is submitted to the engine, it is handed to consumers according to the configuration of the respective topic: Broadcast distribution, or Direct distribution with the Arbitrary, RoundRobin, or Random selection method.
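To make the distribution modes concrete, here is a minimal stdlib-only sketch. It does not use nerv; the `topic` and `handler` types and all method names are hypothetical stand-ins. It models Broadcast as "every subscriber receives the event" and Direct as "exactly one subscriber receives it", picked either round-robin or at random. Arbitrary selection is left out because the README does not define it.

```go
package main

import (
	"fmt"
	"math/rand"
)

// handler is a stand-in for a consumer callback (hypothetical name).
type handler func(data string)

// topic is a toy model of a topic and its subscribers; it is not nerv's type.
type topic struct {
	subs []handler
	next int // cursor used by round-robin selection
}

// broadcast delivers the event to every subscriber.
func (t *topic) broadcast(data string) {
	for _, h := range t.subs {
		h(data)
	}
}

// roundRobin delivers the event to one subscriber, cycling through them in order.
func (t *topic) roundRobin(data string) {
	if len(t.subs) == 0 {
		return
	}
	t.subs[t.next%len(t.subs)](data)
	t.next++
}

// random delivers the event to one subscriber chosen at random.
func (t *topic) random(data string) {
	if len(t.subs) == 0 {
		return
	}
	t.subs[rand.Intn(len(t.subs))](data)
}

// newCountingTopic builds a topic with n subscribers that count their deliveries.
func newCountingTopic(n int) (*topic, []int) {
	counts := make([]int, n)
	t := &topic{}
	for i := 0; i < n; i++ {
		i := i
		t.subs = append(t.subs, func(string) { counts[i]++ })
	}
	return t, counts
}

func main() {
	t, counts := newCountingTopic(3)
	t.broadcast("a")
	fmt.Println("after broadcast:", counts)
	for i := 0; i < 4; i++ {
		t.roundRobin("b")
	}
	fmt.Println("after 4 round-robin sends:", counts)
	t.random("c")
	fmt.Println("after 1 random send:", counts)
}
```

In this model a broadcast costs one delivery per subscriber, while a direct send always costs exactly one, which is why direct distribution suits work that should be handled once.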

Nerv is meant to be the central driver of an event-driven application, so it offers a simple interface for creating "modules" that start and stop along with the engine while providing configurations for routing and forwarding events.

Within the source code there are two examples of modules being used.

The first is module_test, which creates a TCP listener that forwards net.Conn objects to the consumers of that module in a round-robin fashion. While "load balancing" this internally may not make sense, it's used strictly for testing.

The second is modhttp, a module meant specifically to open the event bus to the world (or local network, etc.) via HTTP. This module contains client and server functions along with an optional "authorization wrapper + callback" scheme that lets a user filter out any submissions that have invalid or nonexistent API tokens, etc. See modhttp_test.go in the modhttp directory.

The Examples

As a means to demonstrate, test, and debug nerv instances, the CLI in examples/http_app was made. This CLI has daemon-like functionality along with shutdown timers (using the event engine via a reaper). You can use this application to check the status of a nerv instance, run a nerv instance, submit events, and generally just use it to see what nerv is meant for.

Since the example is really the bread and butter of nerv's use-case it is further elaborated on below.

Building

Since nerv doesn't use anything other than vanilla Go, all you need to do to build the example is go into the example directory and run go build . For the sake of following along with the commands below, however, I recommend running the following in the example directory:

    go build -o nerv .

Starting/Stopping server instance

Start the server on port 9092 with a specified grace shutdown time and server process file.

    ./nerv -up -address 127.0.0.1:9092 -grace 8 -rti /tmp/my_rti.json

Immediately kill and re-serve the server (note: all registered senders and current messages will be dropped, meaning registered producers will need to re-register if -filter is enabled):

    ./nerv -down -force -clean -up -address 127.0.0.1:9092 -rti /tmp/my_rti.json

'force' is required to ensure that the server comes back up immediately without socket conflicts, which means 'clean' is also required for 'up' to work.

Same things as above, but with defaults:

    ./nerv -up
    ./nerv -down -force -clean -up

Posting Event

    ./nerv -emit -topic "some.topic" -prod "my.producer.id" -data "some fancy string data"

Optional HTTP request Auth

Handing the HTTP server a callback function to invoke when a submission request comes in will enable authentication. Any submission that comes in must then include the Auth member of RequestEventSubmission, found in modhttp/modhttp.go. The server and client do not care what form this authentication takes; they simply forward the data back to the user to decide whether it's valid.
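The callback scheme can be sketched without modhttp as follows. This is an illustration only: the `submission` struct, the `authCallback` signature, and the helper names are assumptions, and only the existence of an `Auth` member comes from the README. The point is that the transport treats the auth value as opaque and the user's callback alone decides validity.

```go
package main

import "fmt"

// submission models a request carrying an opaque Auth value.
// Only the Auth member is named in the README; the other fields are assumptions.
type submission struct {
	Auth  interface{}
	Topic string
	Data  interface{}
}

// authCallback is a hypothetical signature: user code decides validity.
type authCallback func(auth interface{}) bool

// accept reports whether the submission should be placed on the bus.
// A nil callback means authentication is disabled.
func accept(req submission, cb authCallback) bool {
	if cb == nil {
		return true
	}
	return cb(req.Auth)
}

// tokenChecker builds a callback that accepts a fixed set of string tokens.
func tokenChecker(valid ...string) authCallback {
	set := make(map[string]struct{}, len(valid))
	for _, v := range valid {
		set[v] = struct{}{}
	}
	return func(auth interface{}) bool {
		tok, ok := auth.(string)
		if !ok {
			return false // missing or non-string auth is rejected
		}
		_, found := set[tok]
		return found
	}
}

func main() {
	check := tokenChecker("my-special-api-token")
	good := submission{Auth: "my-special-api-token", Topic: "test", Data: "hello, world!"}
	bad := submission{Auth: nil, Topic: "test", Data: "hello, world!"}
	fmt.Println(accept(good, check), accept(bad, check), accept(bad, nil))
}
```

Because the auth value is an `interface{}`, the same shape would work for tokens, signed blobs, or structured credentials, as long as the callback knows how to interpret them.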

Using api token with CLI:

    ./nerv -emit -token "my-special-api-token" -topic "test" -prod "bosley" -data "hello, world!"

For examples of usage you can see main.go in cli/, or see modhttp/modhttp_test.go.

Routing

For some async use-cases a managed lifetime (live and die with the engine) is not required, and there is not always a need to have access to engine controls from something that sends/receives events. Sometimes it is preferred to just have a mechanism that can deliver events somewhere in-app. This is what the route functionality of the engine is for.


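package main

import (
  "fmt"
  "time"

  "github.com/bosley/nerv-go"
)

func main() {

  engine := nerv.NewEngine()

  // Register a route; the returned producer submits data to the handler below
  producer, err := engine.AddRoute("/some/route/name", func(c *nerv.Context) {
    fmt.Println("Received event generated at", c.Event.Spawned)
  })

  if err != nil {
    panic(err.Error())
  }

  if err := engine.Start(); err != nil {
    panic(err.Error())
  }

  // Now we can submit to that function at any time using `producer`

  if err := producer("My data"); err != nil {
    panic(err.Error())
  }

  // Give the event time to be delivered before stopping the engine
  time.Sleep(5 * time.Second)

  // Stop is the shutdown method listed in the package documentation
  if err := engine.Stop(); err != nil {
    panic(err.Error())
  }
}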

The example above is a bit contrived, so it doesn't shed much light on the usefulness here. However, what we gain from this is that we can produce in a non-blocking way at any time from any thread without having to set up channels, workers, and controls, or hand those structures around the application.
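For comparison, here is roughly what a caller would otherwise wire up by hand. This is a stdlib-only sketch, not nerv's implementation; `dispatcher`, `produce`, and `stop` are hypothetical names. It shows the channel, worker goroutine, and shutdown control that a route hides behind a single producer function.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errStopped = errors.New("dispatcher stopped")
	errFull    = errors.New("queue full")
)

// dispatcher is a toy stand-in for an engine with one route.
type dispatcher struct {
	mu      sync.Mutex
	stopped bool
	queue   chan interface{}
	done    chan struct{}
}

// newDispatcher starts one goroutine that feeds queued data to the handler.
func newDispatcher(buffer int, handle func(data interface{})) *dispatcher {
	d := &dispatcher{queue: make(chan interface{}, buffer), done: make(chan struct{})}
	go func() {
		defer close(d.done)
		for v := range d.queue {
			handle(v)
		}
	}()
	return d
}

// produce enqueues data without waiting for the handler. It fails once the
// dispatcher is stopped or when the buffer is full.
func (d *dispatcher) produce(data interface{}) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.stopped {
		return errStopped
	}
	select {
	case d.queue <- data:
		return nil
	default:
		return errFull
	}
}

// stop closes the queue and waits for already queued items to be handled.
func (d *dispatcher) stop() {
	d.mu.Lock()
	if !d.stopped {
		d.stopped = true
		close(d.queue)
	}
	d.mu.Unlock()
	<-d.done
}

func main() {
	var got []interface{} // only touched by the handler goroutine until stop returns
	d := newDispatcher(16, func(v interface{}) { got = append(got, v) })

	// Produce from several goroutines at once.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			_ = d.produce(n)
		}(i)
	}
	wg.Wait()
	d.stop()
	fmt.Println("handled", len(got), "events; produce after stop:", d.produce("late"))
}
```

Every piece of state in this sketch (the mutex, both channels, the stopped flag) is something the caller would have to own and pass around, which is the bookkeeping the paragraph above refers to.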

While this is not the foundational reason nerv was created, it is a neat, and potentially useful feature.

Documentation

Constants

This section is empty.

Variables

var ErrEngineAlreadyRunning = errors.New("engine already running")
var ErrEngineDuplicateTopic = errors.New("duplicate topic")
var ErrEngineNotRunning = errors.New("engine not running")
var ErrEngineUnknownConsumer = errors.New("unknown consumer")
var ErrEngineUnknownModule = errors.New("unknown module")
var ErrEngineUnknownTopic = errors.New("unknown topic")
var ErrTopicNoSubscriberFound = errors.New("no subscriber found")

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Id string
	Fn EventRecvr
}

The receiver of events, potentially subscribed to multiple topics

type Context added in v0.2.1

type Context struct {
	Event *Event
}

Context hands the event that has occurred along with a producer to publish back onto the engine. Since there is no connection directly to the sender, the state of the conversation must be saved to track state over time if so desired.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine() *Engine

func (*Engine) AddRoute added in v0.2.1

func (eng *Engine) AddRoute(topic string, route Route) (Producer, error)

A route is an abstraction over consumer/topic/producer that streamlines engine interaction for use-cases smaller and/or simpler than a module. Given the nature and purpose of Nerv, the producer handed back can be called from any thread at any time, worry-free, as long as the engine is running.

func (*Engine) ContainsConsumer

func (eng *Engine) ContainsConsumer(id *string) bool

func (*Engine) ContainsTopic

func (eng *Engine) ContainsTopic(topic *string) bool

func (*Engine) CreateTopic

func (eng *Engine) CreateTopic(cfg *TopicCfg) error

func (*Engine) DeleteTopic

func (eng *Engine) DeleteTopic(topicId string)

func (*Engine) GetModuleMeta added in v0.2.0

func (eng *Engine) GetModuleMeta(name string) interface{}

func (*Engine) Register

func (eng *Engine) Register(sub Consumer)

func (*Engine) SetModuleMeta added in v0.2.0

func (eng *Engine) SetModuleMeta(name string, data interface{}) error

Store a piece of meta information for a module. This can help users track information about their own modules and permit inter-module communication of state

func (*Engine) Start

func (eng *Engine) Start() error

func (*Engine) Stop

func (eng *Engine) Stop() error

func (*Engine) Submit

func (eng *Engine) Submit(id string, topic string, data interface{}) error

func (*Engine) SubmitEvent

func (eng *Engine) SubmitEvent(event Event) error

func (*Engine) SubscribeTo

func (eng *Engine) SubscribeTo(topicId string, consumers ...string) error

Does not check for duplicate subscriptions
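Since the engine does not check, a caller who wants to avoid subscribing the same consumer twice has to track that itself. One possible approach is sketched below; `subTracker` and `fresh` are hypothetical names, the helper is not safe for concurrent use, and it records ids before any subscribe call has actually succeeded.

```go
package main

import "fmt"

// subTracker remembers which consumer ids were already subscribed per topic.
type subTracker struct {
	seen map[string]map[string]struct{}
}

func newSubTracker() *subTracker {
	return &subTracker{seen: make(map[string]map[string]struct{})}
}

// fresh returns the ids not yet recorded for the topic and records them,
// preserving the order in which they were given.
func (s *subTracker) fresh(topic string, ids ...string) []string {
	known, ok := s.seen[topic]
	if !ok {
		known = make(map[string]struct{})
		s.seen[topic] = known
	}
	var out []string
	for _, id := range ids {
		if _, dup := known[id]; dup {
			continue
		}
		known[id] = struct{}{}
		out = append(out, id)
	}
	return out
}

func main() {
	tr := newSubTracker()
	fmt.Println(tr.fresh("jobs", "a", "b", "a"))
	fmt.Println(tr.fresh("jobs", "b", "c"))
}
```

The filtered slice is what would then be passed on as the consumer list for the topic.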

func (*Engine) UseModule

func (eng *Engine) UseModule(
	mod Module,
	topics []*TopicCfg)

func (*Engine) WithCallbacks

func (eng *Engine) WithCallbacks(cbs EngineCallbacks) *Engine

func (*Engine) WithTopics

func (eng *Engine) WithTopics(topics []*TopicCfg) *Engine

type EngineCallbacks

type EngineCallbacks struct {
	RegisterCb EventRecvr
	NewTopicCb EventRecvr
	ConsumeCb  EventRecvr
	SubmitCb   EventRecvr
}

type Event

type Event struct {
	Spawned  time.Time   `json:spawned`
	Topic    string      `json:topic`
	Producer string      `json:producer`
	Data     interface{} `json:data`
}

Event structure that is pushed through the event engine and delivered to the subscriber(s) of topics

type EventRecvr

type EventRecvr func(event *Event)

Something that receives a nerv event

type Module

type Module interface {
	GetName() string
	RecvModulePane(pane *ModulePane)
	Start() error
	Shutdown()
}

Interface used in nerv engine to manage modules loaded in by the user

type ModulePane added in v0.2.0

type ModulePane struct {

	// Retrieve whatever meta-data the users stored
	// with the module
	GetModuleMeta func(moduleName string) interface{}

	// Subscribe a set of consumers to a topic. If register is TRUE, then the
	// engine will be momentarily locked to ensure that the consumer is registered
	// as subscription requires a registered consumer. It is safe to always pass TRUE
	// but it may cause performance overhead if it's called a lot as such
	SubscribeTo func(topic string, consumers []Consumer, register bool) error

	// Permits module to submit raw data as an event onto
	// its associated topics as its own producer, where consumers
	// registered to that function will receive it
	SubmitTo func(topic string, data interface{})

	// Place an event onto the bus from the module that may or may
	// not go to consumers of the module. This function is useful
	// for forwarding events through a module without obfuscating
	// the original event
	SubmitEvent EventRecvr
}

Interface for module to take action without access to engine object
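A minimal sketch of a type satisfying the Module interface follows. To stay self-contained it mirrors `Module` locally and reduces `ModulePane` to the one field it uses. The `greeter` module is hypothetical, and the sketch assumes the engine hands over the pane before calling `Start`, which the documentation does not state.

```go
package main

import "fmt"

// Local mirrors of the documented types, reduced to what the sketch needs.
type ModulePane struct {
	SubmitTo func(topic string, data interface{})
}

type Module interface {
	GetName() string
	RecvModulePane(pane *ModulePane)
	Start() error
	Shutdown()
}

// greeter is a hypothetical module that announces itself when started.
type greeter struct {
	topic string
	pane  *ModulePane
	up    bool
}

func (g *greeter) GetName() string { return "greeter" }

// RecvModulePane stores the pane so the module can act without the engine object.
func (g *greeter) RecvModulePane(pane *ModulePane) { g.pane = pane }

// Start fails if no pane was received (assumed to arrive before Start).
func (g *greeter) Start() error {
	if g.pane == nil {
		return fmt.Errorf("%s: no module pane received", g.GetName())
	}
	g.up = true
	g.pane.SubmitTo(g.topic, "hello from "+g.GetName())
	return nil
}

func (g *greeter) Shutdown() { g.up = false }

// Compile-time check that greeter satisfies the interface.
var _ Module = (*greeter)(nil)

func main() {
	// A fake pane that prints submissions instead of routing them.
	pane := &ModulePane{SubmitTo: func(topic string, data interface{}) {
		fmt.Printf("submit to %s: %v\n", topic, data)
	}}
	g := &greeter{topic: "greetings"}
	g.RecvModulePane(pane)
	if err := g.Start(); err != nil {
		fmt.Println("start failed:", err)
	}
	g.Shutdown()
}
```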

type Producer added in v0.2.1

type Producer func(data interface{}) error

Generalized "producer" that can be set to publish to an event system in different ways (remotely, locally, to multiple topics, etc.)

type Route added in v0.2.1

type Route func(c *Context)

A route is just a context receiver that can be handed around when a "Route" is added to an engine. These "routes" are a simple abstraction over the topic/producer/consumer model that makes it simple to leverage the eventing system in an application for smaller tasks without requiring the implementation of an entire module.

type TopicCfg

type TopicCfg struct {
	Name          string
	DistType      int
	SelectionType int
}

func NewTopic

func NewTopic(name string) *TopicCfg

func (*TopicCfg) UsingArbitrary

func (t *TopicCfg) UsingArbitrary() *TopicCfg

func (*TopicCfg) UsingBroadcast

func (t *TopicCfg) UsingBroadcast() *TopicCfg

func (*TopicCfg) UsingDirect

func (t *TopicCfg) UsingDirect() *TopicCfg

func (*TopicCfg) UsingNoSelection

func (t *TopicCfg) UsingNoSelection() *TopicCfg

func (*TopicCfg) UsingRandomSelection

func (t *TopicCfg) UsingRandomSelection() *TopicCfg

func (*TopicCfg) UsingRoundRobinSelection

func (t *TopicCfg) UsingRoundRobinSelection() *TopicCfg
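Each `Using…` method returns the same `*TopicCfg`, so a configuration can be written as one chain. The sketch below mirrors the type locally to show that call style. The constant values and the zero-value defaults of `NewTopic` are assumptions, since the documentation only says that `DistType` and `SelectionType` are ints.

```go
package main

import "fmt"

// Hypothetical constant values; the real ones are not shown in the documentation.
const (
	distBroadcast = iota
	distDirect
)

const (
	selectNone = iota
	selectArbitrary
	selectRoundRobin
	selectRandom
)

// TopicCfg mirrors the documented struct fields.
type TopicCfg struct {
	Name          string
	DistType      int
	SelectionType int
}

// NewTopic returns a config with zero-value settings (an assumption).
func NewTopic(name string) *TopicCfg { return &TopicCfg{Name: name} }

// Each setter mutates the config and returns it so calls can be chained.
func (t *TopicCfg) UsingBroadcast() *TopicCfg { t.DistType = distBroadcast; return t }
func (t *TopicCfg) UsingDirect() *TopicCfg    { t.DistType = distDirect; return t }

func (t *TopicCfg) UsingRoundRobinSelection() *TopicCfg {
	t.SelectionType = selectRoundRobin
	return t
}

func (t *TopicCfg) UsingRandomSelection() *TopicCfg {
	t.SelectionType = selectRandom
	return t
}

func main() {
	cfg := NewTopic("jobs").UsingDirect().UsingRoundRobinSelection()
	fmt.Printf("%+v\n", *cfg)
}
```

Because the setters mutate and return the receiver, the last call in a chain wins if two distribution or selection methods are both applied.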

Directories

Path Synopsis
modules
modhttp Module
