proxyd

package module
v0.0.0-...-eefeb8b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: MIT Imports: 46 Imported by: 0

README

rpc-proxy

This tool implements proxyd, an RPC request router and proxy. It does the following things:

  1. Whitelists RPC methods.
  2. Routes RPC methods to groups of backend services.
  3. Automatically retries failed backend requests.
  4. Track backend consensus (latest, safe, finalized blocks), peer count and sync state.
  5. Re-write requests and responses to enforce consensus.
  6. Load balance requests across backend services.
  7. Cache immutable responses from backends.
  8. Provides metrics to measure request latency, error rates, and the like.

Usage

Run make proxyd to build the binary. No additional dependencies are necessary.

To configure proxyd for use, you'll need to create a configuration file to define your proxy backends and routing rules. Check out example.config.toml for how to do this alongside a full list of all options with commentary.

Once you have a config file, start the daemon via proxyd <path-to-config>.toml.

Consensus awareness

Starting on v4.0.0, proxyd is aware of the consensus state of its backends. This helps minimize chain reorgs experienced by clients.

To enable this behavior, you must set consensus_aware value to true in the backend group.

When consensus awareness is enabled, proxyd will poll the backends for their states and resolve a consensus group based on:

  • the common ancestor latest block, i.e. if a backend is experiencing a fork, the fork won't be visible to the clients
  • the lowest safe block
  • the lowest finalized block
  • peer count
  • sync state

The backend group then acts as a round-robin load balancer distributing traffic equally across healthy backends in the consensus group, increasing the availability of the proxy.

A backend is considered healthy if it meets the following criteria:

  • not banned
  • avg 1-min moving window error rate ≤ configurable threshold
  • avg 1-min moving window latency ≤ configurable threshold
  • peer count ≥ configurable threshold
  • latest block lag ≤ configurable threshold
  • last state update ≤ configurable threshold
  • not currently syncing

When a backend is experiencing inconsistent consensus, high error rates or high latency, the backend will be banned for a configurable amount of time (default 5 minutes) and won't receive any traffic during this period.

Tag rewrite

When consensus awareness is enabled, proxyd will enforce the consensus state transparently for all the clients.

For example, if a client requests the eth_getBlockByNumber method with the latest tag, proxyd will rewrite the request to use the resolved latest block from the consensus group and forward it to the backend.

The following request methods are rewritten:

  • eth_getLogs
  • eth_newFilter
  • eth_getBalance
  • eth_getCode
  • eth_getTransactionCount
  • eth_call
  • eth_getStorageAt
  • eth_getBlockTransactionCountByNumber
  • eth_getUncleCountByBlockNumber
  • eth_getBlockByNumber
  • eth_getTransactionByBlockNumberAndIndex
  • eth_getUncleByBlockNumberAndIndex
  • debug_getRawReceipts

And eth_blockNumber response is overridden with current block consensus.

Cacheable methods

Cache use Redis and can be enabled for the following immutable methods:

  • eth_chainId
  • net_version
  • eth_getBlockTransactionCountByHash
  • eth_getUncleCountByBlockHash
  • eth_getBlockByHash
  • eth_getTransactionByBlockHashAndIndex
  • eth_getUncleByBlockHashAndIndex
  • debug_getRawReceipts (block hash only)

Meta method consensus_getReceipts

To support backends with different specifications in the same backend group, proxyd exposes a convenient method to fetch receipts abstracting away what specific backend will serve the request.

Each backend specifies their preferred method to fetch receipts with consensus_receipts_target config, which will be translated from consensus_getReceipts.

This method takes a blockNumberOrHash (i.e. tag|qty|hash) and returns the receipts for all transactions in the block.

Request example

{
  "jsonrpc":"2.0",
  "id": 1,
  "params": ["0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b"]
}

It currently supports translation to the following targets:

  • debug_getRawReceipts(blockOrHash) (default)
  • alchemy_getTransactionReceipts(blockOrHash)
  • parity_getBlockReceipts(blockOrHash)
  • eth_getBlockReceipts(blockOrHash)

The selected target is returned in the response, in a wrapped result.

Response example

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "method": "debug_getRawReceipts",
    "result": {
      // the actual raw result from backend
    }
  }
}

See op-node receipt fetcher.

Metrics

See metrics.go for a list of all available metrics.

The metrics port is configurable via the metrics.port and metrics.host keys in the config.

Adding Backend SSL Certificates in Docker

The Docker image runs on Alpine Linux. If you get SSL errors when connecting to a backend within Docker, you may need to add additional certificates to Alpine's certificate store. To do this, bind mount the certificate bundle into a file in /usr/local/share/ca-certificates. The entrypoint.sh script will then update the store with whatever is in the ca-certificates directory prior to starting proxyd.

Documentation

Index

Constants

View Source
const (
	JSONRPCVersion       = "2.0"
	JSONRPCErrorInternal = -32000
)
View Source
const (
	MetricsNamespace = "proxyd"

	RPCRequestSourceHTTP = "http"
	RPCRequestSourceWS   = "ws"

	BackendProxyd = "proxyd"
	SourceClient  = "client"
	SourceBackend = "backend"
	MethodUnknown = "unknown"
)
View Source
const (
	ContextKeyAuth               = "authorization"
	ContextKeyReqID              = "req_id"
	ContextKeyXForwardedFor      = "x_forwarded_for"
	ContextKeyOpTxProxyAuth      = "op_txproxy_auth"
	DefaultOpTxProxyAuthHeader   = "X-Optimism-Signature"
	DefaultMaxBatchRPCCallsLimit = 100
	MaxBatchRPCCallsHardLimit    = 1000
)
View Source
const ConsensusGetReceiptsMethod = "consensus_getReceipts"
View Source
const (
	DefaultPollerInterval = 1 * time.Second
)
View Source
const ReceiptsTargetAlchemyGetTransactionReceipts = "alchemy_getTransactionReceipts"
View Source
const ReceiptsTargetDebugGetRawReceipts = "debug_getRawReceipts"
View Source
const ReceiptsTargetEthGetTransactionReceipts = "eth_getBlockReceipts"
View Source
const ReceiptsTargetParityGetTransactionReceipts = "parity_getBlockReceipts"

Variables

View Source
var (
	ErrParseErr = &RPCErr{
		Code:          -32700,
		Message:       "parse error",
		HTTPErrorCode: 400,
	}
	ErrInternal = &RPCErr{
		Code:          JSONRPCErrorInternal,
		Message:       "internal error",
		HTTPErrorCode: 500,
	}
	ErrMethodNotWhitelisted = &RPCErr{
		Code:          notFoundRpcError,
		Message:       "rpc method is not whitelisted",
		HTTPErrorCode: 403,
	}
	ErrBackendOffline = &RPCErr{
		Code:          JSONRPCErrorInternal - 10,
		Message:       "backend offline",
		HTTPErrorCode: 503,
	}
	ErrNoBackends = &RPCErr{
		Code:          JSONRPCErrorInternal - 11,
		Message:       "no backend is currently healthy to serve traffic",
		HTTPErrorCode: 503,
	}
	ErrBackendOverCapacity = &RPCErr{
		Code:          JSONRPCErrorInternal - 12,
		Message:       "backend is over capacity",
		HTTPErrorCode: 429,
	}
	ErrBackendBadResponse = &RPCErr{
		Code:          JSONRPCErrorInternal - 13,
		Message:       "backend returned an invalid response",
		HTTPErrorCode: 500,
	}
	ErrTooManyBatchRequests = &RPCErr{
		Code:    JSONRPCErrorInternal - 14,
		Message: "too many RPC calls in batch request",
	}
	ErrGatewayTimeout = &RPCErr{
		Code:          JSONRPCErrorInternal - 15,
		Message:       "gateway timeout",
		HTTPErrorCode: 504,
	}
	ErrOverRateLimit = &RPCErr{
		Code:          JSONRPCErrorInternal - 16,
		Message:       "over rate limit",
		HTTPErrorCode: 429,
	}
	ErrOverSenderRateLimit = &RPCErr{
		Code:          JSONRPCErrorInternal - 17,
		Message:       "sender is over rate limit",
		HTTPErrorCode: 429,
	}
	ErrNotHealthy = &RPCErr{
		Code:          JSONRPCErrorInternal - 18,
		Message:       "backend is currently not healthy to serve traffic",
		HTTPErrorCode: 503,
	}
	ErrBlockOutOfRange = &RPCErr{
		Code:          JSONRPCErrorInternal - 19,
		Message:       "block is out of range",
		HTTPErrorCode: 400,
	}

	ErrRequestBodyTooLarge = &RPCErr{
		Code:          JSONRPCErrorInternal - 21,
		Message:       "request body too large",
		HTTPErrorCode: 413,
	}

	ErrBackendResponseTooLarge = &RPCErr{
		Code:          JSONRPCErrorInternal - 20,
		Message:       "backend response too large",
		HTTPErrorCode: 500,
	}

	ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")

	ErrConsensusGetReceiptsCantBeBatched = errors.New("consensus_getReceipts cannot be batched")
	ErrConsensusGetReceiptsInvalidTarget = errors.New("unsupported consensus_receipts_target")
)
View Source
var (
	ErrRewriteBlockOutOfRange = errors.New("block is out of range")
	ErrRewriteRangeTooLarge   = errors.New("block range is too large")
)
View Source
var ErrLimitReaderOverLimit = errors.New("over read limit")
View Source
var MillisecondDurationBuckets = []float64{1, 10, 50, 100, 500, 1000, 5000, 10000, 100000}
View Source
var NoopFrontendRateLimiter = &noopFrontendRateLimiter{}
View Source
var PayloadSizeBuckets = []float64{10, 50, 100, 500, 1000, 5000, 10000, 100000, 1000000}

Functions

func CheckRedisConnection

func CheckRedisConnection(client *redis.Client) error

func CreateTLSClient

func CreateTLSClient(ca string) (*tls.Config, error)

func GetAuthCtx

func GetAuthCtx(ctx context.Context) string

func GetOpTxProxyAuthHeader

func GetOpTxProxyAuthHeader(ctx context.Context) string

func GetReqID

func GetReqID(ctx context.Context) string

func GetXForwardedFor

func GetXForwardedFor(ctx context.Context) string

func IsBatch

func IsBatch(raw []byte) bool

func IsValidID

func IsValidID(id json.RawMessage) bool

func LimitReader

func LimitReader(r io.Reader, n int64) io.Reader

func MaybeRecordErrorsInRPCRes

func MaybeRecordErrorsInRPCRes(ctx context.Context, backendName string, reqs []*RPCReq, resBatch []*RPCRes)

func MaybeRecordSpecialRPCError

func MaybeRecordSpecialRPCError(ctx context.Context, backendName, method string, rpcErr *RPCErr)

func NewRedisClient

func NewRedisClient(url string) (*redis.Client, error)

func ParseBatchRPCReq

func ParseBatchRPCReq(body []byte) ([]json.RawMessage, error)

func ParseKeyPair

func ParseKeyPair(crt, key string) (tls.Certificate, error)

func ReadFromEnvOrConfig

func ReadFromEnvOrConfig(value string) (string, error)

func RecordBackendFinalizedBlock

func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64)

func RecordBackendGroupFallbacks

func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool)

func RecordBackendGroupMulticallCompletion

func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string, error string)

func RecordBackendGroupMulticallRequest

func RecordBackendGroupMulticallRequest(bg *BackendGroup, backendName string)

func RecordBackendLatestBlock

func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64)

func RecordBackendNetworkErrorRateSlidingWindow

func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64)

func RecordBackendNetworkLatencyAverageSlidingWindow

func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time.Duration)

func RecordBackendSafeBlock

func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64)

func RecordBackendUnexpectedBlockTags

func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool)

func RecordBatchRPCError

func RecordBatchRPCError(ctx context.Context, backendName string, reqs []*RPCReq, err error)

func RecordBatchRPCForward

func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCReq, source string)

func RecordBatchSize

func RecordBatchSize(size int)

func RecordCacheError

func RecordCacheError(method string)

func RecordCacheHit

func RecordCacheHit(method string)

func RecordCacheMiss

func RecordCacheMiss(method string)

func RecordConsensusBackendBanned

func RecordConsensusBackendBanned(b *Backend, banned bool)

func RecordConsensusBackendInSync

func RecordConsensusBackendInSync(b *Backend, inSync bool)

func RecordConsensusBackendPeerCount

func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64)

func RecordConsensusBackendUpdateDelay

func RecordConsensusBackendUpdateDelay(b *Backend, lastUpdate time.Time)

func RecordGroupConsensusCount

func RecordGroupConsensusCount(group *BackendGroup, count int)

func RecordGroupConsensusError

func RecordGroupConsensusError(group *BackendGroup, label string, err error)

func RecordGroupConsensusFilteredCount

func RecordGroupConsensusFilteredCount(group *BackendGroup, count int)

func RecordGroupConsensusFinalizedBlock

func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64)

func RecordGroupConsensusHAFinalizedBlock

func RecordGroupConsensusHAFinalizedBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64)

func RecordGroupConsensusHALatestBlock

func RecordGroupConsensusHALatestBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64)

func RecordGroupConsensusHASafeBlock

func RecordGroupConsensusHASafeBlock(group *BackendGroup, leader string, blockNumber hexutil.Uint64)

func RecordGroupConsensusLatestBlock

func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Uint64)

func RecordGroupConsensusSafeBlock

func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64)

func RecordGroupTotalCount

func RecordGroupTotalCount(group *BackendGroup, count int)

func RecordHealthyCandidates

func RecordHealthyCandidates(b *BackendGroup, candidates int)

func RecordRPCError

func RecordRPCError(ctx context.Context, backendName, method string, err error)

func RecordRPCForward

func RecordRPCForward(ctx context.Context, backendName, method, source string)

func RecordRedisError

func RecordRedisError(source string)

func RecordRequestPayloadSize

func RecordRequestPayloadSize(ctx context.Context, payloadSize int)

func RecordResponsePayloadSize

func RecordResponsePayloadSize(ctx context.Context, payloadSize int)

func RecordUnserviceableRequest

func RecordUnserviceableRequest(ctx context.Context, source string)

func RecordWSMessage

func RecordWSMessage(ctx context.Context, backendName, source string)

func SetLogLevel

func SetLogLevel(logLevel slog.Leveler)

func ValidateRPCReq

func ValidateRPCReq(req *RPCReq) error

Types

type Backend

type Backend struct {
	Name string
	// contains filtered or unexported fields
}

func NewBackend

func NewBackend(
	name string,
	rpcURL string,
	wsURL string,
	rpcSemaphore *semaphore.Weighted,
	opts ...BackendOpt,
) *Backend

func (*Backend) ClearSlidingWindows

func (b *Backend) ClearSlidingWindows()

func (*Backend) ErrorRate

func (b *Backend) ErrorRate() (errorRate float64)

ErrorRate returns the instant error rate of the backend

func (*Backend) Forward

func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error)

func (*Backend) ForwardRPC

func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error

ForwardRPC makes a call directly to a backend and populate the response into `res`

func (*Backend) IsDegraded

func (b *Backend) IsDegraded() bool

IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)

func (*Backend) IsHealthy

func (b *Backend) IsHealthy() bool

IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters

func (*Backend) Override

func (b *Backend) Override(opts ...BackendOpt)

func (*Backend) ProxyWS

func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error)

type BackendConfig

type BackendConfig struct {
	Username         string            `toml:"username"`
	Password         string            `toml:"password"`
	RPCURL           string            `toml:"rpc_url"`
	WSURL            string            `toml:"ws_url"`
	WSPort           int               `toml:"ws_port"`
	MaxRPS           int               `toml:"max_rps"`
	MaxWSConns       int               `toml:"max_ws_conns"`
	CAFile           string            `toml:"ca_file"`
	ClientCertFile   string            `toml:"client_cert_file"`
	ClientKeyFile    string            `toml:"client_key_file"`
	StripTrailingXFF bool              `toml:"strip_trailing_xff"`
	Headers          map[string]string `toml:"headers"`

	Weight int `toml:"weight"`

	ConsensusSkipPeerCountCheck bool   `toml:"consensus_skip_peer_count"`
	ConsensusForcedCandidate    bool   `toml:"consensus_forced_candidate"`
	ConsensusReceiptsTarget     string `toml:"consensus_receipts_target"`
}

