Documentation ¶
Overview ¶
Package sprout provides types and utilities for implementing client and server programs that speak the Sprout Protocol. The Sprout Protocol is specified here:
https://man.sr.ht/~whereswaldon/arborchat/specifications/sprout.md
NOTE: this package requires using a fork of golang.org/x/crypto, and you must therefore include the following in your `go.mod`:
replace golang.org/x/crypto => github.com/ProtonMail/crypto <version-from-sprout-go's-go.mod>
This package exports several important types.
The Conn type wraps a connection-oriented transport (usually a TCP connection) and provides methods for sending sprout messages and reading sprout messages off of the connection. It has a number of exported fields which are functions that should handle incoming messages. These must be set by the user, and their behavior should conform to the Sprout specification. If using a Conn directly, be sure to invoke the ReadMessage() method properly to ensure that you receive replies.
The Worker type wraps a Conn and provides automatic implementations of both the handler functions for each sprout message and the processing loop that reads new messages and dispatches them to their handlers. It has an exported embedded Conn, so you can send messages on a Worker by calling Conn methods on it directly.
The Conn type has both synchronous and asynchronous methods for sending messages. The synchronous ones block until they receive a response or their timeout channel emits a value. Details on how to use these methods follow.
Note: The Send* methods
The non-Async methods block until they get a response or until their timeout is reached. There are several cases in which they will return an error:
- There is a network problem sending the message or receiving the response
- There is a problem creating the outbound message or parsing the inbound response
- The status message received in response is not sprout.StatusOk. In this case, the error will be of type sprout.Status
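Since a non-OK status comes back as an error value, callers may want to tell it apart from network or parse failures. The following is a minimal sketch of that check using errors.As. Status and StatusCode here are local stand-ins for the sprout types so that the example compiles on its own, and the Error method and the helper statusCodeOf are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// StatusCode and Status are local stand-ins for sprout.StatusCode and
// sprout.Status so that this sketch compiles without the package.
type StatusCode int

const (
	StatusOk       StatusCode = 0
	ErrorMalformed StatusCode = 1
)

type Status struct {
	Code StatusCode
}

// Error is assumed here; the text above only says the error has type Status.
func (s Status) Error() string {
	return fmt.Sprintf("peer returned status code %d", int(s.Code))
}

// statusCodeOf (a hypothetical helper) reports the status code carried by
// err, if err is a Status or wraps one.
func statusCodeOf(err error) (StatusCode, bool) {
	var st Status
	if errors.As(err, &st) {
		return st.Code, true
	}
	return 0, false
}

func main() {
	// A wrapped status error, as might come back from a synchronous send.
	err := fmt.Errorf("send failed: %w", Status{Code: ErrorMalformed})
	if code, ok := statusCodeOf(err); ok {
		fmt.Println("peer rejected the request with code", int(code))
	}
	// An unrelated error, such as a network failure, carries no status.
	if _, ok := statusCodeOf(errors.New("connection reset")); !ok {
		fmt.Println("not a status error")
	}
}
```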
The recommended way to invoke synchronous Send*() methods is with a time.Ticker as the input channel, like so:
err := s.SendVersion(time.NewTicker(time.Second*5).C)
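The timeout parameter is a plain `<-chan time.Time`, so the channel of a time.Timer fits as well. A Ticker keeps firing until it is stopped, whereas a Timer fires once and can be released with Stop. The sketch below wraps that pattern in a helper. The names sendFunc, sendWithTimeout and fakeSend are hypothetical, and fakeSend only simulates a peer; nothing here calls the real package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendFunc has the same shape as a synchronous send that takes only a
// timeout channel, such as SendVersion. It is a stand-in for illustration.
type sendFunc func(timeoutChan <-chan time.Time) error

var errTimedOut = errors.New("timed out waiting for response")

const (
	shortTimeout = 20 * time.Millisecond
	longTimeout  = time.Second
)

// sendWithTimeout calls send with a timer channel and releases the timer
// once send returns.
func sendWithTimeout(send sendFunc, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	return send(timer.C)
}

// fakeSend simulates a peer that answers after delayMillis milliseconds.
func fakeSend(delayMillis int) sendFunc {
	delay := time.Duration(delayMillis) * time.Millisecond
	return func(timeoutChan <-chan time.Time) error {
		select {
		case <-time.After(delay):
			return nil
		case <-timeoutChan:
			return errTimedOut
		}
	}
}

func main() {
	fmt.Println("fast peer:", sendWithTimeout(fakeSend(1), longTimeout))
	fmt.Println("slow peer:", sendWithTimeout(fakeSend(1000), shortTimeout))
}
```

With a real connection, the call would look something like `sendWithTimeout(conn.SendVersion, 5*time.Second)`, assuming conn is a *sprout.Conn.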
Note: The Send*Async methods
The Async versions of each send operation provide more granular control over blocking behavior. They return a chan interface{}, but will never send anything other than a sprout.Status or sprout.Response over that channel. It is safe to assume that the value will be one of those two.
The Async versions also return a handle for the request called a MessageID. This can be used to cancel the request in the event that it doesn't have a response or the response no longer matters. This can be done manually using the Cancel() method on the Conn type. The synchronous version of each send method handles this for you, but it must be done manually with the async variant.
An example of the appropriate use of an async method:
	resultChan, messageID, err := conn.SendQueryAsync(ids)
	if err != nil {
		// handle err
	}
	select {
	case data := <-resultChan:
		switch asConcrete := data.(type) {
		case sprout.Status:
			// handle status
		case sprout.Response:
			// handle Response
		}
	case <-time.NewTicker(time.Second*5).C:
		conn.Cancel(messageID)
		// handle timeout
	}
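The same wait-or-cancel logic can be factored into a reusable helper. This is a sketch under stated assumptions: Status and Response are simplified local stand-ins for the sprout types, awaitResponse is a hypothetical name, and the cancel callback stands in for a call such as conn.Cancel(messageID).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Status and Response are simplified stand-ins for sprout.Status and
// sprout.Response; the real types have different fields.
type Status struct{ Code int }
type Response struct{ Nodes []string }

var errTimeout = errors.New("timed out waiting for peer")

const (
	waitShort = 20 * time.Millisecond
	waitLong  = time.Second
)

// awaitResponse waits for one value on resultChan. If timeout elapses first,
// it invokes cancel (standing in for conn.Cancel(messageID)) and gives up.
func awaitResponse(resultChan <-chan interface{}, timeout time.Duration, cancel func()) (Response, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case data := <-resultChan:
		switch v := data.(type) {
		case Response:
			return v, nil
		case Status:
			return Response{}, fmt.Errorf("peer answered with status code %d", v.Code)
		default:
			return Response{}, fmt.Errorf("unexpected result type %T", data)
		}
	case <-timer.C:
		cancel()
		return Response{}, errTimeout
	}
}

func main() {
	// A result that is already available is returned immediately.
	ready := make(chan interface{}, 1)
	ready <- Response{Nodes: []string{"node-a"}}
	resp, err := awaitResponse(ready, waitLong, func() {})
	fmt.Println(resp.Nodes, err)

	// A peer that never answers triggers the cancel callback.
	_, err = awaitResponse(make(chan interface{}), waitShort, func() {
		fmt.Println("request cancelled")
	})
	fmt.Println(err)
}
```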
Index ¶
- Constants
- func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, ...)
- func NodeFromBase64URL(in string) (forest.Node, error)
- type Conn
- func (s *Conn) Cancel(messageID MessageID)
- func (s *Conn) ReadMessage() error
- func (s *Conn) SendAncestry(nodeID *fields.QualifiedHash, levels int, timeoutChan <-chan time.Time) (Response, error)
- func (s *Conn) SendAncestryAsync(nodeID *fields.QualifiedHash, levels int) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendAnnounce(nodes []forest.Node, timeoutChan <-chan time.Time) error
- func (s *Conn) SendAnnounceAsync(nodes []forest.Node) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendLeavesOf(nodeId *fields.QualifiedHash, quantity int, timeoutChan <-chan time.Time) (Response, error)
- func (s *Conn) SendLeavesOfAsync(nodeId *fields.QualifiedHash, quantity int) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendList(nodeType fields.NodeType, quantity int, timeoutChan <-chan time.Time) (Response, error)
- func (s *Conn) SendListAsync(nodeType fields.NodeType, quantity int) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendQuery(nodeIds []*fields.QualifiedHash, timeoutChan <-chan time.Time) (Response, error)
- func (s *Conn) SendQueryAsync(nodeIds ...*fields.QualifiedHash) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendResponse(msgID MessageID, nodes []forest.Node) error
- func (s *Conn) SendStatus(targetMessageID MessageID, errorCode StatusCode) error
- func (s *Conn) SendSubscribe(community *forest.Community, timeoutChan <-chan time.Time) error
- func (s *Conn) SendSubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendSubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error
- func (s *Conn) SendSubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendUnsubscribe(community *forest.Community, timeoutChan <-chan time.Time) error
- func (s *Conn) SendUnsubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendUnsubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error
- func (s *Conn) SendUnsubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)
- func (s *Conn) SendVersion(timeoutChan <-chan time.Time) error
- func (s *Conn) SendVersionAsync() (<-chan interface{}, MessageID, error)
- type MessageID
- type Response
- type Session
- type Status
- type StatusCode
- type UnsolicitedMessageError
- type Verb
- type Worker
- func (c *Worker) BootstrapLocalStore(maxCommunities int)
- func (c *Worker) EnsureAuthorAvailable(node forest.Node, perRequestTimeout time.Duration) error
- func (c *Worker) HandleNewNode(node forest.Node)
- func (c *Worker) IngestNode(node forest.Node) error
- func (c *Worker) OnAncestry(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error
- func (c *Worker) OnAnnounce(s *Conn, messageID MessageID, nodes []forest.Node) error
- func (c *Worker) OnLeavesOf(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error
- func (c *Worker) OnList(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error
- func (c *Worker) OnQuery(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error
- func (c *Worker) OnSubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)
- func (c *Worker) OnUnsubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)
- func (c *Worker) OnVersion(s *Conn, messageID MessageID, major, minor int) error
- func (c *Worker) Run()
- func (c *Worker) SynchronizeFullTree(root forest.Node, maxNodes int, perRequestTimeout time.Duration) error
- Bugs
Constants ¶
const (
	CurrentMajor = 0
	CurrentMinor = 0
)
Variables ¶
This section is empty.
Functions ¶
func LaunchSupervisedWorker ¶
func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, tlsConfig *tls.Config, logger *log.Logger)
LaunchSupervisedWorker launches a worker in a new goroutine that will connect to `addr` and use `s` as its node storage. It will dial using the provided `tlsConfig`, and it will log errors on the given `logger`.
BUG(whereswaldon): this interface is experimental and likely to change.
Types ¶
type Conn ¶
type Conn struct {
	// Write side of connection, synchronized with mutex
	sync.Mutex
	Conn io.ReadWriteCloser

	// Read side of connection, buffered for parse simplicity
	BufferedConn io.Reader

	// Protocol version in use
	Major, Minor int

	// Map from messageID to channel waiting for response
	PendingStatus sync.Map

	OnVersion     func(s *Conn, messageID MessageID, major, minor int) error
	OnList        func(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error
	OnQuery       func(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error
	OnAncestry    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error
	OnLeavesOf    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error
	OnSubscribe   func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
	OnUnsubscribe func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
	OnAnnounce    func(s *Conn, messageID MessageID, nodes []forest.Node) error
	// contains filtered or unexported fields
}
func NewConn ¶
func NewConn(transport io.ReadWriteCloser) (*Conn, error)
NewConn constructs a sprout connection using the provided transport. Writes to the transport are expected to reach the other end of the sprout connection, and reads should deliver bytes from the other end. The expected use is TCP connections, though other transports are possible.
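Because the transport is only required to be an io.ReadWriteCloser, an in-memory pair from net.Pipe can stand in for a TCP connection, which may be convenient in tests. The sketch below only demonstrates that bytes written to one end of such a pair arrive at the other. The helper names are hypothetical and the payload is an arbitrary string, not a sprout message.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// exchangeLine writes line to local and reads it back from remote. Both ends
// are plain io.ReadWriteClosers, the same interface NewConn accepts.
func exchangeLine(local, remote io.ReadWriteCloser, line string) (string, error) {
	errc := make(chan error, 1)
	go func() {
		// net.Pipe is unbuffered, so the write must run concurrently
		// with the read below.
		_, err := io.WriteString(local, line+"\n")
		errc <- err
	}()
	got, err := bufio.NewReader(remote).ReadString('\n')
	if err != nil {
		return "", err
	}
	if err := <-errc; err != nil {
		return "", err
	}
	return got[:len(got)-1], nil
}

// pipeRoundTrip sends line across a fresh in-memory connection pair.
func pipeRoundTrip(line string) (string, error) {
	local, remote := net.Pipe()
	defer local.Close()
	defer remote.Close()
	return exchangeLine(local, remote, line)
}

func main() {
	got, err := pipeRoundTrip("hello over an in-memory transport")
	fmt.Println(got, err)
}
```

In a test, each end of the pipe could presumably be passed to NewConn to obtain two connected sprout connections.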
func (*Conn) Cancel ¶
func (s *Conn) Cancel(messageID MessageID)
Cancel deallocates the response structures associated with the protocol message with the given identifier. This is primarily useful when the other end of the connection has not responded in a long time, and we are interested in cleaning up the resources used in waiting for them to respond. An attempt to cancel a message that is not waiting for a response will have no effect.
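To illustrate the bookkeeping that cancellation implies, here is a minimal sketch of a table of pending requests built on sync.Map. It is not the package's implementation: pendingTable, its methods, and unsolicitedError are hypothetical names, and the only behavior taken from the text is that cancelling an unknown ID has no effect and that a reply to a forgotten ID is reported as unsolicited.

```go
package main

import (
	"fmt"
	"sync"
)

// MessageID is a local stand-in for sprout.MessageID.
type MessageID int

// unsolicitedError is a local stand-in for sprout.UnsolicitedMessageError.
type unsolicitedError struct{ ID MessageID }

func (u unsolicitedError) Error() string {
	return fmt.Sprintf("unsolicited response to message %d", int(u.ID))
}

// pendingTable tracks requests that are still waiting for a reply.
type pendingTable struct {
	waiting sync.Map // MessageID -> chan interface{}
}

// register records that id is waiting and returns the channel on which its
// reply will be delivered.
func (p *pendingTable) register(id MessageID) <-chan interface{} {
	ch := make(chan interface{}, 1) // buffered so deliver never blocks
	p.waiting.Store(id, ch)
	return ch
}

// cancel forgets id. Cancelling an id that is not waiting has no effect.
func (p *pendingTable) cancel(id MessageID) {
	p.waiting.Delete(id)
}

// deliver hands reply to the waiter for id, or reports it as unsolicited
// when nobody is waiting (for example after a cancel).
func (p *pendingTable) deliver(id MessageID, reply interface{}) error {
	v, ok := p.waiting.LoadAndDelete(id)
	if !ok {
		return unsolicitedError{ID: id}
	}
	v.(chan interface{}) <- reply
	return nil
}

func main() {
	var p pendingTable

	ch := p.register(1)
	fmt.Println(p.deliver(1, "status ok"), <-ch)

	p.register(2)
	p.cancel(2)
	fmt.Println(p.deliver(2, "late reply"))
}
```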
func (*Conn) ReadMessage ¶
func (s *Conn) ReadMessage() error
ReadMessage reads and parses a single sprout protocol message off of the connection. It calls the appropriate OnVerb handler function when it parses a message, and it returns any parse errors. It will block when no messages are available.
This method must be called in a loop in order for the sprout connection to be able to receive messages properly. This isn't done automatically by the Conn type in order to provide flexibility on how to handle errors from this method. The Worker type can wrap a Conn to both implement its handler functions and call this method automatically.
This method may return an UnsolicitedMessageError in some cases. This may be due to a local timeout/request cancellation, and should generally not be cause to close the connection entirely.
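One way to structure such a loop is sketched below: it keeps reading after an unsolicited-message error and stops on anything else. The messageReader interface, unsolicitedError type, readLoop function, and scriptedReader fake are all hypothetical stand-ins so the example runs without the package. With a real connection, the reader would be a *sprout.Conn and the error type sprout.UnsolicitedMessageError.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// messageReader is the single method of Conn that this sketch relies on.
type messageReader interface {
	ReadMessage() error
}

// unsolicitedError is a local stand-in for sprout.UnsolicitedMessageError.
type unsolicitedError struct{ ID int }

func (u unsolicitedError) Error() string {
	return fmt.Sprintf("unsolicited message %d", u.ID)
}

// readLoop calls ReadMessage until it fails with something other than an
// unsolicited-message error. It returns that error together with the number
// of unsolicited messages it tolerated along the way.
func readLoop(r messageReader) (int, error) {
	skipped := 0
	for {
		err := r.ReadMessage()
		var unsolicited unsolicitedError
		switch {
		case err == nil:
			// Message handled by its On* handler; keep reading.
		case errors.As(err, &unsolicited):
			skipped++ // likely a reply to a request we already cancelled
		default:
			return skipped, err
		}
	}
}

// scriptedReader replays a fixed list of results and then reports io.EOF,
// simulating a connection that is eventually closed.
type scriptedReader struct{ results []error }

func (s *scriptedReader) ReadMessage() error {
	if len(s.results) == 0 {
		return io.EOF
	}
	next := s.results[0]
	s.results = s.results[1:]
	return next
}

func main() {
	r := &scriptedReader{results: []error{nil, unsolicitedError{ID: 7}, nil}}
	skipped, err := readLoop(r)
	fmt.Println("skipped:", skipped, "stopped on:", err)
}
```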
func (*Conn) SendAncestry ¶
func (s *Conn) SendAncestry(nodeID *fields.QualifiedHash, levels int, timeoutChan <-chan time.Time) (Response, error)
SendAncestry requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of levels of ancestry to return.
func (*Conn) SendAncestryAsync ¶
func (s *Conn) SendAncestryAsync(nodeID *fields.QualifiedHash, levels int) (<-chan interface{}, MessageID, error)
SendAncestryAsync requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of levels of ancestry to return. See the package-level documentation for details on how to use the Async methods.
func (*Conn) SendAnnounce ¶
func (s *Conn) SendAnnounce(nodes []forest.Node, timeoutChan <-chan time.Time) error
SendAnnounce announces the existence of the given nodes to the peer on the other end of the sprout connection.
func (*Conn) SendAnnounceAsync ¶
func (s *Conn) SendAnnounceAsync(nodes []forest.Node) (<-chan interface{}, MessageID, error)
SendAnnounceAsync announces the existence of the given nodes to the peer on the other end of the sprout connection. See the package-level documentation for details on how to use Async methods.
func (*Conn) SendLeavesOf ¶
func (s *Conn) SendLeavesOf(nodeId *fields.QualifiedHash, quantity int, timeoutChan <-chan time.Time) (Response, error)
SendLeavesOf returns up to quantity nodes that are leaves in the tree rooted at the given ID.
func (*Conn) SendLeavesOfAsync ¶
func (s *Conn) SendLeavesOfAsync(nodeId *fields.QualifiedHash, quantity int) (<-chan interface{}, MessageID, error)
SendLeavesOfAsync requests up to quantity nodes that are leaves in the tree rooted at the given ID. For a description of how to use the Async methods, see the package-level documentation.
func (*Conn) SendList ¶
func (s *Conn) SendList(nodeType fields.NodeType, quantity int, timeoutChan <-chan time.Time) (Response, error)
SendList requests a list of recent nodes of a particular node type from the other end of the sprout connection.
func (*Conn) SendListAsync ¶
func (s *Conn) SendListAsync(nodeType fields.NodeType, quantity int) (<-chan interface{}, MessageID, error)
SendListAsync requests a list of recent nodes of a particular node type from the other end of the sprout connection. The requested quantity is the maximum number of nodes that the other end should provide, though it may provide significantly fewer. See the package-level documentation for details on how to use the Async methods.
func (*Conn) SendQuery ¶
func (s *Conn) SendQuery(nodeIds []*fields.QualifiedHash, timeoutChan <-chan time.Time) (Response, error)
SendQuery requests the nodes with the given IDs from the other side of the sprout connection.
func (*Conn) SendQueryAsync ¶
func (s *Conn) SendQueryAsync(nodeIds ...*fields.QualifiedHash) (<-chan interface{}, MessageID, error)
SendQueryAsync requests the nodes with the given IDs from the other side of the sprout connection. See the package-level documentation for details on how to use the Async methods.
func (*Conn) SendStatus ¶
func (s *Conn) SendStatus(targetMessageID MessageID, errorCode StatusCode) error
SendStatus responds to the message with the given targetMessageID with the given status code. It is always synchronous, and will return any error in transmitting the message.
func (*Conn) SendSubscribe ¶
func (s *Conn) SendSubscribe(community *forest.Community, timeoutChan <-chan time.Time) error
SendSubscribe attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().
func (*Conn) SendSubscribeAsync ¶
func (s *Conn) SendSubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)
SendSubscribeAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.
func (*Conn) SendSubscribeByID ¶
func (s *Conn) SendSubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error
SendSubscribeByID attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().
func (*Conn) SendSubscribeByIDAsync ¶
func (s *Conn) SendSubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)
SendSubscribeByIDAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.
func (*Conn) SendUnsubscribe ¶
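func (s *Conn) SendUnsubscribe(community *forest.Community, timeoutChan <-chan time.Time) error
SendUnsubscribe attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce().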
func (*Conn) SendUnsubscribeAsync ¶
func (s *Conn) SendUnsubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)
SendUnsubscribeAsync attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.
func (*Conn) SendUnsubscribeByID ¶
func (s *Conn) SendUnsubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error
SendUnsubscribeByID attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce().
func (*Conn) SendUnsubscribeByIDAsync ¶
func (s *Conn) SendUnsubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)
SendUnsubscribeByIDAsync attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.
func (*Conn) SendVersion ¶
func (s *Conn) SendVersion(timeoutChan <-chan time.Time) error
SendVersion notifies the other end of the sprout connection of our supported protocol version number.
func (*Conn) SendVersionAsync ¶
func (s *Conn) SendVersionAsync() (<-chan interface{}, MessageID, error)
SendVersionAsync notifies the other end of the sprout connection of our supported protocol version number. See the package-level documentation for details on how to use the Async methods properly.
type Session ¶
type Session struct {
	sync.RWMutex
	Communities map[*fields.QualifiedHash]struct{}
}
Session stores the state of a sprout connection between hosts, which is currently just the subscribed community set.
func NewSession ¶
func NewSession() *Session
func (*Session) IsSubscribed ¶
func (c *Session) IsSubscribed(communityID *fields.QualifiedHash) bool
func (*Session) Subscribe ¶
func (c *Session) Subscribe(communityID *fields.QualifiedHash)
func (*Session) Unsubscribe ¶
func (c *Session) Unsubscribe(communityID *fields.QualifiedHash)
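Note that Go compares pointer map keys by address, so two separately allocated hashes with identical contents are distinct keys in a map like Communities. How the Session methods account for this is not shown on this page. As a minimal sketch of one alternative, the set below is keyed by the hash contents instead. The hashID type is a simplified stand-in for fields.QualifiedHash, and the subscriptions type is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// hashID is a simplified stand-in for fields.QualifiedHash.
type hashID struct{ Bytes []byte }

// subscriptions is a concurrency-safe set of community IDs keyed by the
// hash contents rather than by pointer identity.
type subscriptions struct {
	mu   sync.RWMutex
	byID map[string]struct{}
}

func newSubscriptions() *subscriptions {
	return &subscriptions{byID: make(map[string]struct{})}
}

// Subscribe adds id to the set.
func (s *subscriptions) Subscribe(id *hashID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byID[string(id.Bytes)] = struct{}{}
}

// Unsubscribe removes id from the set; removing an absent id is a no-op.
func (s *subscriptions) Unsubscribe(id *hashID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.byID, string(id.Bytes))
}

// IsSubscribed reports whether an id with the same contents was added.
func (s *subscriptions) IsSubscribed(id *hashID) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.byID[string(id.Bytes)]
	return ok
}

func main() {
	s := newSubscriptions()
	s.Subscribe(&hashID{Bytes: []byte("community-a")})

	// A different pointer with equal contents still matches.
	fmt.Println(s.IsSubscribed(&hashID{Bytes: []byte("community-a")}))

	s.Unsubscribe(&hashID{Bytes: []byte("community-a")})
	fmt.Println(s.IsSubscribed(&hashID{Bytes: []byte("community-a")}))
}
```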
type Status ¶
type Status struct {
Code StatusCode
}
type StatusCode ¶
type StatusCode int
StatusCode represents the status of a sprout protocol message.
const (
	StatusOk            StatusCode = 0
	ErrorMalformed      StatusCode = 1
	ErrorProtocolTooOld StatusCode = 2
	ErrorProtocolTooNew StatusCode = 3
	ErrorUnknownNode    StatusCode = 4
)
func (StatusCode) String ¶
func (s StatusCode) String() string
String converts the status code into a human-readable error message
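For readers unfamiliar with the pattern, a String method on an integer type is typically a switch over the known constants. The sketch below redeclares the constants locally so it compiles on its own. The message wording is an assumption for illustration and may differ from what the package actually returns.

```go
package main

import "fmt"

// StatusCode mirrors the constants listed above; it is redeclared here so
// the sketch is self-contained.
type StatusCode int

const (
	StatusOk            StatusCode = 0
	ErrorMalformed      StatusCode = 1
	ErrorProtocolTooOld StatusCode = 2
	ErrorProtocolTooNew StatusCode = 3
	ErrorUnknownNode    StatusCode = 4
)

// String returns a short description of the code. The wording here is
// illustrative only, not the package's actual messages.
func (s StatusCode) String() string {
	switch s {
	case StatusOk:
		return "ok"
	case ErrorMalformed:
		return "malformed message"
	case ErrorProtocolTooOld:
		return "protocol too old"
	case ErrorProtocolTooNew:
		return "protocol too new"
	case ErrorUnknownNode:
		return "unknown node"
	default:
		return fmt.Sprintf("unknown status code %d", int(s))
	}
}

func main() {
	// fmt uses the String method automatically for %v and %s.
	fmt.Printf("%v / %v / %v\n", StatusOk, ErrorUnknownNode, StatusCode(42))
}
```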
type UnsolicitedMessageError ¶
type UnsolicitedMessageError struct {
	// The ID that the unsolicited message was in response to
	MessageID
}
UnsolicitedMessageError is an error indicating that a sprout peer sent a response or status message with an ID that was unexpected. This could occur when we cancelled waiting on a request (such as a timeout), when the peer has a bug (double response, incorrect target message id in response), or when the peer is misbehaving.
func (UnsolicitedMessageError) Error ¶
func (u UnsolicitedMessageError) Error() string
type Verb ¶
type Verb string
const (
	VersionVerb     Verb = "version"
	ListVerb        Verb = "list"
	QueryVerb       Verb = "query"
	AncestryVerb    Verb = "ancestry"
	LeavesOfVerb    Verb = "leaves_of"
	SubscribeVerb   Verb = "subscribe"
	UnsubscribeVerb Verb = "unsubscribe"
	AnnounceVerb    Verb = "announce"
	ResponseVerb    Verb = "response"
	StatusVerb      Verb = "status"
)
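A string-typed enumeration like this is often paired with a lookup that rejects unknown values. The sketch below shows one way to do that. The knownVerbs set and parseVerb function are hypothetical, and nothing here makes assumptions about how verbs appear on the wire beyond the constant values listed above.

```go
package main

import "fmt"

// Verb and its constants are redeclared from the listing above so that the
// sketch compiles on its own.
type Verb string

const (
	VersionVerb     Verb = "version"
	ListVerb        Verb = "list"
	QueryVerb       Verb = "query"
	AncestryVerb    Verb = "ancestry"
	LeavesOfVerb    Verb = "leaves_of"
	SubscribeVerb   Verb = "subscribe"
	UnsubscribeVerb Verb = "unsubscribe"
	AnnounceVerb    Verb = "announce"
	ResponseVerb    Verb = "response"
	StatusVerb      Verb = "status"
)

// knownVerbs is a hypothetical lookup set built from the constants.
var knownVerbs = map[Verb]struct{}{
	VersionVerb: {}, ListVerb: {}, QueryVerb: {}, AncestryVerb: {},
	LeavesOfVerb: {}, SubscribeVerb: {}, UnsubscribeVerb: {},
	AnnounceVerb: {}, ResponseVerb: {}, StatusVerb: {},
}

// parseVerb converts word into a Verb, rejecting anything that is not one
// of the known constants. Matching is exact and case-sensitive.
func parseVerb(word string) (Verb, error) {
	v := Verb(word)
	if _, ok := knownVerbs[v]; !ok {
		return "", fmt.Errorf("unknown verb %q", word)
	}
	return v, nil
}

func main() {
	for _, word := range []string{"leaves_of", "LIST", "subscribe"} {
		v, err := parseVerb(word)
		fmt.Println(v, err)
	}
}
```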
type Worker ¶
type Worker struct {
	Done           <-chan struct{}
	DefaultTimeout time.Duration
	*Conn
	*log.Logger
	*Session
	SubscribableStore store.ExtendedStore
	// contains filtered or unexported fields
}
func (*Worker) BootstrapLocalStore ¶
func (c *Worker) BootstrapLocalStore(maxCommunities int)
BootstrapLocalStore is a utility method for loading all available content from the peer on the other end of the sprout connection. It will:
- discover all communities
- fetch the signing identities of those communities
- validate and insert those identities and communities into the worker's store
- subscribe to all of those communities
- fetch all leaves of those communities
- fetch the ancestry of each leaf and validate it (fetching identities as necessary), inserting nodes that pass validation into the store
func (*Worker) EnsureAuthorAvailable ¶
func (c *Worker) EnsureAuthorAvailable(node forest.Node, perRequestTimeout time.Duration) error
EnsureAuthorAvailable attempts to ensure that the author of the provided node is available within the worker's configured store. It returns nil when the author is available, and returns an error if it is passed invalid parameters or if any operations fail.
func (*Worker) HandleNewNode ¶
func (c *Worker) HandleNewNode(node forest.Node)
HandleNewNode asynchronously announces the new node if appropriate.
func (*Worker) IngestNode ¶
func (c *Worker) IngestNode(node forest.Node) error
IngestNode makes a best-effort attempt to validate and insert the given node. It will fetch the author (if not already available), then attempt to validate the node. If that validation fails, it will attempt to fetch the node's entire ancestry and the authorship of each ancestry node. It will validate each ancestor and insert them into the local store (if they are not already there), then it will attempt to re-validate the original node after processing its entire ancestry.
It will return the first error during this chain of validations.
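The control flow described above can be sketched with plain functions standing in for the network and store operations. Everything in this example is hypothetical: nodes are reduced to string IDs, nodeOps and ingest are illustrative names, the ancestry is assumed to arrive oldest first, and the toy memStore treats a node as valid only once its parent has been stored. It is meant to show the validate, fetch ancestry, re-validate sequence, not the Worker's actual code.

```go
package main

import "fmt"

// nodeOps bundles the operations the flow depends on. All names are
// hypothetical; nodes are represented by string IDs for brevity.
type nodeOps struct {
	ensureAuthor  func(id string) error
	validate      func(id string) error
	fetchAncestry func(id string) ([]string, error) // assumed oldest first
	insert        func(id string) error             // assumed idempotent
}

// ingest validates and stores id, falling back to fetching and storing its
// ancestry when the first validation fails. It returns the first error met.
func ingest(id string, ops nodeOps) error {
	if err := ops.ensureAuthor(id); err != nil {
		return err
	}
	if err := ops.validate(id); err == nil {
		return ops.insert(id)
	}
	ancestors, err := ops.fetchAncestry(id)
	if err != nil {
		return err
	}
	for _, a := range ancestors {
		if err := ops.ensureAuthor(a); err != nil {
			return err
		}
		if err := ops.validate(a); err != nil {
			return err
		}
		if err := ops.insert(a); err != nil {
			return err
		}
	}
	// Re-validate the original node now that its ancestry is present.
	if err := ops.validate(id); err != nil {
		return err
	}
	return ops.insert(id)
}

// memStore is a toy store in which a node validates only once its parent
// has been stored. Root nodes have the empty string as their parent.
type memStore struct {
	parent map[string]string
	stored map[string]bool
}

func (m *memStore) ops() nodeOps {
	return nodeOps{
		ensureAuthor: func(string) error { return nil },
		validate: func(id string) error {
			if p := m.parent[id]; p != "" && !m.stored[p] {
				return fmt.Errorf("node %s: parent %s missing", id, p)
			}
			return nil
		},
		fetchAncestry: func(id string) ([]string, error) {
			var chain []string
			for p := m.parent[id]; p != ""; p = m.parent[p] {
				chain = append([]string{p}, chain...)
			}
			return chain, nil
		},
		insert: func(id string) error { m.stored[id] = true; return nil },
	}
}

func main() {
	m := &memStore{
		parent: map[string]string{"reply": "conversation", "conversation": "community", "community": ""},
		stored: map[string]bool{},
	}
	fmt.Println(ingest("reply", m.ops()), len(m.stored))
}
```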
func (*Worker) OnAncestry ¶
func (*Worker) OnAnnounce ¶
func (*Worker) OnLeavesOf ¶
func (*Worker) OnSubscribe ¶
func (*Worker) OnUnsubscribe ¶
func (*Worker) SynchronizeFullTree ¶
func (c *Worker) SynchronizeFullTree(root forest.Node, maxNodes int, perRequestTimeout time.Duration) error
SynchronizeFullTree ensures that the entire tree rooted at the provided node is available within the worker's configured store. maxNodes is the number of leaf nodes whose ancestry will be fetched, starting from the most recent.
Notes ¶
Bugs ¶
this interface is experimental and likely to change.