client

package
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: BSD-2-Clause, Unlicense Imports: 25 Imported by: 0

README

core Go API

[TOC]

The Go implementation of the agent API should be considered the "reference" implementation. Our other implementations (python) seek to implement the same user-facing interfaces and pattern with language-specific concessions as needed.

The Agent API

Quick Summary

Derive your agent from a *client.BasicAgent, and fulfill the ManagedAgent interface.

type MyAgent struct {
  *client.BasicAgent
}

func (m *MyAgent) Setup() error {
  // Implement your agent setup here
  // Make any calls to ListenFor here
  return nil
}

func (m *MyAgent) Loop() (bool, error) {
  // Implement your agent's data processing here
  return true, nil //return true if you want to continue looping
  //return false, nil //return false if you want to stop loop
}
// Essential agent API for BB communication
func (a *MyAgent) Post(metadata []byte, data []byte, tags string...) error
func (a *MyAgent) Reply(post *client.Post, metadata []byte, data []byte, tags string...) error
func (a *MyAgent) ListenFor(tags...) <-chan *Post
func (a *MyAgent) Log(severity core.Log_Severity, msg string)
// Additional API provided by BasicAgent
func (a *MyAgent) Trace(post *client.Post) ([]*client.Post, error)
// Run your agent (must fulfill the managed agent interface)
my_agent := &MyAgent{
  BasicAgent: client.GetDefaultAgent(blackboard_address)
}
err := client.RunUntilComplete(my_agent)
if err != nil {
  log.Fatal(err)
}
Detailed Explanation
Importing the client package

We suggest the following for importing the client package:

