messaging

package
v0.0.0-...-a3b39dc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: BSD-3-Clause Imports: 10 Imported by: 21

Documentation

Index

Examples

Constants

View Source
// Channel name constants. Presumably these are the names given to the
// channels built by NewEmissaryChannel, NewMasterChannel, and
// NewPrimaryChannel - TODO confirm against the constructors.
const (
	EmissaryChannel = "emissary"
	MasterChannel   = "master"
	PrimaryChannel  = "primary"
)
View Source
// String constants shared by messages: event identifiers, header keys,
// content-type values, and channel-type identifiers.
const (
	// Event identifiers; every value carries the "event:" prefix.
	StartupEvent           = "event:startup"
	ShutdownEvent          = "event:shutdown"
	RestartEvent           = "event:restart"
	ProcessEvent           = "event:process"
	HostStartupEvent       = "event:host-startup"
	PingEvent              = "event:ping"
	ReconfigureEvent       = "event:reconfigure"
	ChangesetApplyEvent    = "event:changeset-apply"
	ChangesetRollbackEvent = "event:changeset-rollback"
	DataEvent              = "event:data"
	StatusEvent            = "event:status"
	DataChangeEvent        = "event:data-change"
	ObservationEvent       = "event:observation"
	TickEvent              = "event:tick"

	PauseEvent  = "event:pause"  // disable data channel receive
	ResumeEvent = "event:resume" // enable data channel receive

	// Header keys (ContentType through XForwardTo), then content-type values
	// (ContentTypeStatus, ContentTypeConfig), then channel-type identifiers
	// (DataChannelType, ControlChannelType). Presumably the keys index
	// Message.Header - TODO confirm.
	ContentType        = "Content-Type"
	XRelatesTo         = "x-relates-to"
	XMessageId         = "x-message-id"
	XTo                = "x-to"
	XFrom              = "x-from"
	XEvent             = "x-event"
	XChannel           = "x-channel"
	XAgentId           = "x-agent-id"
	XForwardTo         = "x-forward-to"
	ContentTypeStatus  = "application/status"
	ContentTypeConfig  = "application/config"
	DataChannelType    = "DATA"
	ControlChannelType = "CTRL"
)
View Source
// ChannelSize is presumably the buffer capacity used when this package
// creates message channels - TODO confirm where it is consumed.
const (
	ChannelSize = 16
)
View Source
// PkgPath identifies this package.
// NOTE(review): the value starts with "github/" rather than "github.com/",
// unlike the "github.com/advanced-go/..." URI used in the NewControlAgent
// example - confirm whether the missing ".com" is intentional.
const (
	PkgPath = "github/advanced-go/stdlib/messaging"
)
View Source
// PrimaryTicker is a ticker name; presumably the name assigned by
// NewPrimaryTicker - TODO confirm.
const (
	PrimaryTicker = "PRIMARY"
)

Variables

View Source
// Package-level notifier values. The concrete types logError and outputError
// are unexported and not visible here; presumably both implement Notifier
// (one logging, one writing to output) - TODO confirm.
var (
	LogErrorNotifier    = new(logError)
	OutputErrorNotifier = new(outputError)
)
View Source
// DefaultTracer is a package-level value of the unexported defaultTracer
// type; presumably it implements Tracer - TODO confirm.
var (
	DefaultTracer = new(defaultTracer)
)

Functions

func AddShutdown

func AddShutdown(curr, next func()) func()

func DrainAndClose

func DrainAndClose(duration time.Duration, c chan *Message)

func EventErrorStatus

func EventErrorStatus(agentId string, msg *Message) *core.Status

func IsFinalized

func IsFinalized(attempts int, sleep time.Duration, finalized func() bool) bool

func MessageContentTypeErrorStatus

func MessageContentTypeErrorStatus(agentId string, msg *Message) *core.Status

func Ping

func Ping(ex *Exchange, uri any) *core.Status

Ping - function to "ping" an agent

func Receiver

func Receiver(interval time.Duration, reply <-chan *Message, result chan<- *core.Status, done DoneFunc)

Receiver - receives reply messages and forwards each one to the done function, which returns true when receiving is complete. The interval bounds the time spent receiving, and the resulting status is sent on the result channel.

func SendReply

func SendReply(msg *Message, status *core.Status)

SendReply - function used by message recipient to reply with a Status

Types

type Agent

// Agent is an addressable message recipient with lifecycle control: it embeds
// Mailbox and Finalizer and adds Run and Shutdown.
type Agent interface {
	// Mailbox supplies Uri() string and Message(m *Message).
	Mailbox
	// Finalizer supplies IsFinalized() bool.
	Finalizer
	// Run is called before messages are sent in the package example;
	// presumably it starts the agent's message processing - TODO confirm.
	Run()
	// Shutdown stops the agent; the package example calls it twice in a row.
	Shutdown()
}

Agent - intelligent agent.

TODO: Track agent assignment as part of the URI or as a separate identifier? Track agent NID or class/type?

Commented out in the source: //Uri() string and //Message(m *Message)

func AgentCast

func AgentCast(agent any) Agent

func NewControlAgent

func NewControlAgent(uri string, handler Handler) (Agent, error)

NewControlAgent - create an agent that only listens on a control channel, and has a default AgentRun func

Example
package main

import (
	"fmt"
	"time"
)

// newAgentCtrlHandler is the Handler passed to NewControlAgent in this
// example. It prints the event of every message it receives.
func newAgentCtrlHandler(msg *Message) {
	// Format directly with Printf. Feeding the result of Sprintf back in as a
	// format string would re-interpret any '%' in the event text as a verb.
	fmt.Printf("test: NewControlAgent_CtrlHandler() -> %v\n", msg.Event())
}

// main drives a control agent through its lifecycle: create it, run it, send
// a sequence of control events, then shut it down. Each event is printed by
// newAgentCtrlHandler. A deferred recover reports any panic instead of
// crashing the example.
func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("test: NewControlAgent() -> [recovered:%v]\n", r)
		}
	}()
	uri := "github.com/advanced-go/example-domain/activity"

	a, err := NewControlAgent(uri, newAgentCtrlHandler)
	if err != nil {
		fmt.Printf("test: NewControlAgent() -> [err:%v]\n", err)
		// The returned agent must not be used when construction failed.
		return
	}

	// Pause between sends so the agent can process each message in order.
	// 1-10 nanoseconds works for a direct send to a channel; sending via a
	// controller2 needs a longer sleep time.
	// Needed time.Nanosecond * 50 for directory send with mutex.
	// Needed time.Nanosecond * 1 for directory send via sync.Map.
	d := time.Nanosecond * 1

	a.Run()
	events := []string{StartupEvent, PauseEvent, ResumeEvent, PingEvent, ReconfigureEvent}
	for _, event := range events {
		a.Message(NewControlMessage(uri, "", event))
		time.Sleep(d)
	}

	a.Shutdown()
	// Give the agent time to finish shutting down before the second call.
	time.Sleep(time.Millisecond * 100)

	// Second Shutdown on an already stopped agent; presumably this verifies
	// that a repeated call is safe - TODO confirm.
	a.Shutdown()
}
Output:

test: NewControlAgent_CtrlHandler() -> event:startup
test: NewControlAgent_CtrlHandler() -> event:pause
test: NewControlAgent_CtrlHandler() -> event:resume
test: NewControlAgent_CtrlHandler() -> event:ping
test: NewControlAgent_CtrlHandler() -> event:reconfigure
test: NewControlAgent_CtrlHandler() -> event:shutdown

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache - message cache by uri

func NewCache

func NewCache() *Cache

NewCache - create a message cache

func (*Cache) Add

func (r *Cache) Add(msg *Message) error

Add - add a message

Example
// Fill a cache with five messages from distinct senders (from-uri-0..4):
// two startup events and three ping events, each with a status that is either
// StatusNotProvided or OK.
resp := NewCache()

resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-0", StartupEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-1", StartupEvent, core.StatusOK()))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-2", PingEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-3", PingEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-4", PingEvent, core.StatusOK()))

fmt.Printf("test: count() -> : %v\n", resp.Count())

// An unknown URI yields ok == false and a nil message.
m, ok := resp.Get("invalid")
fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-nil:%v]\n", "invalid", ok, m == nil)

// Messages are looked up by the sender ("from") URI.
m, ok = resp.Get("from-uri-3")
fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-to:%v]\n", "from-uri-3", ok, len(m.To()) > 0)

// Include lists the senders whose message matches both the event and the
// status code; Exclude lists every other sender.
// No shutdown events were added, so Include is empty and Exclude lists all.
fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Include(ShutdownEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Exclude(ShutdownEvent, StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Include(StartupEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Exclude(StartupEvent, StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Include(PingEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Exclude(PingEvent, StatusNotProvided))
Output:

test: count() -> : 5
test: Get(invalid) -> : [ok:false] [msg-nil:true]
test: Get(from-uri-3) -> : [ok:true] [msg-to:true]
test: include(event:shutdown,95) -> : []
test: exclude(event:shutdown,95) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:startup,95) -> : [from-uri-0]
test: exclude(event:startup,95) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:ping,95) -> : [from-uri-2 from-uri-3]
test: exclude(event:ping,95) -> : [from-uri-0 from-uri-1 from-uri-4]

func (*Cache) Count

func (r *Cache) Count() int

Count - return the count of items

func (*Cache) ErrorList

func (r *Cache) ErrorList() []error

ErrorList - list of errors

func (*Cache) Exclude

func (r *Cache) Exclude(event string, status int) []string

Exclude - filter for items that do not match both the given event and status code

func (*Cache) Filter

func (r *Cache) Filter(event string, code int, include bool) []string

Filter - apply a filter against a traversal of all items

func (*Cache) Get

func (r *Cache) Get(uri string) (*Message, bool)

Get - get a message based on a URI

func (*Cache) Include

func (r *Cache) Include(event string, status int) []string

Include - filter for items that match both the given event and status code

func (*Cache) Uri

func (r *Cache) Uri() []string

Uri - list the URIs in the cache

Example
// Fill a cache with five messages from distinct senders (from-uri-0..4), all
// addressed to "to-uri".
resp := NewCache()

resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-0", StartupEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-1", StartupEvent, core.StatusOK()))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-2", PingEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-3", PingEvent, core.NewStatus(StatusNotProvided)))
resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-4", PingEvent, core.StatusOK()))

fmt.Printf("test: count() -> : %v\n", resp.Count())

// An unknown URI yields ok == false and a nil message.
m, ok := resp.Get("invalid")
fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-nil:%v]\n", "invalid", ok, m == nil)

// The cached message keeps its recipient: To() prints "to-uri".
m, ok = resp.Get("from-uri-3")
fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-to:%v]\n", "from-uri-3", ok, m.To())

// Include lists the senders whose message matches both the event and the
// status code; Exclude lists every other sender.
fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Include(ShutdownEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Exclude(ShutdownEvent, StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Include(StartupEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Exclude(StartupEvent, StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Include(PingEvent, StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Exclude(PingEvent, StatusNotProvided))
Output:

test: count() -> : 5
test: Get(invalid) -> : [ok:false] [msg-nil:true]
test: Get(from-uri-3) -> : [ok:true] [msg-to:to-uri]
test: include(event:shutdown,95) -> : []
test: exclude(event:shutdown,95) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:startup,95) -> : [from-uri-0]
test: exclude(event:startup,95) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:ping,95) -> : [from-uri-2 from-uri-3]
test: exclude(event:ping,95) -> : [from-uri-0 from-uri-1 from-uri-4]

type Channel

type Channel struct {
	C chan *Message
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(name string, enabled bool) *Channel
Example
// Create a channel named "test" that starts disabled.
c := NewChannel("test", false)

// Printing the channel with %v shows its name.
fmt.Printf("test: NewChannel() -> [name:%v]\n", c)

fmt.Printf("test: NewChannel() -> [enabled:%v]\n", c.IsEnabled())

// Enable and Disable toggle the state reported by IsEnabled.
c.Enable()
fmt.Printf("test: NewChannel_Enable()  -> [enabled:%v]\n", c.IsEnabled())

c.Disable()
fmt.Printf("test: NewChannel_Disable() -> [enabled:%v]\n", c.IsEnabled())

// After Close the exported C field is nil.
c.Close()
fmt.Printf("test: NewChannel_Close()   -> [closed:%v]\n", c.C == nil)
Output:

test: NewChannel() -> [name:test]
test: NewChannel() -> [enabled:false]
test: NewChannel_Enable()  -> [enabled:true]
test: NewChannel_Disable() -> [enabled:false]
test: NewChannel_Close()   -> [closed:true]

func NewEmissaryChannel

func NewEmissaryChannel(enabled bool) *Channel

func NewMasterChannel

func NewMasterChannel(enabled bool) *Channel

func NewPrimaryChannel

func NewPrimaryChannel(enabled bool) *Channel

func (*Channel) Close

func (c *Channel) Close()

func (*Channel) Disable

func (c *Channel) Disable()

func (*Channel) Enable

func (c *Channel) Enable()

func (*Channel) IsClosed

func (c *Channel) IsClosed() bool

func (*Channel) IsEnabled

func (c *Channel) IsEnabled() bool

func (*Channel) IsFinalized

func (c *Channel) IsFinalized() bool

func (*Channel) Name

func (c *Channel) Name() string

func (*Channel) Send

func (c *Channel) Send(m *Message)

func (*Channel) String

func (c *Channel) String() string

type DoneFunc

type DoneFunc func(msg *Message) bool

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange - controller2 directory

func NewExchange

func NewExchange() *Exchange

NewExchange - create a new controller2

func (*Exchange) Broadcast

func (e *Exchange) Broadcast(msg *Message)

Broadcast - broadcast a message to all entries, deleting the entry if the message event is Shutdown

func (*Exchange) Count

func (e *Exchange) Count() int

Count - number of agents

func (*Exchange) Get

func (e *Exchange) Get(uri string) Agent

Get - find an agent

func (*Exchange) GetMailbox

func (e *Exchange) GetMailbox(uri string) Mailbox

GetMailbox - find a mailbox

func (*Exchange) IsFinalized

func (e *Exchange) IsFinalized() bool

IsFinalized - determine if all agents have been shut down and removed from the exchange

func (*Exchange) List

func (e *Exchange) List() []string

List - a list of agent URIs

func (*Exchange) Register

func (e *Exchange) Register(agent Agent) error

Register - register an agent

func (*Exchange) RegisterMailbox

func (e *Exchange) RegisterMailbox(m Mailbox) error

RegisterMailbox - register a mailbox

func (*Exchange) Send

func (e *Exchange) Send(msg *Message) error

Send - send a message

func (*Exchange) Shutdown

func (e *Exchange) Shutdown()

Shutdown - shutdown all agents

type Finalizer

type Finalizer interface {
	IsFinalized() bool
}

type Handler

type Handler func(msg *Message)

Handler - uniform interface for message handling

func NewCacheHandler

func NewCacheHandler(cache *Cache) Handler

NewCacheHandler - handler to receive messages into a cache.

func NewReceiverReplyTo

func NewReceiverReplyTo(reply chan *Message) Handler

type Mailbox

type Mailbox interface {
	Uri() string
	Message(m *Message)
}

type Map

type Map map[string]*Message

Map - map of messages

type Message

// Message is the unit passed between agents. Header carries the message
// metadata (presumably keyed by the X* header constants such as XTo, XFrom,
// and XEvent - TODO confirm), Body holds an arbitrary payload, and ReplyTo is
// a Handler for replies (presumably the one SendReply invokes - TODO confirm).
type Message struct {
	Header  http.Header
	Body    any
	ReplyTo Handler
}

Message - message

func NewControlMessage

func NewControlMessage(to, from, event string) *Message

func NewControlMessageWithBody

func NewControlMessageWithBody(to, from, event string, body any) *Message

func NewMessage

func NewMessage(channel, to, from, event string, body any) *Message

func NewMessageWithReply

func NewMessageWithReply(channel, to, from, event string, replyTo Handler) *Message

func NewMessageWithStatus

func NewMessageWithStatus(channel, to, from, event string, status *core.Status) *Message

func (*Message) Channel

func (m *Message) Channel() string

func (*Message) Config

func (m *Message) Config() map[string]string

func (*Message) Content

func (m *Message) Content() (string, any, bool)

func (*Message) ContentType

func (m *Message) ContentType() string

func (*Message) Event

func (m *Message) Event() string

func (*Message) ForwardTo

func (m *Message) ForwardTo() string

func (*Message) From

func (m *Message) From() string

func (*Message) IsContentType

func (m *Message) IsContentType(ct string) bool

func (*Message) RelatesTo

func (m *Message) RelatesTo() string

func (*Message) SetContent

func (m *Message) SetContent(contentType string, content any) error

func (*Message) SetContentType

func (m *Message) SetContentType(contentType string)

func (*Message) SetFrom

func (m *Message) SetFrom(uri string)

func (*Message) SetTo

func (m *Message) SetTo(uri string)

func (*Message) Status

func (m *Message) Status() *core.Status

func (*Message) String

func (m *Message) String() string

func (*Message) To

func (m *Message) To() string

type Notifier

type Notifier interface {
	Notify(status *core.Status) *core.Status
}

type OnShutdown

type OnShutdown interface {
	Add(func())
}

OnShutdown - add functions to be run on shutdown

Example
uri := "urn:agent007"

// First agent: no shutdown functions registered, so Shutdown prints nothing.
a := newTestAgent(uri, nil, nil)
a.running = true
a.Shutdown()

// Second agent: register three functions through the OnShutdown interface.
// The expected output shows them running in the order they were added.
a1 := newTestAgent(uri, nil, nil)
if sd, ok := any(a1).(OnShutdown); ok {
	sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-1()\n") })
	sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-2()\n") })
	sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-3()\n") })
}
a1.running = true
a1.Shutdown()
Output:

test: OnShutdown() -> func-1()
test: OnShutdown() -> func-2()
test: OnShutdown() -> func-3()

type OpsAgent

type OpsAgent interface {
	Agent
	Notifier
	Tracer
}

func OpsAgentCast

func OpsAgentCast(agent any) OpsAgent

type SendFunc

type SendFunc func(m *Message)

SendFunc - uniform interface for messaging

type Ticker

type Ticker struct {
	// contains filtered or unexported fields
}

func NewPrimaryTicker

func NewPrimaryTicker(duration time.Duration) *Ticker

func NewTicker

func NewTicker(name string, duration time.Duration) *Ticker

func (*Ticker) C

func (t *Ticker) C() <-chan time.Time

func (*Ticker) Duration

func (t *Ticker) Duration() time.Duration

func (*Ticker) IsFinalized

func (t *Ticker) IsFinalized() bool

func (*Ticker) IsStopped

func (t *Ticker) IsStopped() bool

func (*Ticker) Name

func (t *Ticker) Name() string

func (*Ticker) Reset

func (t *Ticker) Reset()

func (*Ticker) Start

func (t *Ticker) Start(newDuration time.Duration)

func (*Ticker) Stop

func (t *Ticker) Stop()

func (*Ticker) String

func (t *Ticker) String() string

type TraceDispatcher

type TraceDispatcher interface {
	Tracer
}

func NewTraceDispatcher

func NewTraceDispatcher(events []string, channel string) TraceDispatcher

type TraceFilter

type TraceFilter struct {
	Channel string
	Event   string
}

func NewTraceFilter

func NewTraceFilter(channel, event string) *TraceFilter

func (*TraceFilter) Access

func (f *TraceFilter) Access(channel, event string) bool

type Tracer

type Tracer interface {
	Trace(agent Agent, channel, event, activity string)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL