Documentation ¶
Overview ¶
Package collect provides facilities for the Publish/Subscribe/Collect pattern of message propagation and data collection, useful for distributed queries.
Index ¶
- Constants
- Variables
- func DefaultMsgIDFn(pmsg *pubsub.PbMessage) string
- type AsyncPubSub
- func (ap *AsyncPubSub) Close() error
- func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)
- func (ap *AsyncPubSub) Publish(ctx context.Context, topic string, data []byte) (err error)
- func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error
- func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)
- func (ap *AsyncPubSub) Topics() (out []string)
- func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)
- type BasicPubSubCollector
- func (bpsc *BasicPubSubCollector) Close() (err error)
- func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)
- func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)
- func (bpsc *BasicPubSubCollector) Publish(topic string, payload []byte, opts ...PubOpt) (err error)
- type Conf
- type FinalRespHandler
- type HostWires
- func (hw *HostWires) ClosedStream(network.Network, network.Stream)
- func (hw *HostWires) Connected(n network.Network, c network.Conn)
- func (hw *HostWires) Disconnected(n network.Network, c network.Conn)
- func (hw *HostWires) ID() peer.ID
- func (hw *HostWires) Listen(network.Network, ma.Multiaddr)
- func (hw *HostWires) ListenClose(network.Network, ma.Multiaddr)
- func (hw *HostWires) Neighbors() []peer.ID
- func (hw *HostWires) OpenedStream(network.Network, network.Stream)
- func (hw *HostWires) SendMsg(to peer.ID, data []byte) error
- func (hw *HostWires) SetListener(wn WireListener)
- func (hw *HostWires) SetMsgHandler(h MsgHandler)
- type InitOpt
- type InitOpts
- type IntBFS
- type IntBFSCollector
- type IntBFSOptions
- type Intermediate
- type JoinOpt
- type JoinOpts
- type Logger
- type Message
- type Msg
- type MsgHandler
- type PeerSet
- type Profile
- type ProfileFactory
- type PubOpt
- type PubOpts
- type PubReq
- type PubSubCollector
- type PubSubFactory
- type RelayPubSubCollector
- type ReqIDFn
- type Request
- type RequestHandler
- type RequestID
- type RequestResult
- type Response
- type TopicHandle
- type TopicHub
- func (th *TopicHub) Close() error
- func (th *TopicHub) HandlePeerDown(p peer.ID, topic string)
- func (th *TopicHub) HandlePeerUp(p peer.ID, topic string)
- func (th *TopicHub) Join(topic string) error
- func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error
- func (th *TopicHub) Leave(topic string) error
- func (th *TopicHub) Wires(topic string) Wires
- type TopicMsgHandler
- type TopicOpt
- type TopicWireListener
- type TopicWires
- type WireListener
- type Wires
Constants ¶
const (
HostWiresProcotolID = "/hostwires/1.0.0"
)
const (
IntBFSProtocolID = protocol.ID("/intbfs/1.0.0")
)
Variables ¶
Functions ¶
func DefaultMsgIDFn ¶
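DefaultMsgIDFn is the default message ID function for pubsub messages. It is presumably meant to be used together with DefaultReqIDFn (the original note named DefaultMsgIDFn itself as its counterpart).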
Types ¶
type AsyncPubSub ¶
type AsyncPubSub struct {
// contains filtered or unexported fields
}
AsyncPubSub encapsulates pubsub and provides async methods for receiving messages from subscribed topics. AsyncPubSub also manages the joined topics.
func NewAsyncPubSub ¶
func NewAsyncPubSub(h host.Host, opts ...TopicOpt) (apsub *AsyncPubSub, err error)
NewAsyncPubSub returns a new AsyncPubSub instance. If WithCustomPubSubFactory is not set, a default randomsub will be used.
func (*AsyncPubSub) LoadTopicItem ¶
func (ap *AsyncPubSub) LoadTopicItem(topic string, key interface{}) (value interface{}, err error)
LoadTopicItem .
func (*AsyncPubSub) SetTopicItem ¶
func (ap *AsyncPubSub) SetTopicItem(topic string, key interface{}, value interface{}) error
SetTopicItem .
func (*AsyncPubSub) Subscribe ¶
func (ap *AsyncPubSub) Subscribe(topic string, handle TopicHandle) (err error)
Subscribe subscribes to a topic. Subscribing to the same topic again is allowed, but the previous handle will be replaced.
func (*AsyncPubSub) Topics ¶
func (ap *AsyncPubSub) Topics() (out []string)
Topics returns the subscribed topics
func (*AsyncPubSub) Unsubscribe ¶
func (ap *AsyncPubSub) Unsubscribe(topic string) (err error)
Unsubscribe the given topic
type BasicPubSubCollector ¶
type BasicPubSubCollector struct {
// contains filtered or unexported fields
}
BasicPubSubCollector is an implementation of the PubSubCollector interface. It broadcasts requests via libp2p.PubSub. When the remote nodes receive the request, they will try to dial the requesting node and return the response directly.
func NewBasicPubSubCollector ¶
func NewBasicPubSubCollector(h host.Host, options ...InitOpt) (bpsc *BasicPubSubCollector, err error)
NewBasicPubSubCollector returns a new BasicPubSubCollector
func (*BasicPubSubCollector) Close ¶
func (bpsc *BasicPubSubCollector) Close() (err error)
Close the BasicPubSubCollector.
func (*BasicPubSubCollector) Join ¶
func (bpsc *BasicPubSubCollector) Join(topic string, opts ...JoinOpt) (err error)
Join the overlay network defined by topic. Joining the same topic again is allowed here; rejoining will refresh the requestHandler.
func (*BasicPubSubCollector) Leave ¶
func (bpsc *BasicPubSubCollector) Leave(topic string) (err error)
Leave the overlay. The registered topichandles and responseHandlers will be closed.
type Conf ¶
type Conf struct { // Router is an option to select different router type // Router must be one of `basic`, `relay` and `intbfs` Router string `json:"router"` // ProtocolPrefix is the protocol name prefix ProtocolPrefix string `json:"protocol"` // RequestCacheSize . // RequestCache is used to store the request control message, // which is for response routing. RequestCacheSize int `json:"requestCacheSize"` // ResponseCacheSize . // ResponseCache is used to deduplicate the response. ResponseCacheSize int `json:"responseCacheSize"` // Fanout . Fanout int `json:"fanout"` // RandomFanout . RandomFanout int `json:"randomFanout"` // MaxHitsToSend MaxHitsToSend int `json:"maxHitsToSend"` }
Conf is the static configuration of PubSubCollector
type FinalRespHandler ¶
FinalRespHandler is the callback function invoked when the root node receives a response. It is called only on the root node, and it is called once per response, so it runs more than once when there is more than one response.
type HostWires ¶ added in v0.0.9
func NewHostWires ¶ added in v0.0.9
func (*HostWires) ClosedStream ¶ added in v0.0.9
func (*HostWires) Disconnected ¶ added in v0.0.9
func (*HostWires) ListenClose ¶ added in v0.0.9
func (*HostWires) OpenedStream ¶ added in v0.0.9
func (*HostWires) SetListener ¶ added in v0.0.9
func (hw *HostWires) SetListener(wn WireListener)
func (*HostWires) SetMsgHandler ¶ added in v0.0.9
func (hw *HostWires) SetMsgHandler(h MsgHandler)
type InitOpt ¶
InitOpt is options used in NewBasicPubSubCollector
func WithRequestIDGenerator ¶
WithRequestIDGenerator .
type InitOpts ¶
type InitOpts struct { Conf Conf ReqIDFn ReqIDFn MsgIDFn pubsub.MsgIDFn Logger Logger Wires TopicWires }
InitOpts is options used in NewBasicPubSubCollector
func NewInitOpts ¶
NewInitOpts returns initopts
type IntBFS ¶ added in v0.0.9
type IntBFS struct {
// contains filtered or unexported fields
}
IntBFS doesn't care about topic. IntBFS contains 5 parts: 1. query mechanism 2. peer profiles 3. peer ranking 4. distance function 5. random perturbation
func (*IntBFS) HandlePeerDown ¶ added in v0.0.9
func (*IntBFS) HandlePeerUp ¶ added in v0.0.9
type IntBFSCollector ¶ added in v0.0.9
type IntBFSCollector struct {
// contains filtered or unexported fields
}
IntBFSCollector . We used a topic-defined overlay here.
func NewIntBFSCollector ¶ added in v0.0.9
func NewIntBFSCollector(h host.Host, opts ...InitOpt) (*IntBFSCollector, error)
func (*IntBFSCollector) Close ¶ added in v0.0.9
func (ic *IntBFSCollector) Close() error
func (*IntBFSCollector) Join ¶ added in v0.0.9
func (ic *IntBFSCollector) Join(topic string, opts ...JoinOpt) (err error)
func (*IntBFSCollector) Leave ¶ added in v0.0.9
func (ic *IntBFSCollector) Leave(topic string) (err error)
type IntBFSOptions ¶ added in v0.0.9
type IntBFSOptions struct { Fanout int RandomFanout int MaxHitsToSend int ProfileFactory RequestHandler ReqIDFn Logger Topic string }
IntBFSOptions .
func MakeDefaultIntBFSOptions ¶ added in v0.0.9
func MakeDefaultIntBFSOptions() *IntBFSOptions
MakeDefaultIntBFSOptions .
type JoinOpt ¶
JoinOpt is optional options in PubSubCollector.Join
func WithProfileFactory ¶ added in v0.0.9
func WithProfileFactory(pf ProfileFactory) JoinOpt
func WithRequestHandler ¶
func WithRequestHandler(rqhandle RequestHandler) JoinOpt
WithRequestHandler registers request handler
type JoinOpts ¶
type JoinOpts struct { RequestHandler ProfileFactory }
JoinOpts is the aggregated options
func NewJoinOptions ¶
NewJoinOptions returns an option collection
type Logger ¶
type Logger interface {
Logf(level, format string, args ...interface{})
}
Logger . level is one of {"debug", "info", "warn", "error", "fatal"}; format and args are compatible with fmt.Printf.
type PeerSet ¶
type PeerSet struct {
// contains filtered or unexported fields
}
func NewPeerSet ¶
func NewPeerSet() *PeerSet
type Profile ¶ added in v0.0.9
type Profile interface { // Insert(req *Request, resp *Response) // Less(that Profile, req *Request) bool }
Profile stores query profiles
type ProfileFactory ¶ added in v0.0.9
type ProfileFactory func() Profile
ProfileFactory generates a Profile
type PubOpt ¶
PubOpt is optional options in PubSubCollector.Publish
func WithFinalRespHandler ¶
func WithFinalRespHandler(handler FinalRespHandler) PubOpt
WithFinalRespHandler registers the FinalRespHandler
func WithRequestContext ¶
WithRequestContext adds cancellation or timeout for a request. The default is withCancel (ctx will be cancelled when the request is closed).
type PubOpts ¶
type PubOpts struct { RequestContext context.Context FinalRespHandle FinalRespHandler }
PubOpts is the aggregated options
func NewPublishOptions ¶
NewPublishOptions returns an option collection
type PubSubCollector ¶
type PubSubCollector interface { // Join the overlay network defined by topic. // Register RequestHandle and ResponseHandle in opts. Join(topic string, opts ...JoinOpt) error // Publish a serialized request. Request should be encapsulated in data argument. Publish(topic string, data []byte, opts ...PubOpt) error // Leave the overlay Leave(topic string) error io.Closer }
PubSubCollector is a group communication module on a topic-based overlay network. It helps to dispatch requests and wait for the corresponding responses. In relay mode, PubSubCollector can also help to reduce the responses.
func NewCollector ¶ added in v0.0.9
func NewCollector(h host.Host, opts ...InitOpt) (PubSubCollector, error)
NewCollector creates PubSubCollector.
type PubSubFactory ¶
PubSubFactory initializes the pubsub used by AsyncPubSub
type RelayPubSubCollector ¶
type RelayPubSubCollector struct {
// contains filtered or unexported fields
}
RelayPubSubCollector .
func NewRelayPubSubCollector ¶
func NewRelayPubSubCollector(h host.Host, options ...InitOpt) (r *RelayPubSubCollector, err error)
NewRelayPubSubCollector .
func (*RelayPubSubCollector) Close ¶
func (r *RelayPubSubCollector) Close() (err error)
Close the RelayPubSubCollector.
func (*RelayPubSubCollector) Join ¶
func (r *RelayPubSubCollector) Join(topic string, options ...JoinOpt) (err error)
Join the overlay network defined by topic. Register RequestHandle and ResponseHandle in opts.
func (*RelayPubSubCollector) Leave ¶
func (r *RelayPubSubCollector) Leave(topic string) (err error)
Leave the overlay
type RequestHandler ¶
type RequestHandler func(ctx context.Context, req *Request) *Intermediate
RequestHandler is the callback function invoked when a request is received. It will be called on every node that has joined the network. The return value will be sent to the root (directly or via relay).
type RequestID ¶
RequestID type alias
func DefaultReqIDFn ¶
DefaultReqIDFn returns default ReqIDGenerator. fnv hash function is called in it.
type RequestResult ¶ added in v0.0.12
type RequestResult struct {
// contains filtered or unexported fields
}
type TopicHandle ¶
TopicHandle is the handle function of a subscription. WARNING: DO NOT change msg; if a msg includes multiple topics, they share a single message.
type TopicHub ¶ added in v0.0.9
type TopicHub struct {
// contains filtered or unexported fields
}
func NewTopicHub ¶ added in v0.0.9
func NewTopicHub(tw TopicWires) *TopicHub
func (*TopicHub) HandlePeerDown ¶ added in v0.0.9
func (*TopicHub) HandlePeerUp ¶ added in v0.0.9
func (*TopicHub) JoinWithNotifiee ¶ added in v0.0.9
func (th *TopicHub) JoinWithNotifiee(topic string, wn WireListener) error
type TopicMsgHandler ¶ added in v0.0.9
type TopicMsgHandler = pubsub.TopicMsgHandler
TopicMsgHandler .
type TopicOpt ¶
type TopicOpt func(*AsyncPubSub) error
TopicOpt is the option used in NewAsyncPubSub
func WithCustomPubSubFactory ¶
func WithCustomPubSubFactory(psubFact PubSubFactory) TopicOpt
WithCustomPubSubFactory initializes the pubsub based on the given factory
func WithSelfNotif ¶
WithSelfNotif decides whether a node receives self-published messages. If WithSelfNotif is set to true, the node will receive messages published by itself. Default is set to false.
type TopicWireListener ¶ added in v0.0.9
type TopicWireListener = pubsub.TopicWireListener
TopicWireListener .