import client "gitlab.com/hoffman-lab/core/cmd/client-apis/go"`
The BasicAgent type

To make an agent, embed a pointer to a BasicAgent into your structure.

type MyAgent struct {
  *client.BasicAgent
}

BasicAgent provides the essential components for an agent including but not limited to posting, replying, and receiving data from the blackboard. You are not required to utilize BasicAgent to communicate with the blackboard however we cannot provide support at this point for those more advanced use cases.

Agent lifecycle

BasicAgent actually runs several processes concurrently, which can leave dangling goroutines and unfreed memory if we're not careful. As a result, we have implemented an agent lifecycle that can handle this cleanup for you.

Utilize the lifecycle management

To utilize the lifecycle, you must implement two functions.

These allow your agent to fulfill the ManagedAgent interface.

func (a *MyAgent) Setup() error {
  return nil
}

func (a *MyAgent) Loop() (bool, error) {
  return true, nil
}

(This pattern is inspired by the Arduino lifecycle)

Setup

Setup is executed once, prior to any communication with the blackboard. Do not call any blackboard communication methods inside of setup. Although they will succeed, this interferes with proper message delivery using the ListenFor method we provide.

Any agent initialization you'd like to perform prior to communicating with the blackboard should be performed here. Additionally, any calls to ListenFor should be made in Setup.

Returning an error from Setup will prevent the agent from starting.

Loop

Loop will be executed infinitely many times until it is told to stop by returning false. Persistence can be achieved by always returning true. Returning an error in the second return parameter from a Loop does not itself cause the agent to stop executing.

Inside of the loop is where your agent should listen for messages, process data, and communicate with the blackboard.

Run your agent using our lifecycle management

An agent that has properly implemented a Setup and Loop method can then be managed using the RunUntilComplete function.

my_agent := &MyAgent{
  BasicAgent: client.GetDefaultAgent(bb_address),
}
err := client.RunUntilComplete(my_agent)
if err != nil {
  log.Fatal("agent returned error:", err)
}

The GetDefaultAgent function returns a properly initialized BasicAgent ready to connect to the blackboard running at bb_address.

Blackboard Communication

The core agent API is fundamentally three methods: Post, Reply, and ListenFor. These three methods provide all key functionality for multiagent systems.

Post

Analogous to a real blackboard, post just means "put something on the blackboard".

tags := []string{"test-message"}
err := my_agent.Post([]byte("some metadata"), []byte("some data data"), tags...)
Tagging

This version of core changes how messages are exchanged between agents.

We have ultimately done away with attributes and promises in favor of a tagging system.

Agents post their messages with a set of tags and other agents listen for a set of tags. Each matching message will surface into the appropriate ListenFor channel. (There are deep and obvious similarities here to traditional pub/sub architectures)

Messages will match if the incoming message contains at least the tags provided to ListenFor. E.g.,

ListenFor("test") matches Post(md, d, "test")
ListenFor("test") also matches Post(md, d, "test", "tag2")

ListenFor("test", "tag2") matches Post(md, d, "test", "tag2")
ListenFor("test", "tag2") does **not** match Post(md, d, "test")

This means that without care in tagging there can easily be agent "cross talk" particularly with multiple agents performing the same or similar tasks. This is intentional and by design and places a great deal of control and flexibility in the hands of the user although new users may find it challenging. Please utilize our issues page for feedback and assistance with the new tagging system if it is unclear, or some additional instruction, guidance, or debugging help is required.

ListenFor

Many agents will want to receive some data from the blackboard to begin executing. ListenFor will provide you with a go channel on which messages with matching tags will arrive. These can then be utilized as normal go channels to automatically receive data from the blackboard.

IMPORTANT: To ensure proper behavior, ListenFor must be called in Setup, before any messages have been sent to the blackboard.

Here is an example from our ping pong agents:

type PingAgent2 struct {
	prime_bb sync.Once // bootstraps the ping/pong routine by sending a ping message once
	pong_ch  <-chan *client.Post
	*client.BasicAgent
}

func (a *PingAgent2) Setup() error {
	a.pong_ch = a.ListenFor([]string{"pong-message"})
	return a.sendPing()
}

func (a *PingAgent2) Loop() (cont bool, err error) {
	select {
	case post := <-a.pong_ch:

		// channel has been closed externally
		if post == nil {
			return false, nil
		}

		if err := a.sendPing(); err != nil {
			return false, err
		}
	case <-time.After(10 * time.Second):
		a.Log(core.Log_Info, "ping agent heartbeat")
	}
	return true, nil
}
Reply

ListenFor returns a Post object. The post has metadata and data that can be used as desired, but importantly, the post object can be used to begin a "chain" of posts using the reply mechanism. A reply actually just shows up on the blackboard as a post, but you'll notice that it has one or more messages listed in the "replying_to" field. It also creates very logical and light links between messages on the blackboard that enable techniques such as tracing.

Here is an example from our Pong agent:

// Pong listens for Pings and replies with pongs
func (a *PongAgent) Loop() (cont bool, err error) {
	select {
	case post := <-a.ping_ch:

		if post == nil {
			return false, nil
		}

		time.Sleep(a.delay_ms)
		err := a.Reply([]*client.Post{post}, nil, nil, "pong-message")
		if err != nil {
			return false, err
		}
	case <-time.After(10 * time.Second):
		a.Log(core.Log_Info, "pong agent heartbeat")
	}

	return true, nil
}
Tracing
func (a *BasicAgent) Trace(post *Post) ([]*Post, error) {

Tracing in a core blackboard allows an agent to retrieve messages via a chain of replies; effectively, we walk the "linked list" of messages and give them back to the tracer.

Here is an example from our (TODO) agent:

// TODO: this is an agent example that we should implement!

Tracing is not part of the core agent interface, however is a special feature of the core base agent.

NOTE: Tracing is a method that exists slightly outside of the core agent API and is a service provided by the core blackboard (some of the secret sauce of our implementation we hope). In fact, although we have placed it into Blackboard interface, certain blackboards implementations may wish to not provide or return useful tracing information.

Logging

Logging is, strictly speaking, not a necessary part of the core protocol, however logging is so ubiquitously useful during development, execution, and monitoring that we'd be hard pressed not to include it. Defining this message type allows us to separate messages meant for agents (posts) which may or may not be human readable, from posts that should be human-readable (logs).

func (a *BasicAgent) Log(severity core.LogSeverity, message string) ([]*Post, error) {

The default severity everyone should use is core.Log_Info. This is a bread and butter log message.

We encourage the following guidelines for the other severities:

core.Log_Debug:    should not appear during normal logging
core.Log_Info:     normal logging
core.Log_Warning:  possible with errors, but can continue
core.Log_Error:    normal operation is not occuring
core.Log_Critical: 🔥🔥🔥 (possibly trigger a halt and catch fire?)
Hello and Goodbye

If using the managed lifecycle, you don't need to worry about sending Hello and Goodbye messages. If you're implementing a more "raw" agent, these messages indicate an agent's first and last connection to the blackboard, and are one way to trigger the concurrent loops necessary to handle message delivery.

We recommend using the managed lifecycle, unless you've got a lot of time to debug.

If you're dead set on skipping the lifecycle, check out agent_pipe.go for an example of how you can do the management yourself.

Examples

We have implemented and will continue to add agents to the agents directory. These are excellent references for how we have implemented and understand our own API.

We recommend your start with looking at our ping pong agents: ping.go, pong.go

These can be executed against a blackboard using

core start server # start the blackboard
core start ping   # start the ping agent
core start pong   # start the pong agent

More advanced examples can be found in the network benchmark, and agent_pipe.go

Please check out the "basic agent" implementation if you need manage agent behavior using even lower-level API elements (out of scope for this documentation).

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultAgent = &BasicAgent{
		done:      make(chan struct{}),
		AgentSpec: &core.AgentSpec{},
		BlackboardTransporter: &BlackboardHTTPClient{
			Client: &http.Client{},
		},
	}
)

The default agent is an http agent

Functions

func CompressBytes

func CompressBytes(data []byte) ([]byte, error)

CompressBytes performs zlib compression on a byte array.

func DecompressBytes

func DecompressBytes(data []byte) ([]byte, error)

DecompressBytes decompresses an incoming byte array with zlib compression.

func DeserializeBytesToFile

func DeserializeBytesToFile(data []byte, save_path string) error

DeserializeBytesToFile writes an incoming byte array to a file.

func DeserializeBytesToFileCompressed

func DeserializeBytesToFileCompressed(data_comp []byte, save_path string) error

DeserializeBytesToFileCompressed writes an incoming byte array with zlib compression to a file.

func DeserializeBytesToGonum

func DeserializeBytesToGonum(data []byte) (*mat.Dense, error)

DeserializeBytesToGonum writes an incoming byte array to a Gonum dense matrix.

func DeserializeBytesToGonumCompressed

func DeserializeBytesToGonumCompressed(data []byte) (*mat.Dense, error)

DeserializeBytesToGonumCompressed writes an incoming zlib-compressed byte array to a Gonum dense matrix.

func DeserializeToBinaryBase64

func DeserializeToBinaryBase64(data_encoded string) ([]byte, error)

DeserializeToBinary encodes a data string to bytes

func DeserializeToFileBase64

func DeserializeToFileBase64(data_encoded string, save_path string) error

DeserializeToBinary encodes a data string to a binary file

func GonumToNpy

func GonumToNpy(array *mat.Dense, save_path string) error

GonumToNpy converts a Gonum dense matrix to a numpy array npy file with underlying dtype float64.

func GonumToNumpyBytesIO

func GonumToNumpyBytesIO(array *mat.Dense) ([]byte, error)

GonumToNumpyBytesIO converts a Gonum dense matrix to a numpy array with underlying dtype float64 encoded as a byte array.

func NpyToGonum

func NpyToGonum(path string) (*mat.Dense, error)

NpyToGonum converts a .npy file to a Gonum dense matrix.

func NumpyBytesIOtoGonum

func NumpyBytesIOtoGonum(data []byte) (*mat.Dense, error)

NumpyBytesIOtoGonum converts a serialized Numpy array (as read from a BytesIO Python object) to a Gonum dense matrix.

NOTE: The underlying dtype of the numpy array must be float64 to be compatible with Gonum as a dense matrix. Users must keep this in mind if they want Python and Go agents to communicate matrix data directly.

func PackContext

func PackContext(path string) ([]byte, error)

PackContext tars the contents of a directory.

func ResolveCoreData

func ResolveCoreData(msg *core.CoreData) ([]byte, error)

ResolveCoreData extracts data from a core.CoreData object.

I don't foresee users needing to call this directly, but I also don't see a good reason to keep it private? That being said, it's not intended to be a well-maintained part of the public interface.

func RunUntilComplete

func RunUntilComplete(agt ManagedAgent) error

RunUntilComplete runs the Setup and Loop functions for a ManagedAgent until the agent terminates.

func SerializeFileToBytes

func SerializeFileToBytes(path string) ([]byte, error)

SerializeFileToBytes serializes a specified file to a byte array.

func SerializeFileToBytesCompressed

func SerializeFileToBytesCompressed(path string) ([]byte, error)

SerializeFileToBytesCompressed performs zlib compression on a specified file and serializes to a byte array.

func SerializeFromBinaryBase64

func SerializeFromBinaryBase64(data []byte) string

SerializeFromBinary encodes a byte slice with base64 serialization.

func SerializeFromFileBase64

func SerializeFromFileBase64(filepath string) (string, error)

SerializeFromFile encodes a file with base64 serialization.

func SerializeGonumToBytes

func SerializeGonumToBytes(array *mat.Dense) ([]byte, error)

SerializeGonumToBytes wraps mat.Dense.MarshalBinary.

func SerializeGonumToBytesCompressed

func SerializeGonumToBytesCompressed(array *mat.Dense) ([]byte, error)

SerializeGonumToBytes performs zlib compression on a mat.Dense object and serializes to bytes.

func UnpackContext

func UnpackContext(root_dir string, archived_context []byte) error

Unpacks the contents of an archive into root_dir. root_dir is created if it does not already exist.

Types

type Agent

type Agent interface {
	Hello() error
	Goodbye() error
	Post(metadata []byte, data []byte, tags ...string) error
	Reply(posts []*Post, metadata []byte, data []byte, tags ...string) error
	Log(severity core.Log_Severity, message string) error
	Done() <-chan struct{}
}

Agent provides the underlying framework for easy user interaction with core. It automatically starts background processes to communicate with blackboard.

type BasicAgent

type BasicAgent struct {
	*core.AgentSpec
	BlackboardTransporter

	OnNewMessage func(msg *core.Message)
	StartIdx     int
	// contains filtered or unexported fields
}

The BasicAgent type implements Agent but not ManagedAgent. User-defined agents embedding BasicAgent are expected to satisfy the ManagedAgent interface by implementing their own Setup and Loop functions.

func GetDefaultAgent

func GetDefaultAgent(addr string) *BasicAgent

GetDefaultAgent returns a BasicAgent with default initialization.

func (*BasicAgent) Done

func (a *BasicAgent) Done() <-chan struct{}

Done encapsulates the a.done channel for use in select statements.

func (*BasicAgent) Goodbye

func (a *BasicAgent) Goodbye() error

func (*BasicAgent) Hello

func (a *BasicAgent) Hello() error

func (*BasicAgent) ListenFor

func (a *BasicAgent) ListenFor(tags []string) <-chan *Post

ListenFor returns a channel on which users can receive messages matching a provided set of tags.

This channel should function as any other Go channel can/should be used in a Go select statements. The current implementation will surface multiple copies of the same message if it matches multiple filters. If different behavior is desired in the future, implementation can be customized in filter_chain.go.

For deterministic ListenFor behavior, calls must be prior to ANY other API calls that invoke the network. TODO: Add a warning if this occurs

func (*BasicAgent) Log

func (a *BasicAgent) Log(severity core.Log_Severity, message string) error

func (*BasicAgent) Post

func (a *BasicAgent) Post(metadata []byte, data []byte, tags ...string) error

Fewer rules and requirements to send data to the BB

func (*BasicAgent) Reply

func (a *BasicAgent) Reply(posts []*Post, metadata, data []byte, tags ...string) error

Reply populates the ReplyingTo field of an outgoing Post message. It is best used alongside Trace.

func (*BasicAgent) SetName

func (a *BasicAgent) SetName(name string)

SetName sets the name field of the AgentSpec.

func (*BasicAgent) Start

func (a *BasicAgent) Start()

Start may be the most complex function in the whole project. It is the core agent event loop responsible for surfacing tagged posts to awaiting agents.

func (*BasicAgent) Stop

func (a *BasicAgent) Stop()

Stop closes the agent's done channel.

func (*BasicAgent) Trace

func (a *BasicAgent) Trace(post *Post) ([]*Post, error)

type BlackboardHTTPClient

type BlackboardHTTPClient struct {
	*http.Client
	// contains filtered or unexported fields
}

BlackboardHTTPClient is the primary agent-facing interface for the Blackboard system, and is the default HTTP-based implementation of the BlackboardTransporter interface. It can be thought of as the "opposite side" of apis.BlackboardHTTPService.

func (*BlackboardHTTPClient) Len

func (c *BlackboardHTTPClient) Len() int

Get the current length of the blackboard. Efficient agent joining requires this.

func (*BlackboardHTTPClient) Read

func (c *BlackboardHTTPClient) Read(msg_idx int) (*core.Message, error)

Read reads a single message. Necessary for tracing.

func (*BlackboardHTTPClient) Slice

func (c *BlackboardHTTPClient) Slice(start_idx, end_idx int) ([]*core.Message, error)

Slice accesses multiple messages on the blackboard. TODO: add the docs for slicing patterns here

func (*BlackboardHTTPClient) Trace

func (c *BlackboardHTTPClient) Trace(msg_idx int, depth int) ([]*core.Message, error)

Trace traces a message, returning all of the traced messages

func (*BlackboardHTTPClient) Write

func (c *BlackboardHTTPClient) Write(msg *core.Message) error

Write posts a message to the Blackboard.

type BlackboardTransporter

type BlackboardTransporter interface {
	Read(msg_idx int) (*core.Message, error)
	Write(msg *core.Message) error
	Slice(start_idx, end_idx int) ([]*core.Message, error)
	Trace(msg_idx int, depth int) ([]*core.Message, error)
	Len() int // WARN: This should also return an error. I'm not 100% sure why it doesn't...
}

This is "duplicated" code (it's just the blackboard interface) but we need to avoid the import cycle. And the function here is different (allowing our agents to utilize a standard interface to different transport code.)

type Filter

type Filter interface {
	Filter(msg *core.Message) error
}

Filters and FilterChains process incoming channel messages and surface them based on their specific filter implementation. I think, subconsciously, this ideas is loosely based on firewall filters/rules. NOTE: Might be a better name for this stuff

type FilterChain

type FilterChain struct {
	// contains filtered or unexported fields
}

A filter chain provides a convenient way to surface a message accross multiple potential matches. NOTE: FilterChain should probably become an interface sooner rather than later to allow for different implementations of chain behavior.

func NewFilterChain

func NewFilterChain() *FilterChain

NewFilterChain is the constructor for the FilterChain type.

func (*FilterChain) Cleanup

func (f *FilterChain) Cleanup()

Cleanup closes the channel of each filter in the chain.

func (*FilterChain) Insert

func (f *FilterChain) Insert(filt Filter)

Insert wraps append, acting as a set method for the `filters` field.

func (*FilterChain) ListenAndFilter

func (f *FilterChain) ListenAndFilter(done_ch chan struct{}, incoming chan *core.Message)

ListenAndFilter filters an incoming message based on the filters in the chain.

type ManagedAgent

type ManagedAgent interface {
	Agent
	Setup() error
	Loop() (continue_loop bool, err error)
	SetName(name string)
}

ManagedAgent is an interface enforcing the agent-defined Setup and Loop methods and SetName (should be handled by the executable).

Free the agents: keep this interface as narrow as possible

type Post

type Post struct {
	Tags     []string
	Metadata []byte
	Data     []byte
	// contains filtered or unexported fields
}

Post is a user-facing "Post" type that abstracts the fields of core.Post

func PostFromPostMessage

func PostFromPostMessage(msg_envelope *core.Message) (*Post, error)

PostFromPostMessage converts a core.Post message to the user-facing Post type.

Ditto for this one about the public facing interface and all We need to surface the potential http errors to troubleshoot if object store is unavailable.

func (*Post) Hash

func (p *Post) Hash() string

Hash produces an md5-based hash by concatenating the md5 checksum of the metadata, data, and tags of the Post object. At present, this has no relation to the "hash" stdlib.

type TagFilter

type TagFilter struct {
	TagSet
	// contains filtered or unexported fields
}

A TagFilter filters messages using the TagSet's Match function.

func (*TagFilter) Filter

func (f *TagFilter) Filter(msg *core.Message) error

Filter filters post messages based on a tagset.

type TagSet

type TagSet map[string]struct{}

TagSet is a type alias for map[string]struct{}, which essentially mimics a set of tag string values.

func NewTagSet

func NewTagSet(tags []string) TagSet

NewTagSet generates a TagSet from a slice. This will have the effect of removing any duplicate tags from the incoming slice.

func (TagSet) Matches

func (t TagSet) Matches(incoming TagSet) bool

In general, we don't care what "Matches" actually means that much, just that it is definedby this function. However, in THIS implementation, it means the receiver tag set is a subset of the incoming tag set. E.g., if I am ListenFor(TagSet{"lung", "segmentation"}), I would return any message that contains *at least* these two tags. Other implementations may have stronger (or weaker) notions of a match.

func (TagSet) String

func (t TagSet) String() string

Fulfill the stringer interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL