Documentation ¶
Overview ¶
Package collect provides facilities for the Publish/Subscribe/Collect pattern of message propagation and data collections, useful for distributed query.
Index ¶
- Constants
- Variables
- func DefaultMsgIDFn(pmsg *pubsub.PbMessage) string
- type AsyncPubSub
- func (ap *AsyncPubSub) Close() error
- func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)
- func (ap *AsyncPubSub) Publish(ctx context.Context, topic string, data []byte) (err error)
- func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error
- func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)
- func (ap *AsyncPubSub) Topics() (out []string)
- func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)
- type BasicPubSubCollector
- func (bpsc *BasicPubSubCollector) Close() (err error)
- func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)
- func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)
- func (bpsc *BasicPubSubCollector) Publish(topic string, payload []byte, opts ...PubOpt) (err error)
- type Conf
- type EvalReqReq
- type FinalRespHandler
- type HostWires
- func (hw *HostWires) ClosedStream(network.Network, network.Stream)
- func (hw *HostWires) Connected(n network.Network, c network.Conn)
- func (hw *HostWires) Disconnected(n network.Network, c network.Conn)
- func (hw *HostWires) ID() peer.ID
- func (hw *HostWires) Listen(network.Network, ma.Multiaddr)
- func (hw *HostWires) ListenClose(network.Network, ma.Multiaddr)
- func (hw *HostWires) Neighbors() []peer.ID
- func (hw *HostWires) OpenedStream(network.Network, network.Stream)
- func (hw *HostWires) SendMsg(to peer.ID, data []byte) error
- func (hw *HostWires) SetListener(wn WireListener)
- func (hw *HostWires) SetMsgHandler(h MsgHandler)
- type InitOpt
- type InitOpts
- type IntBFS
- type IntBFSCollector
- type IntBFSOptions
- type Intermediate
- type JoinOpt
- type JoinOpts
- type Logger
- type Message
- type Msg
- type MsgHandler
- type PeerSet
- type Profile
- type ProfileFactory
- type PubOpt
- type PubOpts
- type PubReq
- type PubSubCollector
- type PubSubFactory
- type RelayPubSubCollector
- type ReqIDFn
- type Request
- type RequestHandler
- type RequestID
- type Response
- type TopicHandle
- type TopicHub
- func (th *TopicHub) Close() error
- func (th *TopicHub) HandlePeerDown(p peer.ID, topic string)
- func (th *TopicHub) HandlePeerUp(p peer.ID, topic string)
- func (th *TopicHub) Join(topic string) error
- func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error
- func (th *TopicHub) Leave(topic string) error
- func (th *TopicHub) Wires(topic string) Wires
- type TopicMsgHandler
- type TopicOpt
- type TopicWireListener
- type TopicWires
- type WireListener
- type Wires
Constants ¶
const (
HostWiresProcotolID = "/hostwires/1.0.0"
)
const (
IntBFSProtocolID = protocol.ID("/intbfs/1.0.0")
)
Variables ¶
Functions ¶
func DefaultMsgIDFn ¶
DefaultMsgIDFn should be used with DefaultReqIDFn.
Types ¶
type AsyncPubSub ¶
type AsyncPubSub struct {
// contains filtered or unexported fields
}
AsyncPubSub encapsulates pubsub and provides async methods to receive subscribed messages. AsyncPubSub also manages the joined topics.
func NewAsyncPubSub ¶
func NewAsyncPubSub(h host.Host, opts ...TopicOpt) (apsub *AsyncPubSub, err error)
NewAsyncPubSub returns a new AsyncPubSub instance. If WithCustomPubSubFactory is not set, a default randomsub will be used.
func (*AsyncPubSub) LoadTopicItem ¶
func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)
LoadTopicItem .
func (*AsyncPubSub) SetTopicItem ¶
func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error
SetTopicItem .
func (*AsyncPubSub) Subscribe ¶
func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)
Subscribe subscribes to a topic. Subscribing to the same topic again is allowed, but the previous handle will be replaced.
func (*AsyncPubSub) Topics ¶
func (ap *AsyncPubSub) Topics() (out []string)
Topics returns the subscribed topics
func (*AsyncPubSub) Unsubscribe ¶
func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)
Unsubscribe the given topic
type BasicPubSubCollector ¶
type BasicPubSubCollector struct {
// contains filtered or unexported fields
}
BasicPubSubCollector is an implementation of the psc.BasicPubSubCollector interface. It broadcasts requests via libp2p.PubSub. When the remote nodes receive the request, they will try to dial the requesting node and return the response directly.
func NewBasicPubSubCollector ¶
func NewBasicPubSubCollector(h host.Host, options ...InitOpt) (bpsc *BasicPubSubCollector, err error)
NewBasicPubSubCollector returns a new BasicPubSubCollector
func (*BasicPubSubCollector) Close ¶
func (bpsc *BasicPubSubCollector) Close() (err error)
Close the BasicPubSubCollector.
func (*BasicPubSubCollector) Join ¶
func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)
Join joins the overlay network defined by topic. Joining the same topic again is allowed here; rejoining will refresh the requestHandler.
func (*BasicPubSubCollector) Leave ¶
func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)
Leave the overlay. The registered topichandles and responseHandlers will be closed.
type Conf ¶
type Conf struct { // Router is an option to select different router type // Router must be one of `basic`, `relay` and `intbfs` Router string `json:"router"` // ProtocolPrefix is the protocol name prefix ProtocolPrefix string `json:"protocol"` // RequestCacheSize . // RequestCache is used to store the request control message, // which is for response routing. RequestCacheSize int `json:"requestCacheSize"` // ResponseCacheSize . // ResponseCache is used to deduplicate the response. ResponseCacheSize int `json:"responseCacheSize"` }
Conf is the static configuration of PubSubCollector
type EvalReqReq ¶
type EvalReqReq struct {
// contains filtered or unexported fields
}
type FinalRespHandler ¶
FinalRespHandler is the callback function invoked when the root node receives a response. It will be called only in the root node. It will be called more than once when the number of responses is larger than one.
type HostWires ¶
func NewHostWires ¶
func (*HostWires) Disconnected ¶
func (*HostWires) SetListener ¶
func (hw *HostWires) SetListener(wn WireListener)
func (*HostWires) SetMsgHandler ¶
func (hw *HostWires) SetMsgHandler(h MsgHandler)
type InitOpt ¶
InitOpt is options used in NewBasicPubSubCollector
func WithRequestIDGenerator ¶
WithRequestIDGenerator .
type InitOpts ¶
type InitOpts struct { Conf Conf ReqIDFn ReqIDFn MsgIDFn pubsub.MsgIDFn Logger Logger Wires TopicWires }
InitOpts is options used in NewBasicPubSubCollector
func NewInitOpts ¶
NewInitOpts returns initopts
type IntBFS ¶
type IntBFS struct {
// contains filtered or unexported fields
}
IntBFS doesn't care about topic. IntBFS contains 5 parts: 1. query mechanism 2. peer profiles 3. peer ranking 4. distance function 5. random perturbation
func (*IntBFS) HandlePeerDown ¶
func (*IntBFS) HandlePeerUp ¶
type IntBFSCollector ¶
type IntBFSCollector struct {
// contains filtered or unexported fields
}
IntBFSCollector . We used a topic-defined overlay here.
func NewIntBFSCollector ¶
func NewIntBFSCollector(h host.Host, opts ...InitOpt) (*IntBFSCollector, error)
func (*IntBFSCollector) Close ¶
func (ic *IntBFSCollector) Close() error
func (*IntBFSCollector) Join ¶
func (ic *IntBFSCollector) Join(topic string, opts ...JoinOpt) (err error)
func (*IntBFSCollector) Leave ¶
func (ic *IntBFSCollector) Leave(topic string) (err error)
type IntBFSOptions ¶
type IntBFSOptions struct { ProfileFactory RequestHandler ReqIDFn Logger Topic string }
IntBFSOptions .
func MakeDefaultIntBFSOptions ¶
func MakeDefaultIntBFSOptions() *IntBFSOptions
MakeDefaultIntBFSOptions .
type JoinOpt ¶
JoinOpt is optional options in PubSubCollector.Join
func WithProfileFactory ¶
func WithProfileFactory(pf ProfileFactory) JoinOpt
func WithRequestHandler ¶
func WithRequestHandler(rqhandle RequestHandler) JoinOpt
WithRequestHandler registers request handler
type JoinOpts ¶
type JoinOpts struct { RequestHandler ProfileFactory }
JoinOpts is the aggregated options
func NewJoinOptions ¶
NewJoinOptions returns an option collection
type Logger ¶
type Logger interface {
Logf(level, format string, args ...interface{})
}
Logger . level is one of {"debug", "info", "warn", "error", "fatal"}; format and args are compatible with fmt.Printf.
type PeerSet ¶
type PeerSet struct {
// contains filtered or unexported fields
}
func NewPeerSet ¶
func NewPeerSet() *PeerSet
type Profile ¶
type Profile interface { // Insert(req *Request, resp *Response) // Less(that Profile, req *Request) bool }
Profile stores query profiles
type PubOpt ¶
PubOpt is optional options in PubSubCollector.Publish
func WithFinalRespHandler ¶
func WithFinalRespHandler(handler FinalRespHandler) PubOpt
WithFinalRespHandler registers the FinalRespHandler for a request.
func WithRequestContext ¶
WithRequestContext adds cancellation or timeout for a request. The default is withCancel (ctx will be cancelled when the request is closed).
type PubOpts ¶
type PubOpts struct { RequestContext context.Context FinalRespHandle FinalRespHandler }
PubOpts is the aggregated options
func NewPublishOptions ¶
NewPublishOptions returns an option collection
type PubSubCollector ¶
type PubSubCollector interface { // Join the overlay network defined by topic. // Register RequestHandle and ResponseHandle in opts. Join(topic string, opts ...JoinOpt) error // Publish a serialized request. Request should be encapsulated in the data argument. Publish(topic string, data []byte, opts ...PubOpt) error // Leave the overlay Leave(topic string) error io.Closer }
PubSubCollector is a group communication module on topic-based overlay network. It helps to dispatch request, and wait for corresponding responses. In relay mode, PubSubCollector can also help to reduce the response.
func NewCollector ¶
func NewCollector(h host.Host, opts ...InitOpt) (PubSubCollector, error)
NewCollector creates PubSubCollector.
type PubSubFactory ¶
PubSubFactory initializes a pubsub in AsyncPubSub
type RelayPubSubCollector ¶
type RelayPubSubCollector struct {
// contains filtered or unexported fields
}
RelayPubSubCollector .
func NewRelayPubSubCollector ¶
func NewRelayPubSubCollector(h host.Host, options ...InitOpt) (r *RelayPubSubCollector, err error)
NewRelayPubSubCollector .
func (*RelayPubSubCollector) Close ¶
func (r *RelayPubSubCollector) Close() (err error)
Close the RelayPubSubCollector.
func (*RelayPubSubCollector) Join ¶
func (r *RelayPubSubCollector) Join(topic string, options ...JoinOpt) (err error)
Join the overlay network defined by topic. Register RequestHandle and ResponseHandle in opts.
func (*RelayPubSubCollector) Leave ¶
func (r *RelayPubSubCollector) Leave(topic string) (err error)
Leave the overlay
type RequestHandler ¶
type RequestHandler func(ctx context.Context, req *Request) *Intermediate
RequestHandler is the callback function invoked when receiving a request. It will be called in every node that has joined the network. The return value will be sent to the root (directly or via relay).
type RequestID ¶
RequestID type alias
func DefaultReqIDFn ¶
DefaultReqIDFn returns default ReqIDGenerator. SHA-512 hash function is called in it.
type TopicHandle ¶
TopicHandle is the handle function of subscription. WARNING: DO NOT change msg; if a msg includes multiple topics, they share a message.
type TopicHub ¶
type TopicHub struct {
// contains filtered or unexported fields
}
func NewTopicHub ¶
func NewTopicHub(tw TopicWires) *TopicHub
func (*TopicHub) JoinWithNotifiee ¶
func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error
type TopicOpt ¶
type TopicOpt func(*AsyncPubSub) error
TopicOpt is the option in NewAsyncPubSub
func WithCustomPubSubFactory ¶
func WithCustomPubSubFactory(psubFact PubSubFactory) TopicOpt
WithCustomPubSubFactory initializes the pubsub based on the given factory
func WithSelfNotif ¶
WithSelfNotif decides whether a node receives self-published messages. If WithSelfNotif is set to true, the node will receive messages published by itself. Default is set to false.