type BackendGroup

type BackendGroup struct {
	Name             string
	Backends         []*Backend
	WeightedRouting  bool
	Consensus        *ConsensusPoller
	FallbackBackends map[string]bool
	// contains filtered or unexported fields
}

func (*BackendGroup) ExecuteMulticall

func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq) *BackendGroupRPCResponse

Note: rpcReqs should only contain 1 request of 'sendRawTransactions'

func (*BackendGroup) Fallbacks

func (bg *BackendGroup) Fallbacks() []*Backend

func (*BackendGroup) Forward

func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error)

NOTE: BackendGroup Forward contains the log for balancing with consensus aware

func (*BackendGroup) ForwardRequestToBackendGroup

func (bg *BackendGroup) ForwardRequestToBackendGroup(
	rpcReqs []*RPCReq,
	backends []*Backend,
	ctx context.Context,
	isBatch bool,
) *BackendGroupRPCResponse

func (*BackendGroup) GetRoutingStrategy

func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy

func (*BackendGroup) MulticallRequest

func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg *sync.WaitGroup, ctx context.Context, ch chan *multicallTuple)

func (*BackendGroup) OverwriteConsensusResponses

func (bg *BackendGroup) OverwriteConsensusResponses(rpcReqs []*RPCReq, overriddenResponses []*indexedReqRes, rewrittenReqs []*RPCReq) ([]*RPCReq, []*indexedReqRes)

func (*BackendGroup) Primaries

func (bg *BackendGroup) Primaries() []*Backend

func (*BackendGroup) ProcessMulticallResponses

func (bg *BackendGroup) ProcessMulticallResponses(ch chan *multicallTuple, ctx context.Context) *BackendGroupRPCResponse

func (*BackendGroup) ProxyWS

func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error)

func (*BackendGroup) Shutdown

func (bg *BackendGroup) Shutdown()

type BackendGroupConfig

type BackendGroupConfig struct {
	Backends []string `toml:"backends"`

	WeightedRouting bool `toml:"weighted_routing"`

	RoutingStrategy RoutingStrategy `toml:"routing_strategy"`

	MulticallRPCErrorCheck bool `toml:"multicall_rpc_error_check"`

	/*
		Deprecated: Use routing_strategy config to create a consensus_aware proxyd instance
	*/
	ConsensusAware          bool         `toml:"consensus_aware"`
	ConsensusAsyncHandler   string       `toml:"consensus_handler"`
	ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"`

	ConsensusBanPeriod          TOMLDuration `toml:"consensus_ban_period"`
	ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`
	ConsensusMaxBlockLag        uint64       `toml:"consensus_max_block_lag"`
	ConsensusMaxBlockRange      uint64       `toml:"consensus_max_block_range"`
	ConsensusMinPeerCount       int          `toml:"consensus_min_peer_count"`

	ConsensusHA                  bool         `toml:"consensus_ha"`
	ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"`
	ConsensusHALockPeriod        TOMLDuration `toml:"consensus_ha_lock_period"`
	ConsensusHARedis             RedisConfig  `toml:"consensus_ha_redis"`

	Fallbacks []string `toml:"fallbacks"`
}

func (*BackendGroupConfig) ValidateRoutingStrategy

func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool

type BackendGroupRPCResponse

type BackendGroupRPCResponse struct {
	RPCRes   []*RPCRes
	ServedBy string
	// contains filtered or unexported fields
}

type BackendGroupsConfig

type BackendGroupsConfig map[string]*BackendGroupConfig

type BackendOpt

type BackendOpt func(b *Backend)

func WithBasicAuth

func WithBasicAuth(username, password string) BackendOpt

func WithConsensusForcedCandidate

func WithConsensusForcedCandidate(forcedCandidate bool) BackendOpt

func WithConsensusReceiptTarget

func WithConsensusReceiptTarget(receiptsTarget string) BackendOpt

func WithConsensusSkipPeerCountCheck

func WithConsensusSkipPeerCountCheck(skipPeerCountCheck bool) BackendOpt

func WithHeaders

func WithHeaders(headers map[string]string) BackendOpt

func WithIntermittentNetworkErrorSlidingWindow

func WithIntermittentNetworkErrorSlidingWindow(sw *sw.AvgSlidingWindow) BackendOpt

func WithMaxDegradedLatencyThreshold

func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt

func WithMaxErrorRateThreshold

func WithMaxErrorRateThreshold(maxErrorRateThreshold float64) BackendOpt

func WithMaxLatencyThreshold

func WithMaxLatencyThreshold(maxLatencyThreshold time.Duration) BackendOpt

func WithMaxRPS

func WithMaxRPS(maxRPS int) BackendOpt

func WithMaxResponseSize

func WithMaxResponseSize(size int64) BackendOpt

func WithMaxRetries

func WithMaxRetries(retries int) BackendOpt

func WithMaxWSConns

func WithMaxWSConns(maxConns int) BackendOpt

func WithOutOfServiceDuration

func WithOutOfServiceDuration(interval time.Duration) BackendOpt

func WithProxydIP

func WithProxydIP(ip string) BackendOpt

func WithStrippedTrailingXFF

func WithStrippedTrailingXFF() BackendOpt

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) BackendOpt

func WithTimeout

func WithTimeout(timeout time.Duration) BackendOpt

func WithWeight

func WithWeight(weight int) BackendOpt

type BackendOptions

type BackendOptions struct {
	ResponseTimeoutSeconds      int          `toml:"response_timeout_seconds"`
	MaxResponseSizeBytes        int64        `toml:"max_response_size_bytes"`
	MaxRetries                  int          `toml:"max_retries"`
	OutOfServiceSeconds         int          `toml:"out_of_service_seconds"`
	MaxDegradedLatencyThreshold TOMLDuration `toml:"max_degraded_latency_threshold"`
	MaxLatencyThreshold         TOMLDuration `toml:"max_latency_threshold"`
	MaxErrorRateThreshold       float64      `toml:"max_error_rate_threshold"`
}

type BackendsConfig

type BackendsConfig map[string]*BackendConfig

type BatchConfig

type BatchConfig struct {
	MaxSize      int    `toml:"max_size"`
	ErrorMessage string `toml:"error_message"`
}

type BlockHashOrNumberParameter

type BlockHashOrNumberParameter struct {
	BlockHash   *common.Hash     `json:"blockHash"`
	BlockNumber *rpc.BlockNumber `json:"blockNumber"`
}

BlockHashOrNumberParameter is a non-conventional wrapper used by alchemy_getTransactionReceipts

type Cache

type Cache interface {
	Get(ctx context.Context, key string) (string, error)
	Put(ctx context.Context, key string, value string) error
}

type CacheConfig

type CacheConfig struct {
	Enabled bool         `toml:"enabled"`
	TTL     TOMLDuration `toml:"ttl"`
}

type Config

type Config struct {
	WSBackendGroup        string                `toml:"ws_backend_group"`
	Server                ServerConfig          `toml:"server"`
	Cache                 CacheConfig           `toml:"cache"`
	Redis                 RedisConfig           `toml:"redis"`
	Metrics               MetricsConfig         `toml:"metrics"`
	RateLimit             RateLimitConfig       `toml:"rate_limit"`
	BackendOptions        BackendOptions        `toml:"backend"`
	Backends              BackendsConfig        `toml:"backends"`
	BatchConfig           BatchConfig           `toml:"batch"`
	Authentication        map[string]string     `toml:"authentication"`
	BackendGroups         BackendGroupsConfig   `toml:"backend_groups"`
	RPCMethodMappings     map[string]string     `toml:"rpc_method_mappings"`
	WSMethodWhitelist     []string              `toml:"ws_method_whitelist"`
	WhitelistErrorMessage string                `toml:"whitelist_error_message"`
	SenderRateLimit       SenderRateLimitConfig `toml:"sender_rate_limit"`
}

type ConsensusAsyncHandler

type ConsensusAsyncHandler interface {
	Init()
	Shutdown()
}

ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown

func NewNoopAsyncHandler

func NewNoopAsyncHandler() ConsensusAsyncHandler

func NewPollerAsyncHandler

func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler

type ConsensusGetReceiptsResult

type ConsensusGetReceiptsResult struct {
	Method string      `json:"method"`
	Result interface{} `json:"result"`
}

type ConsensusOpt

type ConsensusOpt func(cp *ConsensusPoller)

func WithAsyncHandler

func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt

func WithBanPeriod

func WithBanPeriod(banPeriod time.Duration) ConsensusOpt

func WithListener

func WithListener(listener OnConsensusBroken) ConsensusOpt

func WithMaxBlockLag

func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt

func WithMaxBlockRange

func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt

func WithMaxUpdateThreshold

func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt

func WithMinPeerCount

func WithMinPeerCount(minPeerCount uint64) ConsensusOpt

func WithPollerInterval

func WithPollerInterval(interval time.Duration) ConsensusOpt

func WithTracker

func WithTracker(tracker ConsensusTracker) ConsensusOpt

type ConsensusPoller

type ConsensusPoller struct {
	// contains filtered or unexported fields
}

ConsensusPoller checks the consensus state for each member of a BackendGroup resolves the highest common block for multiple nodes, and reconciles the consensus in case of block hash divergence to minimize re-orgs

func NewConsensusPoller

func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller

func (*ConsensusPoller) AddListener

func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken)

func (*ConsensusPoller) Ban

func (cp *ConsensusPoller) Ban(be *Backend)

Ban bans a specific backend

func (*ConsensusPoller) BannedUntil

func (cp *ConsensusPoller) BannedUntil(be *Backend) time.Time

IsBanned checks if a specific backend is banned

func (*ConsensusPoller) ClearListeners

func (cp *ConsensusPoller) ClearListeners()

func (*ConsensusPoller) FilterCandidates

func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState

filterCandidates find out what backends are the candidates to be in the consensus group and create a copy of current their state

a candidate is a serving node within the following conditions:

  • not banned
  • healthy (network latency and error rate)
  • with minimum peer count
  • in sync
  • updated recently
  • not lagging latest block

func (*ConsensusPoller) GetBackendState

func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState

GetBackendState creates a copy of backend state so that the caller can use it without locking

func (*ConsensusPoller) GetConsensusGroup

func (cp *ConsensusPoller) GetConsensusGroup() []*Backend

GetConsensusGroup returns the backend members that are agreeing in a consensus

func (*ConsensusPoller) GetFinalizedBlockNumber

