sprout

package module
v0.0.0-...-2b3db28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: Apache-2.0 Imports: 16 Imported by: 3

README

sprout-go

builds.sr.ht status GoDoc

This is an implementation of the Sprout Protocol in Go. It provides methods to send and receive protocol messages in Sprout. Sprout is one part of the Arbor Chat project.

NOTE: this package requires using a fork of golang.org/x/crypto, and you must therefore include the following in your go.mod:

    replace golang.org/x/crypto => github.com/ProtonMail/crypto <version-from-sprout-go's-go.mod>

About Arbor

arbor logo

Arbor is a chat system that makes communication clearer. It explicitly captures context that other platforms ignore, allowing you to understand the relationship between each message and every other message. It also respects its users and focuses on group collaboration.

You can get information about the Arbor project here.

For news about the project, join our mailing list!

Relay

This repo currently contains an example implementation of an Arbor Relay, which is analagous to a "server" in a traditional chat system. You can find it in cmd/relay.

Local testing

If you want to run a local relay to test something, it's very easy!

First, install mkcert so that you can create trusted local TLS certificates.

Then do the following:

mkcert --install # configure your local CA
mkcert localhost 127.0.0.1 ::1 arbor.local # generate a trusted cert for local addresses
mkdir grove # create somewhere to store arbor forest data

