Documentation
¶
Index ¶
- Constants
- Variables
- func AddShutdown(curr, next func()) func()
- func DrainAndClose(duration time.Duration, c chan *Message)
- func EventErrorStatus(agentId string, msg *Message) *core.Status
- func IsFinalized(attempts int, sleep time.Duration, finalized func() bool) bool
- func MessageContentTypeErrorStatus(agentId string, msg *Message) *core.Status
- func Ping(ex *Exchange, uri any) *core.Status
- func Receiver(interval time.Duration, reply <-chan *Message, result chan<- *core.Status, ...)
- func SendReply(msg *Message, status *core.Status)
- type Agent
- type Cache
- func (r *Cache) Add(msg *Message) error
- func (r *Cache) Count() int
- func (r *Cache) ErrorList() []error
- func (r *Cache) Exclude(event string, status int) []string
- func (r *Cache) Filter(event string, code int, include bool) []string
- func (r *Cache) Get(uri string) (*Message, bool)
- func (r *Cache) Include(event string, status int) []string
- func (r *Cache) Uri() []string
- type Channel
- type DoneFunc
- type Exchange
- func (e *Exchange) Broadcast(msg *Message)
- func (e *Exchange) Count() int
- func (e *Exchange) Get(uri string) Agent
- func (e *Exchange) GetMailbox(uri string) Mailbox
- func (e *Exchange) IsFinalized() bool
- func (e *Exchange) List() []string
- func (e *Exchange) Register(agent Agent) error
- func (e *Exchange) RegisterMailbox(m Mailbox) error
- func (e *Exchange) Send(msg *Message) error
- func (e *Exchange) Shutdown()
- type Finalizer
- type Handler
- type Mailbox
- type Map
- type Message
- func NewControlMessage(to, from, event string) *Message
- func NewControlMessageWithBody(to, from, event string, body any) *Message
- func NewMessage(channel, to, from, event string, body any) *Message
- func NewMessageWithReply(channel, to, from, event string, replyTo Handler) *Message
- func NewMessageWithStatus(channel, to, from, event string, status *core.Status) *Message
- func (m *Message) Channel() string
- func (m *Message) Config() map[string]string
- func (m *Message) Content() (string, any, bool)
- func (m *Message) ContentType() string
- func (m *Message) Event() string
- func (m *Message) ForwardTo() string
- func (m *Message) From() string
- func (m *Message) IsContentType(ct string) bool
- func (m *Message) RelatesTo() string
- func (m *Message) SetContent(contentType string, content any) error
- func (m *Message) SetContentType(contentType string)
- func (m *Message) SetFrom(uri string)
- func (m *Message) SetTo(uri string)
- func (m *Message) Status() *core.Status
- func (m *Message) String() string
- func (m *Message) To() string
- type Notifier
- type OnShutdown
- type OpsAgent
- type SendFunc
- type Ticker
- func (t *Ticker) C() <-chan time.Time
- func (t *Ticker) Duration() time.Duration
- func (t *Ticker) IsFinalized() bool
- func (t *Ticker) IsStopped() bool
- func (t *Ticker) Name() string
- func (t *Ticker) Reset()
- func (t *Ticker) Start(newDuration time.Duration)
- func (t *Ticker) Stop()
- func (t *Ticker) String() string
- type TraceDispatcher
- type TraceFilter
- type Tracer
Examples ¶
Constants ¶
const ( EmissaryChannel = "emissary" MasterChannel = "master" PrimaryChannel = "primary" )
const ( StartupEvent = "event:startup" ShutdownEvent = "event:shutdown" RestartEvent = "event:restart" ProcessEvent = "event:process" HostStartupEvent = "event:host-startup" PingEvent = "event:ping" ReconfigureEvent = "event:reconfigure" ChangesetApplyEvent = "event:changeset-apply" ChangesetRollbackEvent = "event:changeset-rollback" DataEvent = "event:data" StatusEvent = "event:status" DataChangeEvent = "event:data-change" ObservationEvent = "event:observation" TickEvent = "event:tick" PauseEvent = "event:pause" // disable data channel receive ResumeEvent = "event:resume" // enable data channel receive ContentType = "Content-Type" XRelatesTo = "x-relates-to" XMessageId = "x-message-id" XTo = "x-to" XFrom = "x-from" XEvent = "x-event" XChannel = "x-channel" XAgentId = "x-agent-id" XForwardTo = "x-forward-to" ContentTypeStatus = "application/status" ContentTypeConfig = "application/config" DataChannelType = "DATA" ControlChannelType = "CTRL" )
const (
ChannelSize = 16
)
const (
PkgPath = "github/advanced-go/stdlib/messaging"
)
const (
PrimaryTicker = "PRIMARY"
)
Variables ¶
var ( LogErrorNotifier = new(logError) OutputErrorNotifier = new(outputError) )
var (
DefaultTracer = new(defaultTracer)
)
Functions ¶
func AddShutdown ¶
func AddShutdown(curr, next func()) func()
func DrainAndClose ¶
func Receiver ¶
func Receiver(interval time.Duration, reply <-chan *Message, result chan<- *core.Status, done DoneFunc)
Receiver - receives reply messages and forwards them to a function that returns true when receiving is complete. The interval bounds the time spent receiving, and the result status is sent on the result channel.
Types ¶
type Agent ¶
Agent - intelligent agent. TODO: Track agent assignment as part of the URI or as a separate identifier? //Uri() string
//Message(m *Message) Track agent NID or class/type?
func NewControlAgent ¶
NewControlAgent - create an agent that only listens on a control channel, and has a default AgentRun func
Example ¶
package main import ( "fmt" "time" ) func newAgentCtrlHandler(msg *Message) { fmt.Printf(fmt.Sprintf("test: NewControlAgent_CtrlHandler() -> %v\n", msg.Event())) } func main() { defer func() { if r := recover(); r != nil { fmt.Printf("test: NewControlAgent() -> [recovered:%v]\n", r) } }() //ex := NewExchange() //any(NewDirectory()).(*directory) //c := make(chan Message, 16) uri := "github.com/advanced-go/example-domain/activity" a, error0 := NewControlAgent(uri, newAgentCtrlHandler) if error0 != nil { fmt.Printf("test: NewControlAgent() -> [err:%v]\n", error0) } //status = a.Register(agentDir) //if !status.OK() { // fmt.Printf("test: Register() -> [status:%v]\n", status) //} // 1 -10 Nanoseconds works for a direct send to a channel, sending via an controller2 needs a longer sleep time //d := time.Nanosecond * 10 // Needed time.Nanoseconds * 50 for directory send with mutex // Needed time.Nanoseconds * 1 for directory send via sync.Map d := time.Nanosecond * 1 a.Run() a.Message(NewControlMessage(uri, "", StartupEvent)) //c <- Message{To: "", From: "", Event: core.StartupEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(d) a.Message(NewControlMessage(uri, "", PauseEvent)) //c <- Message{To: "", From: "", Event: core.PauseEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(d) a.Message(NewControlMessage(uri, "", ResumeEvent)) //c <- Message{To: "", From: "", Event: core.ResumeEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(d) a.Message(NewControlMessage(uri, "", PingEvent)) //c <- Message{To: "", From: "", Event: core.PingEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(d) a.Message(NewControlMessage(uri, "", ReconfigureEvent)) //c <- Message{To: "", From: "", Event: core.ReconfigureEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(d) a.Shutdown() //.SendCtrl(Message{To: uri, From: "", Event: core.ShutdownEvent}) //c <- Message{To: "", From: "", 
Event: core.ShutdownEvent, RelatesTo: "", Status: nil, Content: nil, ReplyTo: nil} time.Sleep(time.Millisecond * 100) a.Shutdown() // will panic //c <- Message{} }
Output: test: NewControlAgent_CtrlHandler() -> event:startup test: NewControlAgent_CtrlHandler() -> event:pause test: NewControlAgent_CtrlHandler() -> event:resume test: NewControlAgent_CtrlHandler() -> event:ping test: NewControlAgent_CtrlHandler() -> event:reconfigure test: NewControlAgent_CtrlHandler() -> event:shutdown
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache - message cache by uri
func (*Cache) Add ¶
Add - add a message
Example ¶
resp := NewCache() resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-0", StartupEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-1", StartupEvent, core.StatusOK())) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-2", PingEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-3", PingEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-4", PingEvent, core.StatusOK())) fmt.Printf("test: count() -> : %v\n", resp.Count()) m, ok := resp.Get("invalid") fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-nil:%v]\n", "invalid", ok, m == nil) m, ok = resp.Get("from-uri-3") fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-to:%v]\n", "from-uri-3", ok, len(m.To()) > 0) fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Include(ShutdownEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Exclude(ShutdownEvent, StatusNotProvided)) fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Include(StartupEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Exclude(StartupEvent, StatusNotProvided)) fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Include(PingEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Exclude(PingEvent, StatusNotProvided))
Output: test: count() -> : 5 test: Get(invalid) -> : [ok:false] [msg-nil:true] test: Get(from-uri-3) -> : [ok:true] [msg-to:true] test: include(event:shutdown,95) -> : [] test: exclude(event:shutdown,95) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4] test: include(event:startup,95) -> : [from-uri-0] test: exclude(event:startup,95) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4] test: include(event:ping,95) -> : [from-uri-2 from-uri-3] test: exclude(event:ping,95) -> : [from-uri-0 from-uri-1 from-uri-4]
func (*Cache) Uri ¶
Uri - list the URIs in the cache
Example ¶
resp := NewCache() resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-0", StartupEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-1", StartupEvent, core.StatusOK())) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-2", PingEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-3", PingEvent, core.NewStatus(StatusNotProvided))) resp.Add(NewMessageWithStatus(channelNone, "to-uri", "from-uri-4", PingEvent, core.StatusOK())) fmt.Printf("test: count() -> : %v\n", resp.Count()) m, ok := resp.Get("invalid") fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-nil:%v]\n", "invalid", ok, m == nil) m, ok = resp.Get("from-uri-3") fmt.Printf("test: Get(%v) -> : [ok:%v] [msg-to:%v]\n", "from-uri-3", ok, m.To()) fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Include(ShutdownEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, StatusNotProvided, resp.Exclude(ShutdownEvent, StatusNotProvided)) fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Include(StartupEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, StatusNotProvided, resp.Exclude(StartupEvent, StatusNotProvided)) fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Include(PingEvent, StatusNotProvided)) fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, StatusNotProvided, resp.Exclude(PingEvent, StatusNotProvided))
Output: test: count() -> : 5 test: Get(invalid) -> : [ok:false] [msg-nil:true] test: Get(from-uri-3) -> : [ok:true] [msg-to:to-uri] test: include(event:shutdown,95) -> : [] test: exclude(event:shutdown,95) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4] test: include(event:startup,95) -> : [from-uri-0] test: exclude(event:startup,95) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4] test: include(event:ping,95) -> : [from-uri-2 from-uri-3] test: exclude(event:ping,95) -> : [from-uri-0 from-uri-1 from-uri-4]
type Channel ¶
type Channel struct { C chan *Message // contains filtered or unexported fields }
func NewChannel ¶
Example ¶
c := NewChannel("test", false) fmt.Printf("test: NewChannel() -> [name:%v]\n", c) fmt.Printf("test: NewChannel() -> [enabled:%v]\n", c.IsEnabled()) c.Enable() fmt.Printf("test: NewChannel_Enable() -> [enabled:%v]\n", c.IsEnabled()) c.Disable() fmt.Printf("test: NewChannel_Disable() -> [enabled:%v]\n", c.IsEnabled()) c.Close() fmt.Printf("test: NewChannel_Close() -> [closed:%v]\n", c.C == nil)
Output: test: NewChannel() -> [name:test] test: NewChannel() -> [enabled:false] test: NewChannel_Enable() -> [enabled:true] test: NewChannel_Disable() -> [enabled:false] test: NewChannel_Close() -> [closed:true]
func NewEmissaryChannel ¶
func NewMasterChannel ¶
func NewPrimaryChannel ¶
func (*Channel) IsFinalized ¶
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
Exchange - controller2 directory
func (*Exchange) Broadcast ¶
Broadcast - broadcast a message to all entries, deleting the entry if the message event is Shutdown
func (*Exchange) GetMailbox ¶
GetMailbox - find a mailbox
func (*Exchange) IsFinalized ¶
IsFinalized - determine if all agents have been shut down and removed from the exchange
func (*Exchange) RegisterMailbox ¶
RegisterMailbox - register a mailbox
type Handler ¶
type Handler func(msg *Message)
Handler - uniform interface for message handling
func NewCacheHandler ¶
NewCacheHandler - handler to receive messages into a cache.
func NewReceiverReplyTo ¶
type Message ¶
Message - message
func NewControlMessage ¶
func NewMessage ¶
func NewMessageWithReply ¶
func NewMessageWithStatus ¶
func (*Message) ContentType ¶
func (*Message) IsContentType ¶
func (*Message) SetContentType ¶
type OnShutdown ¶
type OnShutdown interface {
Add(func())
}
OnShutdown - add functions to be run on shutdown
Example ¶
uri := "urn:agent007" a := newTestAgent(uri, nil, nil) a.running = true a.Shutdown() a1 := newTestAgent(uri, nil, nil) if sd, ok := any(a1).(OnShutdown); ok { sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-1()\n") }) sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-2()\n") }) sd.Add(func() { fmt.Printf("test: OnShutdown() -> func-3()\n") }) } a1.running = true a1.Shutdown()
Output: test: OnShutdown() -> func-1() test: OnShutdown() -> func-2() test: OnShutdown() -> func-3()
type OpsAgent ¶
func OpsAgentCast ¶
type Ticker ¶
type Ticker struct {
// contains filtered or unexported fields
}
func NewPrimaryTicker ¶
func (*Ticker) IsFinalized ¶
type TraceDispatcher ¶
type TraceDispatcher interface { Tracer }
func NewTraceDispatcher ¶
func NewTraceDispatcher(events []string, channel string) TraceDispatcher
type TraceFilter ¶
func NewTraceFilter ¶
func NewTraceFilter(channel, event string) *TraceFilter
func (*TraceFilter) Access ¶
func (f *TraceFilter) Access(channel, event string) bool