Documentation ¶
Overview ¶
"router" is a Go package for peer-peer pub/sub message passing. The basic usage is to attach a send channel to an id in router to send messages, and attach a recv channel to an id to receive messages. If these 2 ids match, the messages from send channel will be "routed" to recv channel, e.g.
rot := router.New(...) chan1 := make(chan string) chan2 := make(chan string) chan3 := make(chan string) rot.AttachSendChan(PathID("/sports/basketball"), chan1) rot.AttachRecvChan(PathID("/sports/basketball"), chan2) rot.AttachRecvChan(PathID("/sports/*"), chan3)
We can use integers, strings, pathnames, or structs as Ids in router (maybe regex ids and tuple id in future).
we can connect two routers so that channels attached to router1 can communicate with channels attached to router2 transparently.
Index ¶
- Constants
- Variables
- func Broadcast(v reflect.Value, recvers []*RoutedChan)
- func ExportedId(id Id) bool
- func KeepLatestBroadcast(v reflect.Value, recvers []*RoutedChan)
- type BindEvent
- type BindEventType
- type ChanInfo
- type ChanInfoMsg
- type ChanReadyInfo
- type ChanState
- type Channel
- type ConnInfoMsg
- type ConnReadyMsg
- type Demarshaler
- type DispatchFunc
- type DispatchPolicy
- type Dispatcher
- type FaultRaiser
- type FaultRecord
- type FlowControlPolicy
- type FlowRecver
- type FlowSender
- type Id
- type IdFilter
- type IdTranslator
- type IntId
- func (id IntId) Clone(args ...int) (nnid Id, err error)
- func (id IntId) Key() interface{}
- func (id1 IntId) Match(id2 Id) bool
- func (id IntId) MatchType() MatchType
- func (id IntId) Member() int
- func (id IntId) Scope() int
- func (id IntId) String() string
- func (id IntId) SysID(indx int, args ...int) (ssid Id, err error)
- func (id IntId) SysIdIndex() int
- type LogPriority
- type LogRecord
- type LogSink
- type Logger
- type Marshaler
- type MarshalingPolicy
- type MatchType
- type MsgId
- func (id MsgId) Clone(args ...int) (nnid Id, err error)
- func (id MsgId) Key() interface{}
- func (id1 MsgId) Match(id2 Id) bool
- func (id MsgId) MatchType() MatchType
- func (id MsgId) Member() int
- func (id MsgId) Scope() int
- func (id MsgId) String() string
- func (id MsgId) SysID(indx int, args ...int) (ssid Id, err error)
- func (id MsgId) SysIdIndex() int
- type MsgTag
- type PathId
- func (id PathId) Clone(args ...int) (nnid Id, err error)
- func (id PathId) Key() interface{}
- func (id1 PathId) Match(id2 Id) bool
- func (id PathId) MatchType() MatchType
- func (id PathId) Member() int
- func (id PathId) Scope() int
- func (id PathId) String() string
- func (id PathId) SysID(indx int, args ...int) (ssid Id, err error)
- func (id PathId) SysIdIndex() int
- type PolicyFunc
- type Proxy
- type RandomDispatcher
- type RecvChan
- type Recver
- type Roundrobin
- type RoutedChan
- type Router
- type SendChan
- type Sender
- type StrId
- func (id StrId) Clone(args ...int) (nnid Id, err error)
- func (id StrId) Key() interface{}
- func (id1 StrId) Match(id2 Id) bool
- func (id StrId) MatchType() MatchType
- func (id StrId) Member() int
- func (id StrId) Scope() int
- func (id StrId) String() string
- func (id StrId) SysID(indx int, args ...int) (ssid Id, err error)
- func (id StrId) SysIdIndex() int
- type WindowFlowControlPolicy
- type XOnOffFlowControlPolicy
Constants ¶
const ( MemberLocal = iota //peers (send chans and recv chans) are from the same router MemberRemote //peers (send chans and recv chans) are from diff routers NumMembership )
Membership identifies whether communicating peers (send chans and recv chans) are from the same router or diff routers
const ( ScopeGlobal = iota // send to or recv from both local and remote peers ScopeRemote // send to or recv from remote peers ScopeLocal // send to or recv from local peers NumScope )
Scope is the scope to publish/subscribe (or send/recv) msgs
const ( ConnId = iota //msgs for router connection DisconnId //msgs for router disconnection ErrorId //msgs sent when one side detect errors ReadyId //msgs sent when router's chans ready to recv more msgs PubId //send new publications (set<id, chan type info>) UnPubId //remove publications from connected routers SubId //send new subscriptions (set<id, chan type info>) UnSubId //remove subscriptions from connected routers NumSysIds )
Indices for sys msgs, used for creating SysIds
const ( RouterLogId = NumSysIds + iota RouterFaultId NumSysInternalIds )
Some system level internal ids (for router internal logging and fault reporting)
const ( DefLogBufSize = 256 DefDataChanBufSize = 32 DefCmdChanBufSize = 64 UnlimitedBuffer = -1 )
Default size settings in router
Variables ¶
var IntSysIdBase int = -10101 //Base value for SysIds of IntId
define 8 system msg ids
var PathSysIdBase string = "/10101" //Base value for SysIds of PathId
define 8 system msg ids
var StrSysIdBase string = "-10101" //Base value for SysIds of StrId
define 8 system msg ids
Functions ¶
func Broadcast ¶
func Broadcast(v reflect.Value, recvers []*RoutedChan)
Simple broadcast is a plain function
func ExportedId ¶
A function used as predicate in router.idsForSend()/idsForRecv() to find all ids in a router's namespace which are exported to outside
func KeepLatestBroadcast ¶
func KeepLatestBroadcast(v reflect.Value, recvers []*RoutedChan)
KeepLastBroadcast never block. if running out of Chan buffer, drop old items and keep the latest items
Types ¶
type BindEvent ¶
type BindEvent struct { Type BindEventType Count int //total attached }
a message struct containing information for peer (sender/recver) binding/connection. sent by router whenever peer attached or detached.
Type: the type of event just happened: PeerAttach/PeerDetach/EndOfData Count: how many peers are still bound now
type BindEventType ¶
type BindEventType int8
const ( PeerAttach BindEventType = iota PeerDetach EndOfData )
type ChanInfoMsg ¶
type ChanInfoMsg struct {
Info []*ChanInfo
}
a message struct for propagating router's namespace changes (chan attachments or detachments)
type ChanReadyInfo ¶
recver-router notify sender-router which channel are ready to recv how many msgs
func (ChanReadyInfo) String ¶
func (cri ChanReadyInfo) String() string
type ChanState ¶
type ChanState interface { Type() reflect.Type Interface() interface{} IsNil() bool Cap() int Len() int }
basic chan state
type Channel ¶
Channel interface defines functional api of Go's channel: based on reflect.Value's channel related method set allow programming "generic" channels with reflect.Value as msgs add some utility Channel types
type ConnInfoMsg ¶
type ConnInfoMsg struct { ConnInfo string Error string Id Id Type string //async/flowControlled/raw }
a message struct containing information about remote router connection
type ConnReadyMsg ¶
type ConnReadyMsg struct {
Info []*ChanReadyInfo
}
type Demarshaler ¶
type Demarshaler interface {
Demarshal(interface{}) error
}
the common interface of all demarshaler such as GobDemarshaler and JsonDemarshaler
type DispatchFunc ¶
type DispatchFunc func(v reflect.Value, recvers []*RoutedChan)
DispatchFunc is a wrapper to convert a plain function into a dispatcher
func (DispatchFunc) Dispatch ¶
func (f DispatchFunc) Dispatch(v reflect.Value, recvers []*RoutedChan)
type DispatchPolicy ¶
type DispatchPolicy interface {
NewDispatcher() Dispatcher
}
The programming of Dispatchers: 1. do not depend on specific chan types 2. messages sent are represented as reflect.Value 3. receivers are array of RoutedChans with Channel interface of Send()/Recv()/...
DispatchPolicy is used to generate concrete dispatcher instances. For the kind of dispatcher which has no internal state, the same instance can be returned.
var BroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(Broadcast) })
BroadcastPolicy is used to generate broadcast dispatcher instances
var KeepLatestBroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(KeepLatestBroadcast) })
KeepLatestBroadcastPolicy is used to generate KeepLatest broadcast dispatcher instances
var RandomPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRandomDispatcher() })
RandomPolicy is used to generate random dispatchers
var RoundRobinPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRoundrobin() })
RoundRobinPolicy is ued to generate roundrobin dispatchers
type Dispatcher ¶
type Dispatcher interface {
Dispatch(v reflect.Value, recvers []*RoutedChan)
}
Dispatcher is the common interface of all dispatchers
type FaultRaiser ¶
FaultRaiser can be embedded into user structs/ types, which then can call Raise() directly
func NewFaultRaiser ¶
func NewFaultRaiser(id Id, r Router, src string) *FaultRaiser
create a new FaultRaiser to send FaultRecords to id in router "r"
func (*FaultRaiser) Close ¶
func (l *FaultRaiser) Close()
func (*FaultRaiser) Init ¶
func (l *FaultRaiser) Init(id Id, r Router, src string) *FaultRaiser
func (*FaultRaiser) Raise ¶
func (r *FaultRaiser) Raise(msg error)
raise a fault - send a FaultRecord to faultId in router
type FaultRecord ¶
FaultRecord records some details about fault
type FlowControlPolicy ¶
type FlowControlPolicy interface { NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error) //Stringer interface, return name of FlowControlPolicy String() string }
FlowControlPolicy: implement diff flow control protocols. a flow control protocol has two parts:
sender: which send msgs and recv acks from recver. recver: which recv msgs and send acks to sender.
So besides wrapping a transport Channel (for send/recv msgs)
FlowSender expose Ack(int) method to recv acks FlowRecver constructor will take as argument a ack(int) callback for sending acks.
The following Windowing and XOnOff protocols are copied from Chapter 4 of "Design And Validation Of Computer Protocols" by Gerard J. Holzmann.
var WindowFlowController FlowControlPolicy = WindowFlowControlPolicy(0)
var XOnOffFlowController FlowControlPolicy = &XOnOffFlowControlPolicy{0.75, 0.25}
type FlowRecver ¶
type FlowRecver interface { Channel }
type FlowSender ¶
type Id ¶
type Id interface { //methods to query Id content Scope() int Member() int //key value for storing Id in map Key() interface{} //for id matching Match(Id) bool MatchType() MatchType //Generators for creating other ids of same type. Since often we don't //know the exact types of Id.Val, so we have to create new ones from an existing id Clone(...int) (Id, error) //create a new id with same id, but possible diff scope & membership SysID(int, ...int) (Id, error) //generate sys ids, also called as method of Router SysIdIndex() int //return (0 - NumSysInternalIds) for SysIds, return -1 for others //Stringer interface String() string }
Id defines the common interface shared by all kinds of ids: integers/strings/pathnames...
var ( DummyIntId Id = &IntId{Val: -10201} DummyStrId Id = &StrId{Val: "-10201"} DummyPathId Id = &PathId{Val: "-10201"} DummyMsgId Id = &MsgId{Val: MsgTag{-10201, -10201}} )
Some dummy ids, often used as seedId when creating router
func IntID ¶
func IntID(args ...interface{}) Id
IntId constructor, accepting the following arguments: Val int ScopeVal int MemberVal int
func MsgID ¶
func MsgID(args ...interface{}) Id
MsgId constructor, accepting the following arguments: Family int Tag int ScopeVal int MemberVal int
type IdFilter ¶
IdFilter: the common interface of filters. concrete filters should be defined by apps with app-specific rules. if no filter defined, there is no id filtering. 1. bound with specific proxy 2. defines which ids can pass in / out to router thru this proxy 3. only filter the ids of application msgs (NOT system msgs), only involved in processing namespace change msgs: PubId/SubId 4. by default, if no filter is defined, everything is allowed 5. filters are used against ids in local namespace, not translated ones
type IdTranslator ¶
IdTransltor: the common interface of translators. concrete transltors should be defined by apps with app-specific rules. if no translator defined, there is no id transltions. 1. bound with specific proxy 2. translate ids of in / out msgs thru this proxy, effectively "mount" the msgs thru this proxy / conn to a subrange of router's id space 3. only translate the ids of application msgs (NOT system msgs), and it will affect the ids of every app msgs passed thru this proxy - must be highly efficient 4. by default, if no translator is defined, no translation
type LogPriority ¶
type LogPriority int
const ( LOG_INFO LogPriority = iota LOG_DEBUG LOG_WARN LOG_ERROR )
func (LogPriority) String ¶
func (lp LogPriority) String() string
type LogRecord ¶
type LogRecord struct { Pri LogPriority Source string Info interface{} Timestamp int64 }
LogRecord stores the log information
type LogSink ¶
type LogSink struct {
// contains filtered or unexported fields
}
A simple log sink, showing log messages in console.
func NewLogSink ¶
create a new log sink, which receives log messages from id in router "r"
type Logger ¶
Logger can be embedded into user structs / types, which then can use Log() / LogError() directly
func NewLogger ¶
NewLogger will create a Logger object which sends log messages thru id in router "r"
func (*Logger) Log ¶
func (l *Logger) Log(p LogPriority, msg interface{})
send a log record to log id in router
type Marshaler ¶
type Marshaler interface {
Marshal(interface{}) error
}
the common interface of all marshaler such as GobMarshaler and JsonMarshaler
type MarshalingPolicy ¶
type MarshalingPolicy interface { NewMarshaler(io.Writer) Marshaler NewDemarshaler(io.Reader) Demarshaler Register(interface{}) }
the common interface of all Marshaling policy such as GobMarshaling and JsonMarshaling
var GobMarshaling MarshalingPolicy = &gobMarshalingPolicy{registry: make(map[interface{}]bool)}
use package "gob" for marshaling
var JsonMarshaling MarshalingPolicy = jsonMarshalingPolicy(1)
use package "json" for marshaling
type MatchType ¶
type MatchType int
MatchType describes the types of namespaces and match algorithms used for id-matching
type MsgId ¶
func (MsgId) SysIdIndex ¶
type MsgTag ¶
type MsgTag struct { Family int //divide all msgs into families: system, fault, provision,... Tag int //further division inside a family }
Use a common msgTag as Id
define 8 system msg ids
type PathId ¶
Use file-system like pathname as ids PathId has diff Match() algo from StrId
func (PathId) SysIdIndex ¶
type PolicyFunc ¶
type PolicyFunc func() Dispatcher
func (PolicyFunc) NewDispatcher ¶
func (f PolicyFunc) NewDispatcher() Dispatcher
type Proxy ¶
type Proxy interface { //Connect to a local router Connect(Proxy) error //Connect to a remote router thru io conn //1. io.ReadWriteCloser: transport connection //2. MarshalingPolicy: gob or json marshaling //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff) ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) error //close proxy and disconnect from peer Close() //query messaging interface with peer LocalPubInfo() []*ChanInfo LocalSubInfo() []*ChanInfo PeerPubInfo() []*ChanInfo PeerSubInfo() []*ChanInfo }
Proxy is the primary interface to connect router to its peer router. At both ends of a connection, there is a proxy object for its router. Simple router connection can be set up thru calling Router.Connect(). Proxy.Connect() can be used to set up more complicated connections, such as setting IdFilter to allow only a subset of messages pass thru the connection, or setting IdTranslator which can "relocate" remote message ids into a subspace in local router's namespace, or setting a flow control policy. Proxy.Close() is called to disconnect router from its peer.
func NewProxy ¶
func NewProxy(r Router, name string, f IdFilter, t IdTranslator) Proxy
Proxy constructor. It accepts the following arguments:
- r: the router which will be bound with this proxy and be owner of this proxy
- name: proxy's name, used in log messages if owner router's log is turned on
- f: IdFilter to be installed at this proxy
- t: IdTranslator to be installed at this proxy
type RandomDispatcher ¶
Random dispatcher
func NewRandomDispatcher ¶
func NewRandomDispatcher() *RandomDispatcher
func (*RandomDispatcher) Dispatch ¶
func (rd *RandomDispatcher) Dispatch(v reflect.Value, recvers []*RoutedChan)
type Roundrobin ¶
type Roundrobin int
Roundrobin dispatcher will keep the "next" index as its state
func NewRoundrobin ¶
func NewRoundrobin() *Roundrobin
func (*Roundrobin) Dispatch ¶
func (r *Roundrobin) Dispatch(v reflect.Value, recvers []*RoutedChan)
type RoutedChan ¶
type RoutedChan struct { Dir reflect.ChanDir Id Id Channel //external SendChan/RecvChan, attached by clients // contains filtered or unexported fields }
RoutedChan represents channels which are attached to router. They expose Channel's interface: Send()/TrySend()/Recv()/TryRecv()/... and additional info:
- Id - the id which channel is attached to
- NumPeers() - return the number of bound peers
- Peers() - array of bound peers RoutedChan
- Detach() - detach the channel from router
func (*RoutedChan) Detach ¶
func (e *RoutedChan) Detach()
func (*RoutedChan) Interface ¶
func (e *RoutedChan) Interface() interface{}
func (*RoutedChan) NumPeers ¶
func (e *RoutedChan) NumPeers() int
func (*RoutedChan) Peers ¶
func (e *RoutedChan) Peers() (copySet []*RoutedChan)
type Router ¶
type Router interface { //---- core api ---- //Attach chans to id in router, with an optional argument (chan *BindEvent) //When specified, the optional argument will serve two purposes: //1. used to tell when the remote peers connecting/disconn //2. in AttachRecvChan, used as a flag to ask router to keep recv chan open when all senders close //the returned RoutedChan object can be used to find the number of bound peers: routCh.NumPeers() AttachSendChan(Id, interface{}, ...interface{}) (*RoutedChan, error) //3. When attaching recv chans, an optional integer can specify the internal buffering size AttachRecvChan(Id, interface{}, ...interface{}) (*RoutedChan, error) //Detach sendChan/recvChan from router DetachChan(Id, interface{}) error //Shutdown router, and close attached proxies and chans Close() //Connect to a local router Connect(Router) (Proxy, Proxy, error) //Connect to a remote router thru io conn //1. io.ReadWriteCloser: transport connection //2. MarshalingPolicy: gob or json marshaling //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff) ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) (Proxy, error) //--- other utils --- //return pre-created SysIds according to the router's id-type, with ScopeGlobal / MemberLocal SysID(idx int) Id //create a new SysId with "args..." specifying scope/membership NewSysID(idx int, args ...int) Id //return all ids and their ChanTypes from router's namespace which satisfy predicate IdsForSend(predicate func(id Id) bool) map[interface{}]*ChanInfo IdsForRecv(predicate func(id Id) bool) map[interface{}]*ChanInfo }
Router is the main access point to functionality. Applications will create an instance of it thru router.New(...) and attach channels to it
func New ¶
func New(seedId Id, bufSize int, disp DispatchPolicy, args ...interface{}) Router
New is router constructor. It accepts the following arguments:
- seedId: a dummy id to show what type of ids will be used. New ids will be type-checked against this.
- bufSize: the buffer size used for router's internal channels. if bufSize >= 0, its value will be used if bufSize < 0, it means unlimited buffering, so router is async and sending on attached channels will never block
- disp: dispatch policy for router. by default, it is BroadcastPolicy
- optional arguments ...: name: router's name, if name is defined, router internal logging will be turned on, ie LogRecord generated LogScope: if this is set, a console log sink is installed to show router internal log if logScope == ScopeLocal, only log msgs from local router will show up if logScope == ScopeGlobal, all log msgs from connected routers will show up
type WindowFlowControlPolicy ¶
type WindowFlowControlPolicy byte
WindowFlowController: simple window flow control protocol for lossless transport the transport Channel between Sender, Recver should have capacity >= expected credit Figure 4.5 in Gerard's book
func (WindowFlowControlPolicy) NewFlowRecver ¶
func (wfc WindowFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
func (WindowFlowControlPolicy) NewFlowSender ¶
func (wfc WindowFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
func (WindowFlowControlPolicy) String ¶
func (wfc WindowFlowControlPolicy) String() string
type XOnOffFlowControlPolicy ¶
type XOnOffFlowControlPolicy struct {
// contains filtered or unexported fields
}
X-on/X-off protocol Figure 4.2 and Figure 4.3 in Gerard's book
func (*XOnOffFlowControlPolicy) NewFlowRecver ¶
func (fcp *XOnOffFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
func (*XOnOffFlowControlPolicy) NewFlowSender ¶
func (fcp *XOnOffFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
func (*XOnOffFlowControlPolicy) String ¶
func (fcp *XOnOffFlowControlPolicy) String() string