func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64

GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus

func (*ConsensusPoller) GetLastUpdate

func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time

func (*ConsensusPoller) GetLatestBlockNumber

func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64

GetLatestBlockNumber returns the `latest` agreed block number in a consensus

func (*ConsensusPoller) GetSafeBlockNumber

func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64

GetSafeBlockNumber returns the `safe` agreed block number in a consensus

func (*ConsensusPoller) IsBanned

func (cp *ConsensusPoller) IsBanned(be *Backend) bool

IsBanned checks if a specific backend is banned

func (*ConsensusPoller) Reset

func (cp *ConsensusPoller) Reset()

Reset reset all backend states

func (*ConsensusPoller) Shutdown

func (cp *ConsensusPoller) Shutdown()

func (*ConsensusPoller) Unban

func (cp *ConsensusPoller) Unban(be *Backend)

Unban removes any bans from the backends

func (*ConsensusPoller) UpdateBackend

func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend)

UpdateBackend refreshes the consensus state of a single backend

func (*ConsensusPoller) UpdateBackendGroupConsensus

func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context)

UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends

type ConsensusTracker

type ConsensusTracker interface {
	GetLatestBlockNumber() hexutil.Uint64
	SetLatestBlockNumber(blockNumber hexutil.Uint64)
	GetSafeBlockNumber() hexutil.Uint64
	SetSafeBlockNumber(blockNumber hexutil.Uint64)
	GetFinalizedBlockNumber() hexutil.Uint64
	SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
}

ConsensusTracker abstracts how we store and retrieve the current consensus allowing it to be stored locally in-memory or in a shared Redis cluster

func NewInMemoryConsensusTracker

func NewInMemoryConsensusTracker() ConsensusTracker

func NewRedisConsensusTracker

func NewRedisConsensusTracker(ctx context.Context,
	redisClient *redis.Client,
	bg *BackendGroup,
	namespace string,
	opts ...RedisConsensusTrackerOpt) ConsensusTracker

type ConsensusTrackerState

type ConsensusTrackerState struct {
	Latest    hexutil.Uint64 `json:"latest"`
	Safe      hexutil.Uint64 `json:"safe"`
	Finalized hexutil.Uint64 `json:"finalized"`
}

DTO to hold the current consensus state

type FallbackRateLimiter

type FallbackRateLimiter struct {
	// contains filtered or unexported fields
}

FallbackRateLimiter is a combination of a primary and secondary rate limiter. If the primary rate limiter fails, due to an unexpected error, the secondary rate limiter will be used. This is useful to reduce reliance on a single Redis instance for rate limiting. If both fail, the request is not let through.

func (*FallbackRateLimiter) Take

func (r *FallbackRateLimiter) Take(ctx context.Context, key string) (bool, error)

type FrontendRateLimiter

type FrontendRateLimiter interface {
	// Take consumes a key, and a maximum number of requests
	// per time interval. It returns a boolean denoting if
	// the limit could be taken, or an error if a failure
	// occurred in the backing rate limit implementation.
	//
	// No error will be returned if the limit could not be taken
	// as a result of the requestor being over the limit.
	Take(ctx context.Context, key string) (bool, error)
}

func NewFallbackRateLimiter

func NewFallbackRateLimiter(primary FrontendRateLimiter, secondary FrontendRateLimiter) FrontendRateLimiter

func NewMemoryFrontendRateLimit

func NewMemoryFrontendRateLimit(dur time.Duration, max int) FrontendRateLimiter

func NewRedisFrontendRateLimiter

func NewRedisFrontendRateLimiter(r *redis.Client, dur time.Duration, max int, prefix string) FrontendRateLimiter

type InMemoryConsensusTracker

type InMemoryConsensusTracker struct {
	// contains filtered or unexported fields
}

InMemoryConsensusTracker store and retrieve in memory, async-safe

func (*InMemoryConsensusTracker) Behind

func (*InMemoryConsensusTracker) GetFinalizedBlockNumber

func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64

func (*InMemoryConsensusTracker) GetLatestBlockNumber

func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64

func (*InMemoryConsensusTracker) GetSafeBlockNumber

func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64

func (*InMemoryConsensusTracker) SetFinalizedBlockNumber

func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64)

func (*InMemoryConsensusTracker) SetLatestBlockNumber

func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64)

func (*InMemoryConsensusTracker) SetSafeBlockNumber

func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64)

func (*InMemoryConsensusTracker) Valid

func (ct *InMemoryConsensusTracker) Valid() bool

type LimitedHTTPClient

type LimitedHTTPClient struct {
	http.Client
	// contains filtered or unexported fields
}

func (*LimitedHTTPClient) DoLimited

func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error)

type LimitedReader

type LimitedReader struct {
	R io.Reader // underlying reader
	N int64     // max bytes remaining
}

A LimitedReader reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Unlike the standard library version, Read returns ErrLimitReaderOverLimit when N <= 0.

func (*LimitedReader) Read

func (l *LimitedReader) Read(p []byte) (int, error)

type MemoryFrontendRateLimiter

type MemoryFrontendRateLimiter struct {
	// contains filtered or unexported fields
}

MemoryFrontendRateLimiter is a rate limiter that stores all rate limiting information in local memory. It works by storing a limitedKeys struct that references the truncated timestamp at which the struct was created. If the current truncated timestamp doesn't match what's referenced, the limit is reset. Otherwise, values in a map are incremented to represent the limit. This will never return an error.

