rpc

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2021 License: MIT Imports: 12 Imported by: 3

README

go-libp2p-pubsub-rpc

Made by Textile Chat on Slack standard-readme compliant

RPC over libp2p pubsub with error handling

Table of Contents

Background

go-libp2p-pubsub-rpc is an extension to go-libp2p-pubsub that provides RPC-like functionality:

  • Request/Response pattern with a peer
  • Request/Multi-Response pattern with multiple peers

Install

go get github.com/textileio/go-libp2p-pubsub-rpc

Usage

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	core "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/textileio/go-libp2p-pubsub-rpc"
	"github.com/textileio/go-libp2p-pubsub-rpc/peer"
)

func main() {

	//
	// PING PONG WITH TWO PEERS
	//

	// create a peer
	p1, _ := peer.New(peer.Config{
		RepoPath:   "/tmp/repo1",
		EnableMDNS: true, // can be used when peers are on the same network
	})

	// create another peer
	p2, _ := peer.New(peer.Config{
		RepoPath:   "/tmp/repo2",
		EnableMDNS: true,
	})

	eventHandler := func(from core.ID, topic string, msg []byte) {
		fmt.Printf("%s event: %s %s\n", topic, from, msg)
	}
	messageHandler := func(from core.ID, topic string, msg []byte) ([]byte, error) {
		fmt.Printf("%s message: %s %s\n", topic, from, msg)
		if string(msg) == "ping" {
			return []byte("pong"), nil
		} else {
			return nil, errors.New("invalid request")
		}
	}

	t1, _ := p1.NewTopic(context.Background(), "mytopic", true) // no need to subscribe if only publishing
	t1.SetEventHandler(eventHandler)                            // event handler reports topic membership events
	t1.SetMessageHandler(messageHandler)                        // message handle is any func that returns a response and error

	t2, _ := p2.NewTopic(context.Background(), "mytopic", true)
	t2.SetEventHandler(eventHandler)
	t2.SetMessageHandler(messageHandler) // using same message handler as peer1, but this could be anything

	time.Sleep(time.Second) // wait for mdns discovery

	// peer1 requests "pong" from peer2
	rc1, _ := t1.Publish(context.Background(), []byte("ping"))
	r1 := <-rc1
	// check r1.Err
	fmt.Printf("peer1 received \"%s\" from %s\n", r1.Data, r1.From)

	// peer2 requests "pong" from peer1
	rc2, _ := t2.Publish(context.Background(), []byte("ping"))
	r2 := <-rc2
	// check r2.Err
	fmt.Printf("peer2 received \"%s\" from %s\n", r2.Data, r2.From)

	// peers can respond with an error
	rc3, _ := t2.Publish(context.Background(), []byte("not a ping"))
	r3 := <-rc3
	fmt.Printf("peer2 received error \"%s\" from %s\n", r3.Err, r3.From)

	//
	// PING PONG WITH MULTIPLE PEERS
	//

	// create another peer
	p3, _ := peer.New(peer.Config{
		RepoPath:   "/tmp/repo3",
		EnableMDNS: true,
	})
	t3, _ := p3.NewTopic(context.Background(), "mytopic", true)
	t3.SetEventHandler(eventHandler)
	t3.SetMessageHandler(messageHandler)

	time.Sleep(time.Second) // wait for mdns discovery

	// peer1 requests "pong" from peer2 and peer3
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	rc, _ := t1.Publish(ctx, []byte("ping"), rpc.WithMultiResponse(true))
	for r := range rc {
		// check r.Err
		fmt.Printf("peer1 received \"%s\" from %s\n", r.Data, r.From)
	}
}

Contributing

Pull requests and bug reports are very welcome ❤️

This repository falls under the Textile Code of Conduct.

Feel free to get in touch by:

Changelog

A changelog is published along with each release.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrResponseNotReceived indicates a response was not received after publishing a message.
	ErrResponseNotReceived = errors.New("response not received")
)

Functions

This section is empty.

Types

type EventHandler

type EventHandler func(from peer.ID, topic string, msg []byte)

EventHandler is used to receive topic peer events.

type MessageHandler

type MessageHandler func(from peer.ID, topic string, msg []byte) ([]byte, error)

MessageHandler is used to receive topic messages.

type PublishOption

type PublishOption func(*PublishOptions) error

PublishOption defines a Publish option.

func WithIgnoreResponse

func WithIgnoreResponse(enable bool) PublishOption

WithIgnoreResponse indicates whether or not Publish will wait for a response(s) from the receiver(s). Default: disabled.

func WithMultiResponse

func WithMultiResponse(enable bool) PublishOption

WithMultiResponse indicates whether or not Publish will wait for multiple responses before returning. Default: disabled.

func WithPubOpts

func WithPubOpts(opts ...pubsub.PubOpt) PublishOption

WithPubOpts sets native pubsub.PubOpt options.

type PublishOptions

type PublishOptions struct {
	// contains filtered or unexported fields
}

PublishOptions defines options for Publish.

type Response

type Response struct {
	// ID is the cid.Cid of the received message.
	ID string
	// From is the peer.ID of the sender.
	From peer.ID
	// Data is the message data.
	Data []byte
	// Err is an error from the sender.
	Err error
}

Response wraps a message response.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic provides a nice interface to a libp2p pubsub topic.

func NewTopic

func NewTopic(ctx context.Context, ps *pubsub.PubSub, host peer.ID, topic string, subscribe bool) (*Topic, error)

NewTopic returns a new topic for the host.

func (*Topic) Close

func (t *Topic) Close() error

Close the topic.

func (*Topic) Publish

func (t *Topic) Publish(
	ctx context.Context,
	data []byte,
	opts ...PublishOption,
) (<-chan Response, error)

Publish data. Note that the data may arrive peers duplicated. And as a result, if WithMultiResponse is supplied, the response may be duplicated as well. See PublishOptions for option details.

func (*Topic) SetEventHandler

func (t *Topic) SetEventHandler(handler EventHandler)

SetEventHandler sets a handler func that will be called with peer events.

func (*Topic) SetMessageHandler

func (t *Topic) SetMessageHandler(handler MessageHandler)

SetMessageHandler sets a handler func that will be called with topic messages. A subscription is required for the handler to be called.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL