Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrQueryTimeout is an error returned if the worker doesn't respond
	// with a valid response to the request within the timeout.
	ErrQueryTimeout = errors.New("did not get response before timeout")

	// ErrPeerDisconnected is returned if the worker's peer disconnects
	// before the query has been answered.
	ErrPeerDisconnected = errors.New("peer disconnected")

	// ErrJobCanceled is returned if the job is canceled before the query
	// has been answered.
	ErrJobCanceled = errors.New("job canceled")
)
var (
	// ErrWorkManagerShuttingDown will be returned in case the WorkManager
	// is in the process of exiting.
	ErrWorkManagerShuttingDown = errors.New("WorkManager shutting down")
)
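Because these are sentinel errors, callers can match them with errors.Is even after wrapping. The following is a minimal sketch, not part of the package: the lowercase error variables are local stand-ins for the exported ones so the example compiles on its own, classify is a hypothetical helper, and the retry/abort policy is an assumption chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's exported sentinel errors, so this
// sketch compiles without importing the query package. Real code would
// reference query.ErrQueryTimeout and friends instead.
var (
	errQueryTimeout            = errors.New("did not get response before timeout")
	errPeerDisconnected        = errors.New("peer disconnected")
	errJobCanceled             = errors.New("job canceled")
	errWorkManagerShuttingDown = errors.New("WorkManager shutting down")
)

// classify is a hypothetical helper that maps a query error to a coarse
// action. The policy itself (retry vs. abort) is an assumption.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errQueryTimeout),
		errors.Is(err, errPeerDisconnected):
		return "retry"
	case errors.Is(err, errJobCanceled),
		errors.Is(err, errWorkManagerShuttingDown):
		return "abort"
	default:
		return "unknown"
	}
}

func main() {
	// errors.Is still matches after the error has been wrapped with %w.
	wrapped := fmt.Errorf("fetching block: %w", errQueryTimeout)
	fmt.Println(classify(wrapped))        // prints "retry"
	fmt.Println(classify(errJobCanceled)) // prints "abort"
}
```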
Functions ¶
func DisableLog ¶
func DisableLog()
DisableLog disables all library log output. Logging output is disabled by default until either UseLogger or SetLogWriter is called.
Types ¶
type Config ¶
type Config struct {
	// ConnectedPeers is a function that returns a channel where all
	// connected peers will be sent. It is assumed that all current peers
	// will be sent immediately, and new peers as they connect.
	//
	// The returned function closure is called to cancel the subscription.
	ConnectedPeers func() (<-chan Peer, func(), error)

	// NewWorker is a function closure that should start a new worker. We
	// make this configurable to easily mock the worker used during tests.
	NewWorker func(Peer) Worker

	// Ranking is used to rank the connected peers when determining who to
	// give work to.
	Ranking PeerRanking
}
Config holds the configuration options for a new WorkManager.
type Dispatcher ¶
type Dispatcher interface {
	// Query distributes the slice of requests to the set of connected
	// peers. It returns an error channel where the final result of the
	// batch of queries will be sent. Responses for the individual queries
	// should be handled by the response handler of each Request.
	Query(reqs []*Request, options ...QueryOption) chan error
}
Dispatcher is an interface defining the API for dispatching queries to bitcoin peers.
type Peer ¶
type Peer interface {
	// QueueMessageWithEncoding adds the passed bitcoin message to the peer
	// send queue.
	QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
		encoding wire.MessageEncoding)

	// SubscribeRecvMsg adds an OnRead subscription to the peer. All bitcoin
	// messages received from this peer will be sent on the returned
	// channel. A closure is also returned, that should be called to cancel
	// the subscription.
	SubscribeRecvMsg() (<-chan wire.Message, func())

	// Addr returns the address of this peer.
	Addr() string

	// OnDisconnect returns a channel that will be closed when this peer is
	// disconnected.
	OnDisconnect() <-chan struct{}
}
Peer is the interface that defines the methods needed by the query package to be able to make requests and receive responses from a network peer.
type PeerRanking ¶
type PeerRanking interface {
	// AddPeer adds a peer to the ranking.
	AddPeer(peer string)

	// Reward should be called when the peer has succeeded in a query,
	// increasing the likelihood that it will be picked for subsequent
	// queries.
	Reward(peer string)

	// Punish should be called when the peer has failed in a query,
	// decreasing the likelihood that it will be picked for subsequent
	// queries.
	Punish(peer string)

	// Order sorts the slice of peers according to their ranking.
	Order(peers []string)
}
PeerRanking is an interface that must be satisfied by the underlying module that is used to determine which peers to prioritize for queries.
func NewPeerRanking ¶
func NewPeerRanking() PeerRanking
NewPeerRanking returns a new, empty ranking.
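To make the PeerRanking contract concrete, here is a minimal sketch of a custom ranking that could be supplied through Config.Ranking. It is not the implementation returned by NewPeerRanking: scoreRanking, its integer scoring scheme, and the local peerRanking copy of the interface are all assumptions made so the example is self-contained. The sketch is also not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
)

// peerRanking is a local copy of the PeerRanking interface, so the sketch
// compiles without importing the query package.
type peerRanking interface {
	AddPeer(peer string)
	Reward(peer string)
	Punish(peer string)
	Order(peers []string)
}

// scoreRanking is a hypothetical ranking: each peer has an integer score,
// and a higher score means the peer is preferred.
type scoreRanking struct {
	scores map[string]int
}

func newScoreRanking() *scoreRanking {
	return &scoreRanking{scores: make(map[string]int)}
}

// AddPeer registers a peer with a neutral score if it is not yet known.
func (r *scoreRanking) AddPeer(peer string) {
	if _, ok := r.scores[peer]; !ok {
		r.scores[peer] = 0
	}
}

// Reward raises the peer's score by one.
func (r *scoreRanking) Reward(peer string) { r.scores[peer]++ }

// Punish lowers the peer's score by one.
func (r *scoreRanking) Punish(peer string) { r.scores[peer]-- }

// Order sorts peers in place, best score first. Peers that were never
// added are treated as having a neutral score of zero.
func (r *scoreRanking) Order(peers []string) {
	sort.SliceStable(peers, func(i, j int) bool {
		return r.scores[peers[i]] > r.scores[peers[j]]
	})
}

// Compile-time check that scoreRanking satisfies the interface.
var _ peerRanking = (*scoreRanking)(nil)

func main() {
	r := newScoreRanking()
	peers := []string{"a:8333", "b:8333", "c:8333"}
	for _, p := range peers {
		r.AddPeer(p)
	}

	r.Punish("a:8333")
	r.Reward("c:8333")

	r.Order(peers)
	fmt.Println(peers) // prints "[c:8333 b:8333 a:8333]"
}
```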
type Progress ¶
type Progress struct {
	// Finished is true if the query was finished as a result of the
	// received response.
	Finished bool

	// Progressed is true if the query made progress towards fully
	// answering the request as a result of the received response. This is
	// used for the request types where more than one response is
	// expected.
	Progressed bool
}
Progress encloses the result of handling a response for a given Request, indicating whether the response advanced the query and whether it completed it.
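The difference between the two fields matters mostly for requests that expect several responses. The sketch below illustrates one way a handler might derive them; batchTracker and the string item IDs are hypothetical, progress is a local copy of the Progress struct, and reporting Progressed together with Finished on the last item is an assumption of this sketch.

```go
package main

import "fmt"

// progress is a local copy of the Progress struct.
type progress struct {
	Finished   bool
	Progressed bool
}

// batchTracker is a hypothetical helper that tracks which of a set of
// expected item IDs are still outstanding.
type batchTracker struct {
	pending map[string]struct{}
}

func newBatchTracker(ids ...string) *batchTracker {
	pending := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		pending[id] = struct{}{}
	}
	return &batchTracker{pending: pending}
}

// handle records a received item and reports how it affected the query.
func (b *batchTracker) handle(id string) progress {
	if _, ok := b.pending[id]; !ok {
		// Unrelated or duplicate item: no progress was made.
		return progress{}
	}
	delete(b.pending, id)

	// Every expected item counts as progress. The query is finished
	// once nothing is left outstanding.
	return progress{
		Progressed: true,
		Finished:   len(b.pending) == 0,
	}
}

func main() {
	tracker := newBatchTracker("h1", "h2")
	fmt.Printf("%+v\n", tracker.handle("other")) // {Finished:false Progressed:false}
	fmt.Printf("%+v\n", tracker.handle("h1"))    // {Finished:false Progressed:true}
	fmt.Printf("%+v\n", tracker.handle("h2"))    // {Finished:true Progressed:true}
}
```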
type QueryOption ¶
type QueryOption func(*queryOptions) // nolint:golint
QueryOption is a functional option argument to any of the network query methods, such as GetBlock and GetCFilter (when that resorts to a network query). These are always processed in order, with later options overriding earlier ones.
func Cancel ¶
func Cancel(cancel chan struct{}) QueryOption
Cancel takes a channel that can be closed to indicate that the query should be canceled.
func Encoding ¶
func Encoding(encoding wire.MessageEncoding) QueryOption
Encoding is a query option that allows the caller to set a message encoding for the query messages.
func Timeout ¶
func Timeout(timeout time.Duration) QueryOption
Timeout is a query option that specifies the total time a query may be attempted before it is considered failed.
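The options above follow the functional options pattern, where each option is a function that mutates an options struct and options are applied in the order given. The following sketch shows why a later option overrides an earlier one. The fields of queryOptions, the 30-second default, and applyOptions are assumptions for illustration, since the real struct is unexported.

```go
package main

import (
	"fmt"
	"time"
)

// queryOptions is a stand-in for the package's unexported options struct.
// Its fields are assumptions made for this sketch.
type queryOptions struct {
	timeout  time.Duration
	cancelCh chan struct{}
}

// queryOption mirrors the shape of QueryOption.
type queryOption func(*queryOptions)

// timeout returns an option that sets the total query timeout.
func timeout(d time.Duration) queryOption {
	return func(o *queryOptions) { o.timeout = d }
}

// cancel returns an option that sets the cancellation channel.
func cancel(ch chan struct{}) queryOption {
	return func(o *queryOptions) { o.cancelCh = ch }
}

// applyOptions starts from assumed defaults and applies each option in
// order, so a later option overwrites what an earlier one set.
func applyOptions(opts ...queryOption) *queryOptions {
	o := &queryOptions{timeout: 30 * time.Second}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	quit := make(chan struct{})
	o := applyOptions(
		timeout(5*time.Second),
		cancel(quit),
		timeout(2*time.Second), // overrides the 5s timeout above
	)
	fmt.Println(o.timeout)          // prints "2s"
	fmt.Println(o.cancelCh == quit) // prints "true"
}
```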
type Request ¶
type Request struct {
	// Req is the message request to send.
	Req wire.Message

	// HandleResp is a response handler that will be called for every
	// message received from the peer that the request was made to. It
	// should validate the response against the request made, and return a
	// Progress indicating whether the request was answered by this
	// particular response.
	//
	// NOTE: Since the worker's job queue will be stalled while this method
	// is running, it should not be doing any expensive operations. It
	// should validate the response and immediately return the progress.
	// The response should be handed off to another goroutine for
	// processing.
	HandleResp func(req, resp wire.Message, peer string) Progress
}
Request is the main struct that defines a bitcoin network query to be sent to connected peers.
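The note on HandleResp asks handlers to validate quickly and hand the response off for processing elsewhere. One possible shape for such a handler is sketched below. Everything here is a stand-in: message replaces wire.Message, getData and block are simplified hypothetical message types, and dropping the response when the consumer is behind is a design choice of this sketch, not documented package behavior.

```go
package main

import "fmt"

// message stands in for wire.Message so the sketch is self-contained.
type message interface{ Command() string }

// progress is a local copy of the Progress struct.
type progress struct{ Finished, Progressed bool }

// getData and block are simplified, hypothetical message types.
type getData struct{ hash string }

func (getData) Command() string { return "getdata" }

type block struct{ hash string }

func (block) Command() string { return "block" }

// newHandler returns a function with the shape of Request.HandleResp. It
// only validates the response and hands it off on out; the expensive
// processing is left to whoever reads from out.
func newHandler(out chan<- message) func(req, resp message, peer string) progress {
	return func(req, resp message, peer string) progress {
		want, ok := req.(getData)
		if !ok {
			return progress{}
		}

		// Ignore messages that do not answer this request.
		got, ok := resp.(block)
		if !ok || got.hash != want.hash {
			return progress{}
		}

		// Non-blocking hand-off, so the worker's job queue is never
		// stalled by a slow consumer.
		select {
		case out <- resp:
			return progress{Finished: true, Progressed: true}
		default:
			// Consumer is behind: report no progress instead of
			// blocking (an assumption of this sketch).
			return progress{}
		}
	}
}

func main() {
	out := make(chan message, 1)
	handle := newHandler(out)

	req := getData{hash: "abc"}
	fmt.Printf("%+v\n", handle(req, block{hash: "zzz"}, "peer1")) // {Finished:false Progressed:false}
	fmt.Printf("%+v\n", handle(req, block{hash: "abc"}, "peer1")) // {Finished:true Progressed:true}
	fmt.Println((<-out).Command())                                // prints "block"
}
```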
type Task ¶
type Task interface {
	// Index returns this Task's index in the work queue.
	Index() uint64
}
Task is an interface with a method that returns the task's index in the work queue.
type WorkManager ¶
type WorkManager struct {
// contains filtered or unexported fields
}
WorkManager is the main access point for outside callers, and satisfies the QueryAccess API. It receives queries to pass to peers, and schedules them among available workers, orchestrating where to send them.
func New ¶
func New(cfg *Config) *WorkManager
New returns a new WorkManager with the regular worker implementation.
func (*WorkManager) Query ¶
func (w *WorkManager) Query(requests []*Request, options ...QueryOption) chan error
Query distributes the slice of requests to the set of connected peers.
NOTE: Part of the Dispatcher interface.
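Since Query returns a channel that carries the final result of the batch, a caller typically waits on it alongside its own shutdown signal. A minimal sketch of that wait follows; waitResult and errCallerQuit are hypothetical names that are not part of the package, and the buffered channel in main simply simulates what Query would return.

```go
package main

import (
	"errors"
	"fmt"
)

// errCallerQuit is a hypothetical error for this sketch only.
var errCallerQuit = errors.New("caller quit before batch finished")

// waitResult blocks until the batch result arrives on errChan or the
// caller's quit channel is closed, whichever happens first.
func waitResult(errChan <-chan error, quit <-chan struct{}) error {
	select {
	case err := <-errChan:
		return err
	case <-quit:
		return errCallerQuit
	}
}

func main() {
	// Simulate the channel returned by Query with a buffered channel
	// that already holds a successful (nil) batch result.
	errChan := make(chan error, 1)
	errChan <- nil

	// A nil quit channel never fires, so only errChan can be selected.
	fmt.Println(waitResult(errChan, nil)) // prints "<nil>"
}
```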
func (*WorkManager) Stop ¶
func (w *WorkManager) Stop() error
Stop stops the WorkManager and all underlying goroutines.
type Worker ¶
type Worker interface {
	// Run starts the worker. The worker will supply its peer with queries,
	// and handle responses from it. Results for any query handled by this
	// worker will be delivered on the results channel. quit can be closed
	// to immediately make the worker exit.
	//
	// The method is blocking, and should be started in a goroutine. It
	// will run until the peer disconnects or the worker is told to quit.
	Run(results chan<- *jobResult, quit <-chan struct{})

	// NewJob returns a channel where work that is to be handled by the
	// worker can be sent. If the worker reads a queryJob from this
	// channel, it is guaranteed that a response will eventually be
	// delivered on the results channel (except when the quit channel has
	// been closed).
	NewJob() chan<- *queryJob
}
Worker is the interface that must be satisfied by workers managed by the WorkManager.