func (*MemoryFrontendRateLimiter) Take

type MethodMappingsConfig

type MethodMappingsConfig map[string]string

type MetricsConfig

type MetricsConfig struct {
	Enabled bool   `toml:"enabled"`
	Host    string `toml:"host"`
	Port    int    `toml:"port"`
}

type NoopAsyncHandler

type NoopAsyncHandler struct{}

NoopAsyncHandler allows fine control updating the consensus

func (*NoopAsyncHandler) Init

func (ah *NoopAsyncHandler) Init()

func (*NoopAsyncHandler) Shutdown

func (ah *NoopAsyncHandler) Shutdown()

type NoopRPCCache

type NoopRPCCache struct{}

func (*NoopRPCCache) GetRPC

func (n *NoopRPCCache) GetRPC(context.Context, *RPCReq) (*RPCRes, error)

func (*NoopRPCCache) PutRPC

func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes) error

type OnConsensusBroken

type OnConsensusBroken func()

type PollerAsyncHandler

type PollerAsyncHandler struct {
	// contains filtered or unexported fields
}

PollerAsyncHandler asynchronously updates each individual backend and the group consensus

func (*PollerAsyncHandler) Init

func (ah *PollerAsyncHandler) Init()

func (*PollerAsyncHandler) Shutdown

func (ah *PollerAsyncHandler) Shutdown()

type RPCCache

type RPCCache interface {
	GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
	PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
}

type RPCErr

type RPCErr struct {
	Code          int    `json:"code"`
	Message       string `json:"message"`
	Data          string `json:"data,omitempty"`
	HTTPErrorCode int    `json:"-"`
}

func ErrInvalidParams

func ErrInvalidParams(msg string) *RPCErr

func ErrInvalidRequest

func ErrInvalidRequest(msg string) *RPCErr

func (*RPCErr) Clone

func (r *RPCErr) Clone() *RPCErr

func (*RPCErr) Error

func (r *RPCErr) Error() string

type RPCMethodHandler

type RPCMethodHandler interface {
	GetRPCMethod(context.Context, *RPCReq) (*RPCRes, error)
	PutRPCMethod(context.Context, *RPCReq, *RPCRes) error
}

type RPCReq

type RPCReq struct {
	JSONRPC string          `json:"jsonrpc"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params"`
	ID      json.RawMessage `json:"id"`
}

func ParseRPCReq

func ParseRPCReq(body []byte) (*RPCReq, error)

type RPCRes

type RPCRes struct {
	JSONRPC string
	Result  interface{}
	Error   *RPCErr
	ID      json.RawMessage
}

func NewRPCErrorRes

func NewRPCErrorRes(id json.RawMessage, err error) *RPCRes

func NewRPCRes

func NewRPCRes(id json.RawMessage, result interface{}) *RPCRes

func OverrideResponses

func OverrideResponses(res []*RPCRes, overriddenResponses []*indexedReqRes) []*RPCRes

func ParseRPCRes

func ParseRPCRes(r io.Reader) (*RPCRes, error)

func (*RPCRes) IsError

func (r *RPCRes) IsError() bool

func (*RPCRes) MarshalJSON

func (r *RPCRes) MarshalJSON() ([]byte, error)

type RateLimitConfig

type RateLimitConfig struct {
	UseRedis         bool                                `toml:"use_redis"`
	BaseRate         int                                 `toml:"base_rate"`
	BaseInterval     TOMLDuration                        `toml:"base_interval"`
	ExemptOrigins    []string                            `toml:"exempt_origins"`
	ExemptUserAgents []string                            `toml:"exempt_user_agents"`
	ErrorMessage     string                              `toml:"error_message"`
	MethodOverrides  map[string]*RateLimitMethodOverride `toml:"method_overrides"`
	IPHeaderOverride string                              `toml:"ip_header_override"`
}

type RateLimitMethodOverride

type RateLimitMethodOverride struct {
	Limit    int          `toml:"limit"`
	Interval TOMLDuration `toml:"interval"`
	Global   bool         `toml:"global"`
}

type RedisConfig

type RedisConfig struct {
	URL              string `toml:"url"`
	Namespace        string `toml:"namespace"`
	ReadURL          string `toml:"read_url"`
	FallbackToMemory bool   `toml:"fallback_to_memory"`
}

type RedisConsensusTracker

type RedisConsensusTracker struct {
	// contains filtered or unexported fields
}

RedisConsensusTracker store and retrieve in a shared Redis cluster, with leader election

func (*RedisConsensusTracker) GetFinalizedBlockNumber

func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64

func (*RedisConsensusTracker) GetLatestBlockNumber

func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64

func (*RedisConsensusTracker) GetSafeBlockNumber

func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64

func (*RedisConsensusTracker) Init

func (ct *RedisConsensusTracker) Init()

func (*RedisConsensusTracker) SetFinalizedBlockNumber

func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64)

func (*RedisConsensusTracker) SetLatestBlockNumber

func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64)

func (*RedisConsensusTracker) SetSafeBlockNumber

func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64)

type RedisConsensusTrackerOpt

type RedisConsensusTrackerOpt func(cp *RedisConsensusTracker)

func WithHeartbeatInterval

func WithHeartbeatInterval(heartbeatInterval time.Duration) RedisConsensusTrackerOpt

func WithLockPeriod

func WithLockPeriod(lockPeriod time.Duration) RedisConsensusTrackerOpt

type RedisFrontendRateLimiter

type RedisFrontendRateLimiter struct {
	// contains filtered or unexported fields
}

RedisFrontendRateLimiter is a rate limiter that stores data in Redis. It uses the basic rate limiter pattern described on the Redis best practices website: https://redis.com/redis-best-practices/basic-rate-limiting/.

func (*RedisFrontendRateLimiter) Take

type RewriteContext

type RewriteContext struct {
	// contains filtered or unexported fields
}

type RewriteResult

type RewriteResult uint8
const (
	// RewriteNone means request should be forwarded as-is
	RewriteNone RewriteResult = iota

	// RewriteOverrideError means there was an error attempting to rewrite
	RewriteOverrideError

	// RewriteOverrideRequest means the modified request should be forwarded to the backend
	RewriteOverrideRequest

	// RewriteOverrideResponse means to skip calling the backend and serve the overridden response
	RewriteOverrideResponse
)

func RewriteRequest

func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error)

RewriteRequest modifies the request object to comply with the rewrite context before the method has been called at the backend it returns false if nothing was changed

func RewriteResponse

func RewriteResponse(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error)

RewriteResponse modifies the response object to comply with the rewrite context after the method has been called at the backend RewriteResult informs the decision of the rewrite

func RewriteTags

func RewriteTags(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error)

RewriteTags modifies the request and the response based on block tags

type RoutingStrategy

type RoutingStrategy string
const (
	ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware"
	MulticallRoutingStrategy      RoutingStrategy = "multicall"
	FallbackRoutingStrategy       RoutingStrategy = "fallback"
)

type SenderRateLimitConfig

type SenderRateLimitConfig struct {
	Enabled         bool
	Interval        TOMLDuration
	Limit           int
	AllowedChainIds []*big.Int `toml:"allowed_chain_ids"`
}

SenderRateLimitConfig configures the sender-based rate limiter for eth_sendRawTransaction requests. To enable pre-eip155 transactions, add '0' to allowed_chain_ids.

type Server

type Server struct {
	BackendGroups map[string]*BackendGroup
	// contains filtered or unexported fields
}

func NewServer

func NewServer(
	backendGroups map[string]*BackendGroup,
	wsBackendGroup *BackendGroup,
	wsMethodWhitelist *StringSet,
	rpcMethodMappings map[string]string,
	maxBodySize int64,
	authenticatedPaths map[string]string,
	timeout time.Duration,
	maxUpstreamBatchSize int,
	enableServedByHeader bool,
	cache RPCCache,
	rateLimitConfig RateLimitConfig,
	senderRateLimitConfig SenderRateLimitConfig,
	enableRequestLog bool,
	maxRequestBodyLogLen int,
	maxBatchSize int,
	limiterFactory limiterFactoryFunc,
) (*Server, error)

func Start

func Start(config *Config) (*Server, func(), error)

func (*Server) HandleHealthz

func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request)

func (*Server) HandleRPC

func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request)

func (*Server) HandleWS

func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request)

func (*Server) RPCListenAndServe

func (s *Server) RPCListenAndServe(host string, port int) error

func (*Server) Shutdown

func (s *Server) Shutdown()

func (*Server) WSListenAndServe

func (s *Server) WSListenAndServe(host string, port int) error

type ServerConfig

type ServerConfig struct {
	RPCHost           string `toml:"rpc_host"`
	RPCPort           int    `toml:"rpc_port"`
	WSHost            string `toml:"ws_host"`
	WSPort            int    `toml:"ws_port"`
	MaxBodySizeBytes  int64  `toml:"max_body_size_bytes"`
	MaxConcurrentRPCs int64  `toml:"max_concurrent_rpcs"`
	LogLevel          string `toml:"log_level"`

	// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
	TimeoutSeconds int `toml:"timeout_seconds"`

	MaxUpstreamBatchSize int `toml:"max_upstream_batch_size"`

	EnableRequestLog      bool `toml:"enable_request_log"`
	MaxRequestBodyLogLen  int  `toml:"max_request_body_log_len"`
	EnablePprof           bool `toml:"enable_pprof"`
	EnableXServedByHeader bool `toml:"enable_served_by_header"`
	AllowAllOrigins       bool `toml:"allow_all_origins"`
}

type StaticMethodHandler

type StaticMethodHandler struct {
	// contains filtered or unexported fields
}

func (*StaticMethodHandler) GetRPCMethod

func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*StaticMethodHandler) PutRPCMethod

func (e *StaticMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes) error

type StringSet

type StringSet struct {
	// contains filtered or unexported fields
}

func NewStringSet

func NewStringSet() *StringSet

func NewStringSetFromStrings

func NewStringSetFromStrings(in []string) *StringSet

func (*StringSet) Add

func (s *StringSet) Add(str string)

func (*StringSet) Entries

func (s *StringSet) Entries() []string

func (*StringSet) Extend

func (s *StringSet) Extend(in []string) *StringSet

func (*StringSet) Has

func (s *StringSet) Has(test string) bool

type TOMLDuration

type TOMLDuration time.Duration

func (*TOMLDuration) UnmarshalText

func (t *TOMLDuration) UnmarshalText(b []byte) error

type WSProxier

type WSProxier struct {
	// contains filtered or unexported fields
}

func NewWSProxier

func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, methodWhitelist *StringSet) *WSProxier

func (*WSProxier) Proxy

func (w *WSProxier) Proxy(ctx context.Context) error

Directories

Path Synopsis
cmd
pkg
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL