rpc

package
v0.0.0-...-ff78b6e Latest
Warning

This package is not in the latest version of its module.

Published: Dec 6, 2023 License: GPL-3.0 Imports: 38 Imported by: 0

Documentation

Overview

Package rpc implements bi-directional JSON-RPC 2.0 on multiple transports.

It provides access to the exported methods of an object across a network or other I/O connection. After creating a server or client instance, objects can be registered to make them visible as 'services'. Exported methods that follow specific conventions can be called remotely. It also has support for the publish/subscribe pattern.

RPC Methods

Methods that satisfy the following criteria are made available for remote access:

  • method must be exported
  • method returns 0, 1 (response or error) or 2 (response and error) values

An example method:

func (s *CalcService) Add(a, b int) (int, error)

When the returned error isn't nil the returned integer is ignored and the error is sent back to the client. Otherwise the returned integer is sent back to the client.
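The criteria above can be checked with reflection. The following is a minimal sketch, not the package's actual registration code: `eligibleMethods` and this `CalcService` are hypothetical names, and requiring the second return value to be an `error` is an assumption drawn from the "response and error" rule.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"sort"
)

// CalcService is a hypothetical service used only for this illustration.
type CalcService struct{}

func (s *CalcService) Add(a, b int) (int, error) { return a + b, nil }

func (s *CalcService) Div(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("divide by zero")
	}
	return a / b, nil
}

// TooMany returns three values and is therefore not eligible.
func (s *CalcService) TooMany() (int, int, error) { return 0, 0, nil }

var errorType = reflect.TypeOf((*error)(nil)).Elem()

// eligibleMethods lists the methods of receiver that return at most two
// values, where a second return value must be an error. reflect only
// enumerates exported methods, which covers the first rule.
func eligibleMethods(receiver interface{}) []string {
	typ := reflect.TypeOf(receiver)
	var names []string
	for i := 0; i < typ.NumMethod(); i++ {
		m := typ.Method(i)
		switch m.Type.NumOut() {
		case 0, 1:
			names = append(names, m.Name)
		case 2:
			if m.Type.Out(1) == errorType {
				names = append(names, m.Name)
			}
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(eligibleMethods(new(CalcService))) // prints [Add Div]
}
```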

Optional arguments are supported by accepting pointer values as arguments. For example, to perform the addition in an optional finite field, the method can accept a mod argument as a pointer value.

func (s *CalcService) Add(a, b int, mod *int) (int, error)

This RPC method can be called with 2 integers and a null value as the third argument, in which case mod will be nil, or with 3 integers, in which case mod will point to the given third argument. Because the optional argument is the last argument, the RPC package also accepts just 2 integers and passes mod as nil to the RPC method.
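To make the three accepted call shapes concrete, here is a minimal sketch of decoding positional parameters with an optional trailing pointer using only encoding/json. It is an illustration under stated assumptions, not the package's own argument parser: `decodeAddArgs` and `add` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// decodeAddArgs decodes positional JSON-RPC params for a hypothetical
// Add(a, b int, mod *int) method. A missing or null third element leaves
// mod nil.
func decodeAddArgs(params []byte) (a, b int, mod *int, err error) {
	var raw []json.RawMessage
	if err = json.Unmarshal(params, &raw); err != nil {
		return 0, 0, nil, err
	}
	if len(raw) < 2 || len(raw) > 3 {
		return 0, 0, nil, errors.New("expected 2 or 3 arguments")
	}
	if err = json.Unmarshal(raw[0], &a); err != nil {
		return 0, 0, nil, err
	}
	if err = json.Unmarshal(raw[1], &b); err != nil {
		return 0, 0, nil, err
	}
	if len(raw) == 3 {
		// Unmarshaling JSON null into a *int leaves the pointer nil.
		if err = json.Unmarshal(raw[2], &mod); err != nil {
			return 0, 0, nil, err
		}
	}
	return a, b, mod, nil
}

// add applies the optional modulus when a non-zero one was supplied.
func add(a, b int, mod *int) int {
	if mod == nil || *mod == 0 {
		return a + b
	}
	return (a + b) % *mod
}

func main() {
	for _, params := range []string{`[5, 4]`, `[5, 4, null]`, `[5, 4, 7]`} {
		a, b, mod, err := decodeAddArgs([]byte(params))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(params, "->", add(a, b, mod))
	}
}
```

The first two inputs yield 9 and the third yields 2, because only the third call supplies a modulus.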

The server offers the ServeCodec method, which accepts a ServerCodec instance. It reads requests from the codec, processes them, and sends the responses back to the client using the codec. The server can execute requests concurrently, so responses may be sent back to the client out of order.

An example server which uses the JSON codec:

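 type CalculatorService struct{}

 func (s *CalculatorService) Add(a, b int) int {
	return a + b
 }

 func (s *CalculatorService) Div(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("divide by zero")
	}
	return a / b, nil
 }

 calculator := new(CalculatorService)
 // The NewServer arguments follow the signature documented under "func NewServer";
 // the values used here are illustrative only.
 server := NewServer(2, false, false, log.New(), 0)
 if err := server.RegisterName("calculator", calculator); err != nil {
	panic(err)
 }
 l, err := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"})
 if err != nil {
	panic(err)
 }
 server.ServeListener(l)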

Subscriptions

The package also supports the publish/subscribe pattern through the use of subscriptions. A method that is considered eligible for notifications must satisfy the following criteria:

  • method must be exported
  • first method argument type must be context.Context
  • method must have return types (rpc.Subscription, error)

An example method:

func (s *BlockChainService) NewBlocks(ctx context.Context) (rpc.Subscription, error) {
	...
}

When the service containing the subscription method is registered to the server, for example under the "blockchain" namespace, a subscription is created by calling the "blockchain_subscribe" method.
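As an illustration of what such a call could look like on the wire, the sketch below builds a JSON-RPC 2.0 request for the "blockchain_subscribe" method. `subscribeRequest` is a hypothetical helper, and passing the subscription name ("newBlocks" here) as the first parameter is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request mirrors the members of a JSON-RPC 2.0 request object.
type request struct {
	Version string        `json:"jsonrpc"`
	ID      int           `json:"id"`
	Method  string        `json:"method"`
	Params  []interface{} `json:"params"`
}

// subscribeRequest builds the "<namespace>_subscribe" call. Passing the
// subscription name as the first parameter is an assumption of this sketch.
func subscribeRequest(id int, namespace, name string, args ...interface{}) ([]byte, error) {
	params := append([]interface{}{name}, args...)
	return json.Marshal(request{
		Version: "2.0",
		ID:      id,
		Method:  namespace + "_subscribe",
		Params:  params,
	})
}

func main() {
	msg, err := subscribeRequest(1, "blockchain", "newBlocks")
	if err != nil {
		panic(err)
	}
	// prints {"jsonrpc":"2.0","id":1,"method":"blockchain_subscribe","params":["newBlocks"]}
	fmt.Println(string(msg))
}
```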

Subscriptions are deleted when the user sends an unsubscribe request or when the connection that was used to create the subscription is closed. The connection can be closed by either the client or the server. The server closes the connection on any write error.

For more information about subscriptions, see https://github.com/nebojsa94/erigon/wiki/RPC-PUB-SUB.

Reverse Calls

In any method handler, an instance of rpc.Client can be accessed through the ClientFromContext function. Using this client instance, server-to-client method calls can be performed on the RPC connection.

Constants

const (
	LatestExecutedBlockNumber = BlockNumber(-5)
	FinalizedBlockNumber      = BlockNumber(-4)
	SafeBlockNumber           = BlockNumber(-3)
	PendingBlockNumber        = BlockNumber(-2)
	LatestBlockNumber         = BlockNumber(-1)
	EarliestBlockNumber       = BlockNumber(0)
)
const MetadataApi = "rpc"

Variables

var (
	ErrClientQuit                = errors.New("client is closed")
	ErrNoResult                  = errors.New("no result in JSON-RPC response")
	ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
)
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
	ErrNotificationsUnsupported = errors.New("notifications not supported")
	// ErrSubscriptionNotFound is returned when the subscription for the given id is not found
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
var (
	LatestExecutedBlock = LatestExecutedBlockNumber.AsBlockReference()
	FinalizedBlock      = FinalizedBlockNumber.AsBlockReference()
	SafeBlock           = SafeBlockNumber.AsBlockReference()
	PendingBlock        = PendingBlockNumber.AsBlockReference()
	LatestBlock         = LatestBlockNumber.AsBlockReference()
	EarliestBlock       = EarliestBlockNumber.AsBlockReference()
)

Functions

func CheckJwtSecret

func CheckJwtSecret(w http.ResponseWriter, r *http.Request, jwtSecret []byte) bool

func HandleError

func HandleError(err error, stream *jsoniter.Stream) error

func PreAllocateRPCMetricLabels

func PreAllocateRPCMetricLabels(apiList []API)

PreAllocateRPCMetricLabels pre-allocates labels for all RPC methods in apiList.

Types

type API

type API struct {
	Namespace string      // namespace under which the rpc methods of Service are exposed
	Version   string      // API version for DApps
	Service   interface{} // receiver instance which holds the methods
	Public    bool        // indication if the methods must be considered safe for public use
}

API describes the set of methods offered over the RPC interface

type AllowList

type AllowList map[string]struct{}

func (*AllowList) MarshalJSON

func (a *AllowList) MarshalJSON() ([]byte, error)

MarshalJSON returns the JSON encoding of *a.

func (*AllowList) UnmarshalJSON

func (a *AllowList) UnmarshalJSON(data []byte) error

type BatchElem

type BatchElem struct {
	Method string
	Args   []interface{}
	// The result is unmarshaled into this field. Result must be set to a
	// non-nil pointer value of the desired type, otherwise the response will be
	// discarded.
	Result interface{}
	// Error is set if the server returns an error for this request, or if
	// unmarshaling into Result fails. It is not set for I/O errors.
	Error error
}

BatchElem is an element in a batch request.

type BlockNumber

type BlockNumber int64

func AsBlockNumber

func AsBlockNumber(no interface{}) BlockNumber

func (BlockNumber) AsBlockReference

func (bn BlockNumber) AsBlockReference() BlockReference

func (BlockNumber) Int64

func (bn BlockNumber) Int64() int64

func (BlockNumber) MarshalText

func (bn BlockNumber) MarshalText() ([]byte, error)

func (BlockNumber) String

func (bn BlockNumber) String() string

func (BlockNumber) Uint64

func (bn BlockNumber) Uint64() uint64

func (*BlockNumber) UnmarshalJSON

func (bn *BlockNumber) UnmarshalJSON(data []byte) error

UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports:

  • "latest", "earliest", "pending", "safe", or "finalized" as string arguments
  • the block number

Returned errors:

  • an invalid block number error when the given argument isn't a known string
  • an out of range error when the given block number is either too small or too large
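The mapping from tags to the constants listed earlier can be sketched as follows. This is an illustration only: `blockNumber` and `parseBlockNumber` are local stand-ins rather than the package's code, and treating numeric input as a 0x-prefixed hex string is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strconv"
	"strings"
)

// blockNumber is a local stand-in for rpc.BlockNumber.
type blockNumber int64

const (
	finalizedBlockNumber = blockNumber(-4)
	safeBlockNumber      = blockNumber(-3)
	pendingBlockNumber   = blockNumber(-2)
	latestBlockNumber    = blockNumber(-1)
	earliestBlockNumber  = blockNumber(0)
)

var (
	errInvalidBlockNumber = errors.New("invalid block number")
	errBlockOutOfRange    = errors.New("block number out of range")
)

// parseBlockNumber maps a tag or a 0x-prefixed hex string to a blockNumber.
// Hex encoding for numeric input is an assumption of this sketch.
func parseBlockNumber(input string) (blockNumber, error) {
	switch input {
	case "earliest":
		return earliestBlockNumber, nil
	case "latest":
		return latestBlockNumber, nil
	case "pending":
		return pendingBlockNumber, nil
	case "safe":
		return safeBlockNumber, nil
	case "finalized":
		return finalizedBlockNumber, nil
	}
	if !strings.HasPrefix(input, "0x") {
		return 0, errInvalidBlockNumber
	}
	n, err := strconv.ParseUint(input[2:], 16, 64)
	if errors.Is(err, strconv.ErrRange) || (err == nil && n > math.MaxInt64) {
		return 0, errBlockOutOfRange
	}
	if err != nil {
		return 0, errInvalidBlockNumber
	}
	return blockNumber(n), nil
}

func main() {
	for _, in := range []string{"latest", "0x10", "bogus"} {
		n, err := parseBlockNumber(in)
		fmt.Println(in, n, err)
	}
}
```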

type BlockNumberOrHash

type BlockNumberOrHash struct {
	BlockNumber      *BlockNumber    `json:"blockNumber,omitempty"`
	BlockHash        *libcommon.Hash `json:"blockHash,omitempty"`
	RequireCanonical bool            `json:"requireCanonical,omitempty"`
}

func BlockNumberOrHashWithHash

func BlockNumberOrHashWithHash(hash libcommon.Hash, canonical bool) BlockNumberOrHash

func BlockNumberOrHashWithNumber

func BlockNumberOrHashWithNumber(blockNr BlockNumber) BlockNumberOrHash

func (*BlockNumberOrHash) Hash

func (bnh *BlockNumberOrHash) Hash() (libcommon.Hash, bool)

func (*BlockNumberOrHash) Number

func (bnh *BlockNumberOrHash) Number() (BlockNumber, bool)

func (*BlockNumberOrHash) UnmarshalJSON

func (bnh *BlockNumberOrHash) UnmarshalJSON(data []byte) error

type BlockReference

type BlockReference BlockNumberOrHash

func AsBlockReference

func AsBlockReference(ref interface{}) BlockReference

func HashBlockReference

func HashBlockReference(hash libcommon.Hash, canonical ...bool) BlockReference

func IntBlockReference

func IntBlockReference(blockNr *big.Int) BlockReference

func Uint64BlockReference

func Uint64BlockReference(blockNr uint64) BlockReference

func (BlockReference) Hash

func (br BlockReference) Hash() (libcommon.Hash, bool)

func (BlockReference) Number

func (br BlockReference) Number() (BlockNumber, bool)

func (BlockReference) String

func (br BlockReference) String() string

func (*BlockReference) UnmarshalJSON

func (br *BlockReference) UnmarshalJSON(data []byte) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a connection to an RPC server.

func ClientFromContext

func ClientFromContext(ctx context.Context, logger log.Logger) (*Client, bool)

ClientFromContext retrieves the client from the context, if any. This can be used to perform 'reverse calls' in a handler method.

func Dial

func Dial(rawurl string, logger log.Logger) (*Client, error)

Dial creates a new client for the given URL.

The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a file name with no URL scheme, a local socket connection is established using UNIX domain sockets on supported platforms and named pipes on Windows. If you want to configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
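The scheme-based selection described above can be pictured with a small stdlib-only sketch. `transportFor` is a hypothetical helper that only names the transport; it does not open a connection and is not how the package is implemented.

```go
package main

import (
	"fmt"
	"net/url"
)

// transportFor reports which kind of transport a URL would select,
// following the scheme rules described above.
func transportFor(rawurl string) (string, error) {
	u, err := url.Parse(rawurl)
	if err != nil {
		return "", err
	}
	switch u.Scheme {
	case "http", "https":
		return "http", nil
	case "ws", "wss":
		return "websocket", nil
	case "":
		// No scheme: treat the value as a file name for a local socket.
		return "ipc", nil
	default:
		return "", fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
	}
}

func main() {
	for _, raw := range []string{"https://example.org", "ws://127.0.0.1:8545", "/tmp/calculator.sock", "ftp://example.org"} {
		transport, err := transportFor(raw)
		fmt.Println(raw, transport, err)
	}
}
```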

For websocket connections, the origin is set to the local host name.

The client reconnects automatically if the connection is lost.

func DialContext

func DialContext(ctx context.Context, rawurl string, logger log.Logger) (*Client, error)

DialContext creates a new RPC client, just like Dial.

The context is used to cancel or time out the initial connection establishment. It does not affect subsequent interactions with the client.

func DialHTTP

func DialHTTP(endpoint string, logger log.Logger) (*Client, error)

DialHTTP creates a new RPC client that connects to an RPC server over HTTP.

func DialHTTPWithClient

func DialHTTPWithClient(endpoint string, client *http.Client, logger log.Logger) (*Client, error)

DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP using the provided HTTP Client.

func DialIO

func DialIO(ctx context.Context, in io.Reader, out io.Writer, logger log.Logger) (*Client, error)

DialIO creates a client which uses the given IO channels.

func DialInProc

func DialInProc(handler *Server, logger log.Logger) *Client

DialInProc attaches an in-process connection to the given RPC server.

func DialStdIO

func DialStdIO(ctx context.Context, logger log.Logger) (*Client, error)

DialStdIO creates a client on stdin/stdout.

func DialWebsocket

func DialWebsocket(ctx context.Context, endpoint, origin string, logger log.Logger) (*Client, error)

DialWebsocket creates a new RPC client that communicates with a JSON-RPC server that is listening on the given endpoint.

The context is used for the initial connection establishment. It does not affect subsequent interactions with the client.

func DialWebsocketWithDialer

func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer, logger log.Logger) (*Client, error)

DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server that is listening on the given endpoint using the provided dialer.

func (*Client) BatchCall

func (c *Client) BatchCall(b []BatchElem) error

BatchCall sends all given requests as a single batch and waits for the server to return a response for all of them.

In contrast to Call, BatchCall only returns I/O errors. Any error specific to a request is reported through the Error field of the corresponding BatchElem.

Note that batch calls may not be executed atomically on the server side.
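The split between batch-level errors and per-element errors can be sketched without a network. In the illustration below, `batchElem` is a local copy of the BatchElem fields, `fillBatch` is a hypothetical helper, and using the slice index as the request ID is an assumption. The responses arrive out of order on purpose.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// batchElem is a local copy of the BatchElem fields.
type batchElem struct {
	Method string
	Args   []interface{}
	Result interface{}
	Error  error
}

// response is a simplified JSON-RPC 2.0 response object.
type response struct {
	ID     int             `json:"id"`
	Result json.RawMessage `json:"result"`
	Error  *struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	} `json:"error"`
}

// fillBatch copies each response into the element with the matching index.
// The returned error covers the batch as a whole; request-specific errors
// are stored in the Error field of the corresponding element.
func fillBatch(elems []batchElem, raw []byte) error {
	var resps []response
	if err := json.Unmarshal(raw, &resps); err != nil {
		return err
	}
	for _, r := range resps {
		if r.ID < 0 || r.ID >= len(elems) {
			continue // response for an unknown request
		}
		elem := &elems[r.ID]
		switch {
		case r.Error != nil:
			elem.Error = errors.New(r.Error.Message)
		case elem.Result != nil:
			elem.Error = json.Unmarshal(r.Result, elem.Result)
		}
	}
	return nil
}

func main() {
	var sum int
	elems := []batchElem{
		{Method: "calculator_add", Args: []interface{}{1, 2}, Result: &sum},
		{Method: "calculator_div", Args: []interface{}{1, 0}, Result: new(int)},
	}
	raw := []byte(`[{"id":1,"error":{"code":-32000,"message":"divide by zero"}},{"id":0,"result":3}]`)
	if err := fillBatch(elems, raw); err != nil {
		panic(err)
	}
	fmt.Println(sum, elems[1].Error) // prints 3 divide by zero
}
```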

func (*Client) BatchCallContext

func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error

BatchCallContext sends all given requests as a single batch and waits for the server to return a response for all of them. The wait duration is bounded by the context's deadline.

In contrast to CallContext, BatchCallContext only returns errors that have occurred while sending the request. Any error specific to a request is reported through the Error field of the corresponding BatchElem.

Note that batch calls may not be executed atomically on the server side.

func (*Client) Call

func (c *Client) Call(result interface{}, method string, args ...interface{}) error

Call performs a JSON-RPC call with the given arguments and unmarshals into result if no error occurred.

The result must be a pointer so that package json can unmarshal into it. You can also pass nil, in which case the result is ignored.

func (*Client) CallContext

func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error

CallContext performs a JSON-RPC call with the given arguments. If the context is canceled before the call has successfully returned, CallContext returns immediately.

The result must be a pointer so that package json can unmarshal into it. You can also pass nil, in which case the result is ignored.

func (*Client) Close

func (c *Client) Close()

Close closes the client, aborting any in-flight requests.

func (*Client) EthSubscribe

func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error)

EthSubscribe registers a subscription under the "eth" namespace.

func (*Client) Notify

func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error

Notify sends a notification, i.e. a method call that doesn't expect a response.
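In JSON-RPC 2.0 terms, a notification is a request object without an "id" member, which is why no response can be matched to it. A minimal sketch of the encoding follows; `encodeNotification` and the method name "calculator_reset" are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notification is a JSON-RPC 2.0 request object without an "id" member.
type notification struct {
	Version string        `json:"jsonrpc"`
	Method  string        `json:"method"`
	Params  []interface{} `json:"params"`
}

// encodeNotification marshals a notification for the given method.
func encodeNotification(method string, args ...interface{}) ([]byte, error) {
	if args == nil {
		args = []interface{}{} // encode as [] rather than null
	}
	return json.Marshal(notification{Version: "2.0", Method: method, Params: args})
}

func main() {
	msg, err := encodeNotification("calculator_reset")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg)) // prints {"jsonrpc":"2.0","method":"calculator_reset","params":[]}
}
```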

func (*Client) RegisterName

func (c *Client) RegisterName(name string, receiver interface{}) error

RegisterName creates a service for the given receiver type under the given name. When no methods on the given receiver match the criteria to be either an RPC method or a subscription, an error is returned. Otherwise a new service is created and added to the service collection this client provides to the server.

func (*Client) SetHeader

func (c *Client) SetHeader(key, value string)

SetHeader adds a custom HTTP header to the client's requests. This method only works for clients using HTTP; it has no effect for clients using another transport.

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error)

Subscribe calls the "<namespace>_subscribe" method with the given arguments, registering a subscription. Server notifications for the subscription are sent to the given channel. The element type of the channel must match the expected type of content returned by the subscription.

The context argument cancels the RPC request that sets up the subscription but has no effect on the subscription after Subscribe has returned.

Slow subscribers will be dropped eventually. The client buffers up to 20000 notifications before considering the subscriber dead, at which point the subscription Err channel receives ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure that the channel usually has at least one reader to prevent this issue.
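The overflow behaviour can be pictured with a small forwarding loop. This is a simplified sketch, not the client's implementation: `forward`, `errQueueOverflow` and the tiny limit of 2 are stand-ins chosen to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueOverflow = errors.New("subscription queue overflow")

// forward delivers values from in to out, buffering while out is not
// ready. It returns errQueueOverflow once more than maxBuffered values
// are waiting, and nil after in is closed and the buffer is drained.
func forward(in <-chan int, out chan<- int, maxBuffered int) error {
	var buffer []int
	for {
		// A nil send channel disables the send case while the buffer is empty.
		var send chan<- int
		var next int
		if len(buffer) > 0 {
			send = out
			next = buffer[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				if len(buffer) == 0 {
					return nil
				}
				in = nil // stop receiving, keep draining the buffer
				continue
			}
			buffer = append(buffer, v)
			if len(buffer) > maxBuffered {
				return errQueueOverflow
			}
		case send <- next:
			buffer = buffer[1:]
			if in == nil && len(buffer) == 0 {
				return nil
			}
		}
	}
}

func main() {
	in := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		in <- i
	}
	out := make(chan int) // nobody reads from out: a stalled subscriber
	fmt.Println(forward(in, out, 2)) // prints subscription queue overflow
}
```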

func (*Client) SupportedModules

func (c *Client) SupportedModules() (map[string]string, error)

SupportedModules calls the rpc_modules method, retrieving the list of APIs that are available on the server.

type ClientSubscription

type ClientSubscription struct {
	// contains filtered or unexported fields
}

ClientSubscription is a subscription established through the Client's Subscribe or EthSubscribe methods.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ledgerwatch/log/v3"
	"github.com/nebojsa94/erigon/erigon-lib/common/hexutil"
	"github.com/nebojsa94/erigon/rpc"
)

// In this example, our client wishes to track the latest 'block number'
// known to the server. The server supports two methods:
//
// eth_getBlockByNumber("latest", {})
//    returns the latest block object.
//
// eth_subscribe("newHeads")
//    creates a subscription which fires block objects when new blocks arrive.

type Block struct {
	Number *hexutil.Big
}

func main() {
	// Connect the client.
	logger := log.New()
	client, _ := rpc.Dial("ws://127.0.0.1:8545", logger)
	subch := make(chan Block)

	// Ensure that subch receives the latest block.
	go func() {
		for i := 0; ; i++ {
			if i > 0 {
				time.Sleep(2 * time.Second)
			}
			subscribeBlocks(client, subch)
		}
	}()

	// Print events from the subscription as they arrive.
	for block := range subch {
		fmt.Println("latest block:", block.Number)
	}
}

// subscribeBlocks runs in its own goroutine and maintains
// a subscription for new blocks.
func subscribeBlocks(client *rpc.Client, subch chan Block) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Subscribe to new blocks.
	sub, err := client.EthSubscribe(ctx, subch, "newHeads")
	if err != nil {
		fmt.Println("subscribe error:", err)
		return
	}

	// The connection is established now.
	// Update the channel with the current block.
	var lastBlock Block
	err = client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest", false)
	if err != nil {
		fmt.Println("can't get latest block:", err)
		return
	}
	subch <- lastBlock

	// The subscription will deliver events to the channel. Wait for the
	// subscription to end for any reason, then loop around to re-establish
	// the connection.
	fmt.Println("connection lost: ", <-sub.Err())
}

func (*ClientSubscription) Err

func (sub *ClientSubscription) Err() <-chan error

Err returns the subscription error channel. The intended use of Err is to schedule resubscription when the client connection is closed unexpectedly.

The error channel receives a value when the subscription has ended due to an error. The received error is nil if Close has been called on the underlying client and no other error has occurred.

The error channel is closed when Unsubscribe is called on the subscription.

func (*ClientSubscription) Unsubscribe

func (sub *ClientSubscription) Unsubscribe()

Unsubscribe unsubscribes the notification and closes the error channel. It can safely be called more than once.

type CodecOption deprecated

type CodecOption int

CodecOption specifies which type of messages a codec supports.

Deprecated: this option is no longer honored by Server.

const (
	// OptionMethodInvocation is an indication that the codec supports RPC method calls
	OptionMethodInvocation CodecOption = 1 << iota

	// OptionSubscriptions is an indication that the codec supports RPC notifications
	OptionSubscriptions = 1 << iota // support pub sub
)

type Conn

type Conn interface {
	io.ReadWriteCloser
	SetWriteDeadline(time.Time) error
}

Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec.

type ConnRemoteAddr

type ConnRemoteAddr interface {
	RemoteAddr() string
}

ConnRemoteAddr wraps the RemoteAddr operation, which returns a description of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this description is used in log messages.

type CustomError

type CustomError struct {
	Code    int
	Message string
}

func (*CustomError) Error

func (e *CustomError) Error() string

func (*CustomError) ErrorCode

func (e *CustomError) ErrorCode() int

type DataError

type DataError interface {
	Error() string          // returns the message
	ErrorData() interface{} // returns the error data
}

A DataError contains some data in addition to the error message.

type DecimalOrHex

type DecimalOrHex uint64

DecimalOrHex unmarshals a non-negative decimal or hex parameter into a uint64.

func (*DecimalOrHex) UnmarshalJSON

func (dh *DecimalOrHex) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.
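A decoder of this kind could look roughly like the sketch below. `decimalOrHex` is a local stand-in, and the accepted forms (a bare JSON number, a quoted decimal string, or a quoted 0x-prefixed hex string) are assumptions of this example rather than a statement of what the package accepts.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// decimalOrHex is a local stand-in for rpc.DecimalOrHex.
type decimalOrHex uint64

// UnmarshalJSON accepts a bare number, a quoted decimal string, or a
// quoted 0x-prefixed hex string. Negative values are rejected because
// the value is parsed as unsigned.
func (dh *decimalOrHex) UnmarshalJSON(data []byte) error {
	input := strings.TrimSpace(string(data))
	if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' {
		input = input[1 : len(input)-1]
	}
	base := 10
	digits := input
	if strings.HasPrefix(digits, "0x") {
		digits = digits[2:]
		base = 16
	}
	value, err := strconv.ParseUint(digits, base, 64)
	if err != nil {
		return fmt.Errorf("invalid decimal or hex value %q", input)
	}
	*dh = decimalOrHex(value)
	return nil
}

func main() {
	var values []decimalOrHex
	if err := json.Unmarshal([]byte(`[42, "42", "0x2a"]`), &values); err != nil {
		panic(err)
	}
	fmt.Println(values) // prints [42 42 42]
}
```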

type Error

type Error interface {
	Error() string  // returns the message
	ErrorCode() int // returns the code
}

Error wraps RPC errors, which contain an error code in addition to the message.
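A type satisfies this interface simply by adding an ErrorCode method next to Error. The sketch below is illustrative: `codedError` mirrors the interface locally, `invalidParams` and `errorCode` are hypothetical, -32602 is the "Invalid params" code from the JSON-RPC 2.0 specification, and the -32000 fallback is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError mirrors the Error interface locally.
type codedError interface {
	Error() string
	ErrorCode() int
}

// invalidParams is a hypothetical error type carrying a JSON-RPC error code.
type invalidParams struct{ message string }

func (e *invalidParams) Error() string  { return e.message }
func (e *invalidParams) ErrorCode() int { return -32602 } // "Invalid params" in JSON-RPC 2.0

// errorCode extracts the code from err, looking through wrapped errors.
// The -32000 fallback for errors without a code is an assumption.
func errorCode(err error) int {
	var ce codedError
	if errors.As(err, &ce) {
		return ce.ErrorCode()
	}
	return -32000
}

func main() {
	base := &invalidParams{message: "missing value for required argument 0"}
	wrapped := fmt.Errorf("calculator_add: %w", base)
	fmt.Println(errorCode(wrapped), errorCode(errors.New("boom"))) // prints -32602 -32000
}
```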

type ForbiddenList

type ForbiddenList map[string]struct{}

type ID

type ID string

ID defines a pseudo random number that is used to identify RPC subscriptions.

func NewID

func NewID() ID

NewID returns a new, random ID.
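One way to generate such an identifier is to hex-encode bytes from a cryptographic random source. The sketch below is an assumption about a reasonable format (16 random bytes with a 0x prefix), not a description of what NewID actually produces; `newID` is a hypothetical helper.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns 16 random bytes as a 0x-prefixed hex string.
// Both the length and the prefix are assumptions of this sketch.
func newID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return "0x" + hex.EncodeToString(buf), nil
}

func main() {
	id, err := newID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```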

type InvalidParamsError

type InvalidParamsError struct{ Message string }

InvalidParamsError is returned when the supplied params cannot be decoded or are invalid.

func (*InvalidParamsError) Error

func (e *InvalidParamsError) Error() string

func (*InvalidParamsError) ErrorCode

func (e *InvalidParamsError) ErrorCode() int

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is tied to an RPC connection that supports subscriptions. Server callbacks use the notifier to send notifications.

func NotifierFromContext

func NotifierFromContext(ctx context.Context) (*Notifier, bool)

NotifierFromContext returns the Notifier value stored in ctx, if any.

func (*Notifier) Closed

func (n *Notifier) Closed() <-chan interface{}

Closed returns a channel that is closed when the RPC connection is closed.

Deprecated: use the subscription error channel.

func (*Notifier) CreateSubscription

func (n *Notifier) CreateSubscription() *Subscription

CreateSubscription returns a new subscription that is coupled to the RPC connection. By default subscriptions are inactive and notifications are dropped until the subscription is marked as active. This is done by the RPC server after the subscription ID is sent to the client.

func (*Notifier) Notify

func (n *Notifier) Notify(id ID, data interface{}) error

Notify sends a notification to the client with the given data as payload. If an error occurs the RPC connection is closed and the error is returned.

type RPCService

type RPCService struct {
	// contains filtered or unexported fields
}

RPCService gives meta-information about the server, e.g. information about the loaded modules.

func (*RPCService) Modules

func (s *RPCService) Modules() map[string]string

Modules returns the list of RPC services with their version numbers.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an RPC server.

func NewServer

func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool, logger log.Logger, rpcSlowLogThreshold time.Duration) *Server

NewServer creates a new server instance with no registered handlers.

func (*Server) RegisterName

func (s *Server) RegisterName(name string, receiver interface{}) error

RegisterName creates a service for the given receiver type under the given name. When no methods on the given receiver match the criteria to be either an RPC method or a subscription, an error is returned. Otherwise a new service is created and added to the service collection this server provides to clients.

func (*Server) ServeCodec

func (s *Server) ServeCodec(codec ServerCodec, options CodecOption)

ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the response back using the given codec. It will block until the codec is closed or the server is stopped. In either case the codec is closed.

Note that codec options are no longer supported.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP serves JSON-RPC requests over HTTP.

func (*Server) ServeListener

func (s *Server) ServeListener(l net.Listener) error

ServeListener accepts connections on l, serving JSON-RPC on them.

func (*Server) SetAllowList

func (s *Server) SetAllowList(allowList AllowList)

SetAllowList sets the allow list for methods that are handled by this server.

func (*Server) SetBatchLimit

func (s *Server) SetBatchLimit(limit int)

SetBatchLimit sets the maximum number of requests allowed in a batch.

func (*Server) Stop

func (s *Server) Stop()

Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending requests to finish, then closes all codecs which will cancel pending requests and subscriptions.

func (*Server) WebsocketHandler

func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, compression bool, logger log.Logger) http.Handler

WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.

allowedOrigins should be a list of allowed origin URLs. To allow connections with any origin, pass "*".
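The effect of the wildcard can be pictured with a simple matching function. This is only a sketch: `originAllowed` is hypothetical, and the case-insensitive exact comparison it uses is an assumption, not the handler's actual matching rule.

```go
package main

import (
	"fmt"
	"strings"
)

// originAllowed reports whether origin matches an entry in allowed.
// A "*" entry allows every origin. Exact, case-insensitive comparison
// is an assumption of this sketch.
func originAllowed(allowed []string, origin string) bool {
	for _, entry := range allowed {
		if entry == "*" || strings.EqualFold(entry, origin) {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"https://example.org"}
	fmt.Println(originAllowed(allowed, "https://example.org"))         // prints true
	fmt.Println(originAllowed(allowed, "https://other.example"))       // prints false
	fmt.Println(originAllowed([]string{"*"}, "https://other.example")) // prints true
}
```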

type ServerCodec

type ServerCodec interface {
	ReadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)
	Close()
	// contains filtered or unexported methods
}

ServerCodec implements reading, parsing and writing RPC messages for the server side of an RPC session. Implementations must be goroutine-safe since the codec can be called from multiple goroutines concurrently.

func NewCodec

func NewCodec(conn Conn) ServerCodec

NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log messages will use it to include the remote address of the connection.

func NewFuncCodec

func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec

NewFuncCodec creates a codec which uses the given functions to read and write. If conn implements ConnRemoteAddr, log messages will use it to include the remote address of the connection.

func NewWebsocketCodec

func NewWebsocketCodec(conn *websocket.Conn) ServerCodec

type Subscription

type Subscription struct {
	ID ID
	// contains filtered or unexported fields
}

A Subscription is created by a notifier and tied to that notifier. Code that holds the subscription can use it to wait for an unsubscribe request from the client, see Err().

func (*Subscription) Err

func (s *Subscription) Err() <-chan error

Err returns a channel that is closed when the client sends an unsubscribe request.

func (*Subscription) MarshalJSON

func (s *Subscription) MarshalJSON() ([]byte, error)

MarshalJSON marshals a subscription as its ID.

type Timestamp

type Timestamp uint64

func (Timestamp) TurnIntoUint64

func (ts Timestamp) TurnIntoUint64() uint64

func (*Timestamp) UnmarshalJSON

func (ts *Timestamp) UnmarshalJSON(data []byte) error

type UnsupportedForkError

type UnsupportedForkError struct{ Message string }

UnsupportedForkError indicates a mismatch between the Engine API method version and the fork.

func (*UnsupportedForkError) Error

func (e *UnsupportedForkError) Error() string

func (*UnsupportedForkError) ErrorCode

func (e *UnsupportedForkError) ErrorCode() int