# Copy any nodes you want the relay to have into this grove directory.
# To copy from your local sprig installation:
cp ~/.config/sprig/grove/* grove/
# Adapt this as necessary if your history is stored elsewhere.

relay -certpath ./localhost+3.pem -keypath ./localhost+3-key.pem -grovepath ./grove/

You can now connect to this relay with any client on the address localhost:7777, arbor.local:7777, and the other names that you configured in the certificate.

Writing sprout by hand

If you want to examine the behavior of a sprout relay, it is sometimes convenient to directly connect to the relay.

To do this, install socat (it's probably in your package manager). If you want to test against the official arbor relay you can do:

socat openssl:arbor.chat:7117 stdio

You can then type valid sprout protocol messages and see the relay's responses.

Here's an example session that you could replicate:

version 1 0.0

Response:

status 1 0

We advertise our protocol version as 0.0 and the relay agrees to use that version.

list 2 1 3

Response:

response 2 1
SHA512_B32__mw9nEYu_XAgnw0mRRNO60J2bqIOOBmalIXgOqxoKV-o <base64url-node-data>

We request a list of the three most recent communities. We only get one back because the relay currently only knows about one.

leaves_of 3 SHA512_B32__mw9nEYu_XAgnw0mRRNO60J2bqIOOBmalIXgOqxoKV-o 3

Response:

response 3 3
SHA512_B32__S7wybDxauEZEYJCRenemN2WB5woupuHlRU-Gj5eVU8M <base64url-node-data>
SHA512_B32__GITqdhoPqbECv6Sb03zW8H7Ry0M8dScmQwMcrdeoEwI <base64url-node-data>
SHA512_B32___wCB6e6y9cWhjdHGyI4ki-qGa-GVWTmgP8ZtYpIK7O0 <base64url-node-data>

We request three leaves of the one community that exists.

Documentation

Overview

Package sprout provides types and utilities for implementing client and server programs that speak the Sprout Protocol. The Sprout Protocol is specified here:

https://man.sr.ht/~whereswaldon/arborchat/specifications/sprout.md

NOTE: this package requires using a fork of golang.org/x/crypto, and you must therefore include the following in your `go.mod`:

replace golang.org/x/crypto => github.com/ProtonMail/crypto <version-from-sprout-go's-go.mod>

This package exports several important types.

The Conn type wraps a connection-oriented transport (usually a TCP connection) and provides methods for sending sprout messages and reading sprout messages off of the connection. It has a number of exported fields which are functions that should handle incoming messages. These must be set by the user, and their behavior should conform to the Sprout specification. If using a Conn directly, be sure to invoke the ReadMessage() method properly to ensure that you receive repies.

The Worker type wraps a Conn and provides automatic implementations of both the handler functions for each sprout message and the processing loop that will read new messages and dispatch their handlers. You can send messages on a worker by calling Conn methods via struct embedding. It has an exported embedded Conn.

The Conn type has both synchronous and asynchronous methods for sending messages. The synchronous ones block until they recieve a response or their timeout channel emits a value. Details on how to use these methods follow.

Note: The Send* methods

The non-Async methods block until the get a response or until their timeout is reached. There are several cases in which will return an error:

- There is a network problem sending the message or receiving the response

- There is a problem creating the outbound message or parsing the inbound response

- The status message received in response is not sprout.StatusOk. In this case, the error will be of type sprout.Status

The recommended way to invoke synchronous Send*() methods is with a time.Ticker as the input channel, like so:

err := s.SendVersion(time.NewTicker(time.Second*5).C)

Note: The Send*Async methods

The Async versions of each send operation provide more granular control over blocking behavior. They return a chan interface{}, but will never send anything other than a sprout.Status or sprout.Response over that channel. It is safe to assume that the value will be one of those two.

The Async versions also return a handle for the request called a MessageID. This can be used to cancel the request in the event that it doesn't have a response or the response no longer matters. This can be done manually using the Cancel() method on the Conn type. The synchronous version of each send method handles this for you, but it must be done manually with the async variant.

An example of the appropriate use of an async method:

resultChan, messageID, err := conn.SendQueryAsync(ids)
if err != nil {
    // handle err
}
select {
    case data := <-resultChan:
        switch asConcrete := data.(type) {
            case sprout.Status:
                // handle status
            case sprout.Response:
                // handle Response
        }
    case <-time.NewTicker(time.Second*5).C:
        conn.Cancel(messageID)
        // handle timeout
}

Index

Constants

View Source
const (
	CurrentMajor = 0
	CurrentMinor = 0
)

Variables

This section is empty.

Functions

func LaunchSupervisedWorker

func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, tlsConfig *tls.Config, logger *log.Logger)

LaunchSupervisedWorker launches a worker in a new goroutine that will connect to `addr` and use `store` as its node storage. It will dial using the provided `tlsConfig`, and it will log errors on the given `logger`.

BUG(whereswaldon): this interface is experimental and likely to change.

func NodeFromBase64URL

func NodeFromBase64URL(in string) (forest.Node, error)

Types

type Conn

type Conn struct {
	// Write side of connection, synchronized with mutex
	sync.Mutex
	Conn io.ReadWriteCloser

	// Read side of connection, buffered for parse simplicity
	BufferedConn io.Reader

	// Protocol version in use
	Major, Minor int

	// Map from messageID to channel waiting for response
	PendingStatus sync.Map

	OnVersion     func(s *Conn, messageID MessageID, major, minor int) error
	OnList        func(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error
	OnQuery       func(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error
	OnAncestry    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error
	OnLeavesOf    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error
	OnSubscribe   func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
	OnUnsubscribe func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
	OnAnnounce    func(s *Conn, messageID MessageID, nodes []forest.Node) error
	// contains filtered or unexported fields
}

func NewConn

func NewConn(transport io.ReadWriteCloser) (*Conn, error)

NewConn constructs a sprout connection using the provided transport. Writes to the transport are expected to reach the other end of the sprout connection, and reads should deliver bytes from the other end. The expected use is TCP connections, though other transports are possible.

func (*Conn) Cancel

func (s *Conn) Cancel(messageID MessageID)

Cancel deallocates the response structures associated with the protocol message with the given identifier. This is primarily useful when the other end of the connection has not responded in a long time, and we are interested in cleaning up the resources used in waiting for them to respond. An attempt to cancel a message that is not waiting for a response will have no effect.

func (*Conn) ReadMessage

func (s *Conn) ReadMessage() error

ReadMessage reads and parses a single sprout protocol message off of the connection. It calls the appropriate OnVerb handler function when it parses a message, and it returns any parse errors. It will block when no messages are available.

This method must be called in a loop in order for the sprout connection to be able to receive messages properly. This isn't done automatically by the Conn type in order to provide flexibility on how to handler errors from this method. The Worker type can wrap a Conn to both implement its handler functions and call this method automatically.

This method may return an UnsolicitedMessageError in some cases. This may be due to a local timeout/request cancellation, and should generally not be cause to close the connection entirely.

func (*Conn) SendAncestry

func (s *Conn) SendAncestry(nodeID *fields.QualifiedHash, levels int, timeoutChan <-chan time.Time) (Response, error)

SendAncestry requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of leves of ancestry to return.

func (*Conn) SendAncestryAsync

func (s *Conn) SendAncestryAsync(nodeID *fields.QualifiedHash, levels int) (<-chan interface{}, MessageID, error)

SendAncestry requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of leves of ancestry to return. See the package-level documentation for details on how to use the Async methods.

func (*Conn) SendAnnounce

func (s *Conn) SendAnnounce(nodes []forest.Node, timeoutChan <-chan time.Time) error

SendAnnounce announces the existence of the given nodes to the peer on the other end of the sprout connection.

func (*Conn) SendAnnounceAsync

func (s *Conn) SendAnnounceAsync(nodes []forest.Node) (<-chan interface{}, MessageID, error)

SendAnnounceAsync announces the existence of the given nodes to the peer on the other end of the sprout connection. See the package-level documentation for details on how to use Async methods.

func (*Conn) SendLeavesOf

func (s *Conn) SendLeavesOf(nodeId *fields.QualifiedHash, quantity int, timeoutChan <-chan time.Time) (Response, error)

SendLeavesOf returns up to quantity nodes that are leaves in the tree rooted at the given ID.

func (*Conn) SendLeavesOfAsync

func (s *Conn) SendLeavesOfAsync(nodeId *fields.QualifiedHash, quantity int) (<-chan interface{}, MessageID, error)

SendLeavesOf returns up to quantity nodes that are leaves in the tree rooted at the given ID. For a description of how to use the Async methods, see the package-level documentation.

func (*Conn) SendList

func (s *Conn) SendList(nodeType fields.NodeType, quantity int, timeoutChan <-chan time.Time) (Response, error)

SendList requests a list of recent nodes of a particular node type from the other end of the sprout connection.

func (*Conn) SendListAsync

func (s *Conn) SendListAsync(nodeType fields.NodeType, quantity int) (<-chan interface{}, MessageID, error)

SendListAsync requests a list of recent nodes of a particular node type from the other end of the sprout connection. The requested quantity is the maximum number of nodes that the other end should provide, though it may provide significantly fewer. See the package level documentation for details on how to use the Async methods.

func (*Conn) SendQuery

func (s *Conn) SendQuery(nodeIds []*fields.QualifiedHash, timeoutChan <-chan time.Time) (Response, error)

SendQuery requests the nodes with a list of IDs from the other side of the sprout connection.

func (*Conn) SendQueryAsync

func (s *Conn) SendQueryAsync(nodeIds ...*fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendQueryAsync requests the nodes with a list of IDs from the other side of the sprout connection. See the package level documentation for details on how to use the Async methods.

func (*Conn) SendResponse

func (s *Conn) SendResponse(msgID MessageID, nodes []forest.Node) error

func (*Conn) SendStatus

func (s *Conn) SendStatus(targetMessageID MessageID, errorCode StatusCode) error

SendStatus responds to the message with the give targetMessageID with the given status code. It is always synchronous, and will return any error in transmitting the message.

func (*Conn) SendSubscribe

func (s *Conn) SendSubscribe(community *forest.Community, timeoutChan <-chan time.Time) error

SendSubscribe attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendSubscribeAsync

func (s *Conn) SendSubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)

SendSubscribeAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendSubscribeByID

func (s *Conn) SendSubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error

SendSubscribeByID attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendSubscribeByIDAsync

func (s *Conn) SendSubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendSubscribeByIDAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendUnsubscribe

func (s *Conn) SendUnsubscribe(community *forest.Community, timeoutChan <-chan time.Time) error

SendUnsubscribe attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendUnsubscribeAsync

func (s *Conn) SendUnsubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)

SendUnsubscribeAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendUnsubscribeByID

func (s *Conn) SendUnsubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error

SendUnsubscribeByID attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendUnsubscribeByIDAsync

func (s *Conn) SendUnsubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendUnsubscribeByIDAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendVersion

func (s *Conn) SendVersion(timeoutChan <-chan time.Time) error

SendVersion notifies the other end of the sprout connection of our supported protocol version number.

func (*Conn) SendVersionAsync

func (s *Conn) SendVersionAsync() (<-chan interface{}, MessageID, error)

SendVersionAsync notifies the other end of the sprout connection of our supported protocol version number. See the package-level documentation for details on how to use the Async methods properly.

type MessageID

type MessageID int

type Response

type Response struct {
	Nodes []forest.Node
}

type Session

type Session struct {
	sync.RWMutex
	Communities map[*fields.QualifiedHash]struct{}
}

Session stores the state of a sprout connection between hosts, which is currently just the subscribed community set.

func NewSession

func NewSession() *Session

func (*Session) IsSubscribed

func (c *Session) IsSubscribed(communityID *fields.QualifiedHash) bool

func (*Session) Subscribe

func (c *Session) Subscribe(communityID *fields.QualifiedHash)

func (*Session) Unsubscribe

func (c *Session) Unsubscribe(communityID *fields.QualifiedHash)

type Status

type Status struct {
	Code StatusCode
}

func (Status) Error

func (s Status) Error() string

type StatusCode

type StatusCode int

StatusCode represents the status of a sprout protocol message.

const (
	StatusOk            StatusCode = 0
	ErrorMalformed      StatusCode = 1
	ErrorProtocolTooOld StatusCode = 2
	ErrorProtocolTooNew StatusCode = 3
	ErrorUnknownNode    StatusCode = 4
)

func (StatusCode) String

func (s StatusCode) String() string

String converts the status code into a human-readable error message

type UnsolicitedMessageError

type UnsolicitedMessageError struct {
	// The ID that the unsolicited message was in response to
	MessageID
}

UnsolicitedMessageError is an error indicating that a sprout peer sent a response or status message with an ID that was unexpected. This could occur when we cancelled waiting on a request (such as a timeout), when the peer has a bug (double response, incorrect target message id in response), or when the peer is misbehaving.

func (UnsolicitedMessageError) Error

func (u UnsolicitedMessageError) Error() string

type Verb

type Verb string
const (
	VersionVerb     Verb = "version"
	ListVerb        Verb = "list"
	QueryVerb       Verb = "query"
	AncestryVerb    Verb = "ancestry"
	LeavesOfVerb    Verb = "leaves_of"
	SubscribeVerb   Verb = "subscribe"
	UnsubscribeVerb Verb = "unsubscribe"
	AnnounceVerb    Verb = "announce"
	ResponseVerb    Verb = "response"
	StatusVerb      Verb = "status"
)

type Worker

type Worker struct {
	Done           <-chan struct{}
	DefaultTimeout time.Duration
	*Conn
	*log.Logger
	*Session
	SubscribableStore store.ExtendedStore
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(done <-chan struct{}, conn net.Conn, s store.ExtendedStore) (*Worker, error)

func (*Worker) BootstrapLocalStore

func (c *Worker) BootstrapLocalStore(maxCommunities int)

BootstrapLocalStore is a utility method for loading all available content from the peer on the other end of the sprout connection. It will

  • discover all communities
  • fetch the signing identities of those communities
  • validate and insert those identities and communities into the worker's store
  • subscribe to all of those communities
  • fetch all leaves of those communities
  • fetch the ancestry of each leaf and validate it (fetching identities as necessary), inserting nodes that pass valdiation into the store

func (*Worker) EnsureAuthorAvailable

func (c *Worker) EnsureAuthorAvailable(node forest.Node, perRequestTimeout time.Duration) error

EnsureAuthorAvailable attempts to ensure that the author of provided node is available within the worker's configured store. It returns nil when the author is available, and returns error if it is passed invalid parameters or if any operations fail.

func (*Worker) HandleNewNode

func (c *Worker) HandleNewNode(node forest.Node)

Asynchronously announce new node if appropriate

func (*Worker) IngestNode

func (c *Worker) IngestNode(node forest.Node) error

IngestNode makes a best-effort attempt to validate and insert the given node. It will fetch the author (if not already available), then attempt to validate the node. If that validation fails, it will attempt to fetch the node's entire ancestry and the authorship of each ancestry node. It will validate each ancestor and insert them into the local store (if they are not already there), then it will attempt to re-validate the original node after processing its entire ancestry.

It will return the first error during this chain of validations.

func (*Worker) OnAncestry

func (c *Worker) OnAncestry(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error

func (*Worker) OnAnnounce

func (c *Worker) OnAnnounce(s *Conn, messageID MessageID, nodes []forest.Node) error

func (*Worker) OnLeavesOf

func (c *Worker) OnLeavesOf(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error

func (*Worker) OnList

func (c *Worker) OnList(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error

func (*Worker) OnQuery

func (c *Worker) OnQuery(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error

func (*Worker) OnSubscribe

func (c *Worker) OnSubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)

func (*Worker) OnUnsubscribe

func (c *Worker) OnUnsubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)

func (*Worker) OnVersion

func (c *Worker) OnVersion(s *Conn, messageID MessageID, major, minor int) error

func (*Worker) Run

func (c *Worker) Run()

func (*Worker) SynchronizeFullTree

func (c *Worker) SynchronizeFullTree(root forest.Node, maxNodes int, perRequestTimeout time.Duration) error

SynchronizeFullTree ensures that the entire tree rooted at the provided node is available within the worker's configured store. maxNodes is the number of leaf nodes whose ancestry will be fetched, starting from the most recent.

Notes

Bugs

  • this interface is experimental and likely to change.

Directories

Path Synopsis
cmd
relay Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL