adapter

package
v0.0.1
Published: Oct 31, 2018 License: Apache-2.0 Imports: 34 Imported by: 0

README

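Relay Adapter SDK

This adapter SDK is packaged according to the QCP protocol specification so that blockchains not built on the Tendermint technology stack can be connected securely, simply, and quickly.

It provides transaction publishing for the connected chain, plus standard interfaces and related services (HTTP RPC, WebSocket) for relay communication.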

Documentation

Overview

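Package adapter is an adapter SDK packaged according to the QCP protocol specification so that blockchains not built on the Tendermint technology stack can be connected securely, simply, and quickly.

It provides transaction publishing for the connected chain, plus standard interfaces and related services (HTTP RPC, WebSocket) for relay communication.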

Index

Constants

const (
	// MaxBodyBytes controls the maximum number of bytes the
	// server will read parsing the request body.
	MaxBodyBytes = int64(1000000) // 1MB
)
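A request-body limit of this kind is commonly enforced with `http.MaxBytesReader` from the standard library. The sketch below assumes that usage; it is not taken from this package's source. `limitBody`, `readStatus`, and the local `maxBodyBytes` constant are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// maxBodyBytes is a local copy of the documented MaxBodyBytes value.
const maxBodyBytes = int64(1000000)

// limitBody is a hypothetical handler that rejects bodies larger than limit.
func limitBody(limit int64) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		r.Body = http.MaxBytesReader(w, r.Body, limit)
		if _, err := io.ReadAll(r.Body); err != nil {
			http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
}

// readStatus sends a body of n bytes to the handler and returns the status code.
func readStatus(n int, limit int64) int {
	body := strings.NewReader(strings.Repeat("a", n))
	req := httptest.NewRequest(http.MethodPost, "/", body)
	rec := httptest.NewRecorder()
	limitBody(limit).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(readStatus(10, maxBodyBytes))
	fmt.Println(readStatus(int(maxBodyBytes)+1, maxBodyBytes))
}
```

A body of exactly `limit` bytes is accepted; one byte more makes the read fail, which the handler turns into a 413 response.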

Variables

var (

	// RegexpInt matches an integer.
	RegexpInt = regexp.MustCompile(`^-?[0-9]+$`)
	// RegexpHex matches a hexadecimal string.
	RegexpHex = regexp.MustCompile(`^(?i)[a-f0-9]+$`)
	// RegexpEmail matches an email address.
	RegexpEmail = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`)
	// RegexpAddress matches an address.
	RegexpAddress = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`)
	// RegexpHost matches a host name.
	RegexpHost = regexp.MustCompile(`^(?i)(` + domain + `)$`)
)
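To show what three of these patterns accept, the sketch below uses local copies of the expressions listed above (the package itself is not imported). `classify` is a hypothetical helper. `RegexpEmail` and `RegexpHost` are left out because `dotAtom` and `domain` are not shown in this listing.

```go
package main

import (
	"fmt"
	"regexp"
)

// Local copies of three documented patterns, so the sketch runs on its own.
var (
	regexpInt     = regexp.MustCompile(`^-?[0-9]+$`)
	regexpHex     = regexp.MustCompile(`^(?i)[a-f0-9]+$`)
	regexpAddress = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`)
)

// classify reports which of the three patterns accept s.
func classify(s string) (isInt, isHex, isAddress bool) {
	return regexpInt.MatchString(s), regexpHex.MatchString(s), regexpAddress.MatchString(s)
}

func main() {
	inputs := []string{"-42", "DEADbeef", "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", "0x1f"}
	for _, s := range inputs {
		i, h, a := classify(s)
		fmt.Printf("%q int=%t hex=%t address=%t\n", s, i, h, a)
	}
}
```

Note that the hex pattern does not accept a `0x` prefix, and that a plain digit string such as `12345` satisfies both the integer and the hex pattern.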
var (
	// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
	// from a subscription that does not exist.
	ErrSubscriptionNotFound = errors.New("subscription not found")

	// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
	// more using the same query.
	ErrAlreadySubscribed = errors.New("already subscribed")
)
var (
	// ErrAlreadyStarted is returned when the service has already been started.
	ErrAlreadyStarted = errors.New("already started")
	// ErrAlreadyStopped is returned when the service has already been stopped.
	ErrAlreadyStopped = errors.New("already stopped")
)

Functions

func ABCIQuery

func ABCIQuery(path string, data cmn.HexBytes, height int64, trusted bool) (*ctypes.ResultABCIQuery, error)

ABCIQuery queries transactions and transaction sequence numbers.

func EventSubscriber

func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection)

EventSubscriber sets the object used to subscribe to and unsubscribe from events. It is not goroutine-safe. If none is given, the default node's eventBus is used.

func GetParam

func GetParam(r *http.Request, param string) string

GetParam returns the specified parameter.

func GetParamByteSlice

func GetParamByteSlice(r *http.Request, param string) ([]byte, error)

GetParamByteSlice returns the specified parameter as a byte slice.

func GetParamFloat64

func GetParamFloat64(r *http.Request, param string) (float64, error)

GetParamFloat64 returns the specified parameter as a float64.

func GetParamInt32

func GetParamInt32(r *http.Request, param string) (int32, error)

GetParamInt32 returns the specified parameter as an int32.

func GetParamInt64

func GetParamInt64(r *http.Request, param string) (int64, error)

GetParamInt64 returns the specified parameter as an int64.

func GetParamRegexp

func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error)

GetParamRegexp returns the specified parameter as a string.

func GetParamUint

func GetParamUint(r *http.Request, param string) (uint, error)

GetParamUint returns the specified parameter as a uint.

func GetParamUint64

func GetParamUint64(r *http.Request, param string) (uint64, error)

GetParamUint64 returns the specified parameter as a uint64.
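The typed getters above share one shape: read a named request parameter, then convert it. The sketch below illustrates that shape with the standard library only. `getParam`, `getParamInt64`, and `newRequest` are hypothetical stand-ins, and reading from the URL query string is an assumption about where the parameter comes from.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)

// getParam is a hypothetical stand-in: it reads one query-string value.
func getParam(r *http.Request, param string) string {
	return r.URL.Query().Get(param)
}

// getParamInt64 is a hypothetical stand-in: it parses the value as a base-10 int64.
func getParamInt64(r *http.Request, param string) (int64, error) {
	i, err := strconv.ParseInt(getParam(r, param), 10, 64)
	if err != nil {
		return 0, fmt.Errorf("param %q: %w", param, err)
	}
	return i, nil
}

// newRequest builds a GET request for rawURL without sending it.
func newRequest(rawURL string) *http.Request {
	u, err := url.Parse(rawURL)
	if err != nil {
		panic(err)
	}
	return &http.Request{Method: http.MethodGet, URL: u}
}

func main() {
	r := newRequest("http://localhost/abci_query?height=42&path=abc")
	h, err := getParamInt64(r, "height")
	fmt.Println(h, err)
	_, err = getParamInt64(r, "path")
	fmt.Println(err != nil)
}
```

Returning the conversion error, instead of a zero value, lets an RPC handler tell a missing or malformed parameter apart from a legitimate `0`.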

func Health

func Health() (*ctypes.ResultHealth, error)

Health performs a health check.

func NewWSConnection

func NewWSConnection(
	baseConn *websocket.Conn,
	funcMap map[string]*RPCFunc,
	cdc *amino.Codec,
	options ...func(*wsConnection),
) *wsConnection

NewWSConnection wraps websocket.Conn.

See the commentary on the func(*wsConnection) functions for a detailed description of how to configure ping period and pong wait time. NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect. See https://github.com/gorilla/websocket/issues/97

func PingPeriod

func PingPeriod(pingPeriod time.Duration) func(*wsConnection)

PingPeriod sets the duration for sending websocket pings. It should only be used in the constructor - not Goroutine-safe.

func ReadWait

func ReadWait(readWait time.Duration) func(*wsConnection)

ReadWait sets the amount of time to wait before a websocket read times out. It should only be used in the constructor - not Goroutine-safe.

func RecoverAndLogHandler

func RecoverAndLogHandler(handler http.Handler) http.Handler

RecoverAndLogHandler wraps an HTTP handler, adding error logging. If the inner function panics, the outer function recovers, logs the panic, and sends an HTTP 500 error response.
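The recover-and-respond behaviour described above can be sketched as ordinary `net/http` middleware. This is an illustration of the pattern, not the package's implementation; `recoverAndLog`, `statusOf`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
)

// recoverAndLog is a hypothetical middleware: it turns a panic in next into a 500.
func recoverAndLog(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			if rec := recover(); rec != nil {
				log.Printf("panic serving %s: %v", r.URL.Path, rec)
				http.Error(w, "internal server error", http.StatusInternalServerError)
			}
		}()
		next.ServeHTTP(w, r)
	})
}

// statusOf runs h once against an in-memory recorder and returns the status code.
func statusOf(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/health", nil))
	return rec.Code
}

// demo returns the status of a well-behaved handler and of a panicking one.
func demo() (okStatus, panicStatus int) {
	ok := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	bad := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		panic("boom")
	})
	return statusOf(recoverAndLog(ok)), statusOf(recoverAndLog(bad))
}

func main() {
	fmt.Println(demo())
}
```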

func RegisterRPCFuncs

func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec)

RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. "result" is the interface on which the result objects are registered, and is populated with every RPCResponse.

func StartHTTPAndTLSServer

func StartHTTPAndTLSServer(
	listenAddr string,
	handler http.Handler,
	certFile, keyFile string,
	config Config,
) (listener net.Listener, err error)

StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given handler. It wraps handler with RecoverAndLogHandler.

func StartHTTPServer

func StartHTTPServer(
	listenAddr string,
	handler http.Handler,
	config Config,
) (listener net.Listener, err error)

StartHTTPServer starts an HTTP server on listenAddr with the given handler. It wraps handler with RecoverAndLogHandler.

func StringTx

func StringTx(tx *txs.TxQcp) string

StringTx converts a transaction to a string for logging. The result is not a complete serialization.

func WriteChanCapacity

func WriteChanCapacity(cap int) func(*wsConnection)

WriteChanCapacity sets the capacity of the websocket write channel. It should only be used in the constructor - not Goroutine-safe.

func WriteRPCResponseHTTP

func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse)

WriteRPCResponseHTTP writes an RPC response.

func WriteRPCResponseHTTPError

func WriteRPCResponseHTTPError(
	w http.ResponseWriter,
	httpCode int,
	res types.RPCResponse,
)

WriteRPCResponseHTTPError writes an RPC error response with the given HTTP status code.

func WriteWait

func WriteWait(writeWait time.Duration) func(*wsConnection)

WriteWait sets the amount of time to wait before a websocket write times out. It should only be used in the constructor - not Goroutine-safe.
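PingPeriod, ReadWait, WriteChanCapacity, and WriteWait all return a `func(*wsConnection)`, which is the functional-options pattern: the constructor sets defaults and then applies each option in order. The sketch below illustrates the pattern with a hypothetical `conn` type; the default values are arbitrary and not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// conn is a hypothetical stand-in for the unexported wsConnection.
type conn struct {
	pingPeriod time.Duration
	writeWait  time.Duration
	writeCap   int
}

// withPingPeriod mimics the shape of the documented PingPeriod option.
func withPingPeriod(d time.Duration) func(*conn) {
	return func(c *conn) { c.pingPeriod = d }
}

// withWriteChanCapacity mimics the shape of the documented WriteChanCapacity option.
func withWriteChanCapacity(n int) func(*conn) {
	return func(c *conn) { c.writeCap = n }
}

// newConn sets defaults (arbitrary values for this sketch), then applies each option.
func newConn(options ...func(*conn)) *conn {
	c := &conn{pingPeriod: 30 * time.Second, writeWait: 10 * time.Second, writeCap: 1}
	for _, opt := range options {
		opt(c)
	}
	return c
}

func main() {
	c := newConn(withPingPeriod(5*time.Second), withWriteChanCapacity(100))
	fmt.Println(c.pingPeriod, c.writeWait, c.writeCap)
}
```

Because each option mutates the struct without locking, options are only safe to apply inside the constructor, which matches the "not Goroutine-safe" notes above.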

Types

type Adapter

type Adapter struct {
	HandlerService
	Broadcaster
	Receiver
}

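Adapter wraps the adaptation interfaces: it bundles the transaction broadcast interface and the transaction receive interface.

The broadcast interface (Broadcaster) is called by the caller when it detects a transaction event. The receive interface (Receiver) is a callback: after the relay adapter service receives a transaction broadcast by a remote relay, it invokes the callback to notify the caller that a remote cross-chain transaction has arrived.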
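The two roles can be sketched with local copies of the interface shapes. Everything below is a stand-in so the example compiles on its own: `txQcp` replaces `txs.TxQcp` with invented fields, and `chainClient` is a hypothetical component on the connected chain.

```go
package main

import "fmt"

// txQcp is a hypothetical stand-in for txs.TxQcp; its fields are invented.
type txQcp struct {
	From, To string
	Sequence int64
}

// receiver and broadcaster copy the documented interface shapes.
type receiver interface{ ReceiveTx(tx *txQcp) error }
type broadcaster interface{ BroadcastTx(tx *txQcp) error }

// chainClient is a hypothetical component on the connected chain.
type chainClient struct {
	received []*txQcp
	sent     []*txQcp
}

// ReceiveTx is the callback for a cross-chain transaction arriving from a remote relay.
func (c *chainClient) ReceiveTx(tx *txQcp) error {
	if tx == nil {
		return fmt.Errorf("nil transaction")
	}
	c.received = append(c.received, tx)
	return nil
}

// BroadcastTx is called by the chain when it detects a local transaction event.
func (c *chainClient) BroadcastTx(tx *txQcp) error {
	if tx == nil {
		return fmt.Errorf("nil transaction")
	}
	c.sent = append(c.sent, tx)
	return nil
}

// Compile-time checks that chainClient satisfies both interfaces.
var (
	_ receiver    = (*chainClient)(nil)
	_ broadcaster = (*chainClient)(nil)
)

func main() {
	c := &chainClient{}
	_ = c.BroadcastTx(&txQcp{From: "chain-a", To: "chain-b", Sequence: 1})
	_ = c.ReceiveTx(&txQcp{From: "chain-b", To: "chain-a", Sequence: 1})
	fmt.Println(len(c.sent), len(c.received))
}
```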

func NewAdapter

func NewAdapter(name, id, listenAddr string, r Receiver, b Broadcaster) (*Adapter, error)

NewAdapter creates a new Adapter (transaction broadcaster).

type BaseService

type BaseService struct {
	// contains filtered or unexported fields
}

BaseService wraps the basic service operations.

func NewBaseService

func NewBaseService(name string, impl Service) *BaseService

NewBaseService creates a new BaseService.

func (*BaseService) IsRunning

func (bs *BaseService) IsRunning() bool

IsRunning implements Service by returning true or false depending on the service's state.

func (*BaseService) OnReset

func (bs *BaseService) OnReset() error

OnReset implements Service by panicking.

func (*BaseService) OnStart

func (bs *BaseService) OnStart() error

OnStart implements Service by doing nothing. NOTE: Do not put anything in here, that way users don't need to call BaseService.OnStart()

func (*BaseService) OnStop

func (bs *BaseService) OnStop()

OnStop implements Service by doing nothing. NOTE: Do not put anything in here, that way users don't need to call BaseService.OnStop()

func (*BaseService) Quit

func (bs *BaseService) Quit() <-chan struct{}

Quit implements Service by returning a quit channel.

func (*BaseService) Reset

func (bs *BaseService) Reset() error

Reset implements Service by calling OnReset callback (if defined). An error will be returned if the service is running.

func (*BaseService) Start

func (bs *BaseService) Start() error

Start implements Service by calling OnStart (if defined). An error will be returned if the service is already running or stopped. To start a stopped service again, you need to call Reset first.

func (*BaseService) Stop

func (bs *BaseService) Stop() error

Stop implements Service by calling OnStop (if defined) and closing quit channel. An error will be returned if the service is already stopped.

func (*BaseService) String

func (bs *BaseService) String() string

String implements Service by returning a string representation of the service.

func (*BaseService) Wait

func (bs *BaseService) Wait()

Wait blocks until the service is stopped.
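The Start and Stop rules described above (one successful Start, one successful Stop, errors otherwise) can be sketched with atomic flags. The `service` type below is a simplified, hypothetical guard and omits the OnStart, OnStop, and Reset hooks; it is not the BaseService implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var (
	errAlreadyStarted = errors.New("already started")
	errAlreadyStopped = errors.New("already stopped")
)

// service is a hypothetical, simplified lifecycle guard.
type service struct {
	started uint32
	stopped uint32
	quit    chan struct{}
}

func newService() *service { return &service{quit: make(chan struct{})} }

// Start succeeds once; later calls report whether the service is started or stopped.
func (s *service) Start() error {
	if atomic.LoadUint32(&s.stopped) == 1 {
		return errAlreadyStopped
	}
	if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
		return errAlreadyStarted
	}
	return nil
}

// Stop succeeds once and closes the quit channel so waiters are released.
func (s *service) Stop() error {
	if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
		return errAlreadyStopped
	}
	close(s.quit)
	return nil
}

// IsRunning reports whether Start has succeeded and Stop has not been called.
func (s *service) IsRunning() bool {
	return atomic.LoadUint32(&s.started) == 1 && atomic.LoadUint32(&s.stopped) == 0
}

func main() {
	s := newService()
	fmt.Println(s.Start(), s.Start())
	fmt.Println(s.Stop(), s.Stop())
	<-s.quit // returns immediately because Stop closed the channel
	fmt.Println(s.IsRunning())
}
```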

type Broadcaster

type Broadcaster interface {
	BroadcastTx(tx *txs.TxQcp) error
}

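Broadcaster is the transaction broadcast interface. A transaction broadcast through this interface is one that must be submitted across chains via the relay in order to be completed.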

type Config

type Config struct {
	MaxOpenConnections int
}

Config is an RPC server configuration.

type DefaultBroadcaster

type DefaultBroadcaster struct {
	// contains filtered or unexported fields
}

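DefaultBroadcaster implements an in-memory transaction broadcaster.

It serves as a cache for the connected chain's cross-chain transactions to make query-related functions more efficient. Transactions are not cached in the broadcaster itself; they are forwarded directly to the relay adapter service.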

func (*DefaultBroadcaster) BroadcastTx

func (b *DefaultBroadcaster) BroadcastTx(tx *txs.TxQcp) (err error)

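BroadcastTx implements the transaction broadcast interface. It calls the corresponding transaction and transaction-event publishing interfaces so that the transaction is submitted across chains via the relay and completed.

Under the QCP protocol specification, a relay queries the transaction data only after it has received the transaction event. The publish-transaction interface must therefore be called first, and the publish-event interface second.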
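The ordering rule can be sketched as follows. The types are hypothetical stand-ins (`tx` for `txs.TxQcp`, `event` for `tmtypes.EventDataTx`, `handler` for the handler service), and the error returned for an event whose transaction is unknown is invented here only to make the consequence of the wrong order visible.

```go
package main

import "fmt"

// tx and event are hypothetical stand-ins for txs.TxQcp and tmtypes.EventDataTx.
type tx struct{ ID string }
type event struct{ TxID string }

// handler records what a relay could observe; it is not DefaultHandlerService.
type handler struct {
	pool map[string]*tx // transactions available to queries
	log  []string       // call order, for illustration
}

func newHandler() *handler { return &handler{pool: map[string]*tx{}} }

// PublishTx makes the transaction available to later queries.
func (h *handler) PublishTx(t *tx) error {
	h.pool[t.ID] = t
	h.log = append(h.log, "tx")
	return nil
}

// PublishEvent fails if the transaction is not yet queryable, which is the
// situation the ordering rule is meant to avoid (behaviour invented for the sketch).
func (h *handler) PublishEvent(e *event) error {
	if _, ok := h.pool[e.TxID]; !ok {
		return fmt.Errorf("event for unknown tx %q", e.TxID)
	}
	h.log = append(h.log, "event")
	return nil
}

// broadcast publishes the transaction first and the event second.
func broadcast(h *handler, t *tx) error {
	if err := h.PublishTx(t); err != nil {
		return err
	}
	return h.PublishEvent(&event{TxID: t.ID})
}

func main() {
	h := newHandler()
	err := broadcast(h, &tx{ID: "a1"})
	fmt.Println(err, h.log)
}
```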

type DefaultHandlerService

type DefaultHandlerService struct {
	// contains filtered or unexported fields
}

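DefaultHandlerService is the service management structure. It provides basic service management functions such as start and stop.

Transaction events are not cached; they are sent directly to the remote relay services whose subscription condition (Query) matches. Transactions are cached (or persisted, depending on the transaction pool service in use), and the cache is cleaned automatically once the number of transactions, the memory used, or the cache time exceeds a certain amount.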

func (DefaultHandlerService) BroadcastTxSync

func (s DefaultHandlerService) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error)

BroadcastTxSync broadcasts a transaction.

func (DefaultHandlerService) CancelTx

func (s DefaultHandlerService) CancelTx(tx *txs.TxQcp) error

CancelTx cancels a published transaction.

func (DefaultHandlerService) GetCodec

func (s DefaultHandlerService) GetCodec() *amino.Codec

GetCodec returns the amino.Codec so that it can be modified when mocking.

func (DefaultHandlerService) ID

ID returns the service's unique identifier. The identifier cannot be changed once it is set; this method provides read access to it.

func (DefaultHandlerService) Name

func (s DefaultHandlerService) Name() string

Name returns the service name. The name cannot be changed once it is set; this method provides read access to it.

func (DefaultHandlerService) PublishEvent

func (s DefaultHandlerService) PublishEvent(e *tmtypes.EventDataTx) error

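PublishEvent publishes a transaction event to event subscribers.

Under the QCP protocol specification, a relay queries the transaction data only after it has received the transaction event. The publish-transaction interface must therefore be called first, and the publish-event interface second.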

func (DefaultHandlerService) PublishTx

func (s DefaultHandlerService) PublishTx(tx *txs.TxQcp) error

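PublishTx publishes a transaction, making it available to transaction queries.

Under the QCP protocol specification, a relay queries the transaction data only after it has received the transaction event. The publish-transaction interface must therefore be called first, and the publish-event interface second.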

func (DefaultHandlerService) Routes

func (s DefaultHandlerService) Routes() map[string]*RPCFunc

Routes creates the route map for the interfaces.

func (DefaultHandlerService) Start

func (s DefaultHandlerService) Start() (err error)

Start starts the service.

func (DefaultHandlerService) Stop

func (s DefaultHandlerService) Stop() error

Stop stops the service and releases the related resources.

func (DefaultHandlerService) Subscribe

Subscribe subscribes to transaction events that match the given subscription condition.

func (DefaultHandlerService) Unsubscribe

Unsubscribe cancels the transaction event subscription for the given subscription condition.

func (DefaultHandlerService) UnsubscribeAll

UnsubscribeAll cancels all transaction event subscriptions.

type HandlerService

type HandlerService interface {
	Start() error
	Stop() error
	GetCodec() *amino.Codec
	PublishTx(tx *txs.TxQcp) error
	PublishEvent(e *tmtypes.EventDataTx) error
	CancelTx(tx *txs.TxQcp) error
}

HandlerService is the interface that wraps the basic relay services.

func NewHandlerService

func NewHandlerService(name, id, listenAddr string) (HandlerService, error)

NewHandlerService creates a new service management instance.

type MempoolMocker

type MempoolMocker struct {
}

MempoolMocker is a mock implementation used to simulate event subscription and the RPC service.

func (MempoolMocker) CheckTx

func (m MempoolMocker) CheckTx(tx types.Tx, cb func(*abci.Response)) error

CheckTx implements the interface method.

func (MempoolMocker) EnableTxsAvailable

func (m MempoolMocker) EnableTxsAvailable()

EnableTxsAvailable implements the interface method.

func (MempoolMocker) Flush

func (m MempoolMocker) Flush()

Flush implements the interface method.

func (MempoolMocker) FlushAppConn

func (m MempoolMocker) FlushAppConn() error

FlushAppConn implements the interface method.

func (MempoolMocker) Lock

func (m MempoolMocker) Lock()

Lock implements the interface method.

func (MempoolMocker) Reap

func (m MempoolMocker) Reap(n int) types.Txs

Reap implements the interface method.

func (MempoolMocker) Size

func (m MempoolMocker) Size() int

Size implements the interface method.

func (MempoolMocker) TxsAvailable

func (m MempoolMocker) TxsAvailable() <-chan struct{}

TxsAvailable implements the interface method.

func (MempoolMocker) Unlock

func (m MempoolMocker) Unlock()

Unlock implements the interface method.

func (MempoolMocker) Update

func (m MempoolMocker) Update(height int64, txs types.Txs) error

Update implements the interface method.

type Option

type Option func(*Server)

Option sets a parameter for the server.

func BufferCapacity

func BufferCapacity(cap int) Option

BufferCapacity allows you to specify capacity for the internal server's queue. Since the server, given Y subscribers, could only process X messages, this option could be used to survive spikes (e.g. high amount of transactions during peak hours).

type Query

type Query interface {
	Matches(tags TagMap) bool
	String() string
}

Query defines an interface for a query to be used for subscribing.

type RPCFunc

type RPCFunc struct {
	// contains filtered or unexported fields
}

RPCFunc contains the introspected type information for a function

func NewRPCFunc

func NewRPCFunc(f interface{}, args string) *RPCFunc

NewRPCFunc wraps a function for introspection. f is the function; args is a comma-separated list of argument names.
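Wrapping a function "for introspection" typically means recording its parameter types with the `reflect` package and pairing them with the supplied names. The sketch below illustrates that idea; `rpcFunc`, `newRPCFunc`, `describe`, and `query` are hypothetical, and the name-count check is an assumption, not documented behaviour.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// rpcFunc is a hypothetical stand-in holding introspected information about f.
type rpcFunc struct {
	f        reflect.Value
	argTypes []reflect.Type
	argNames []string
}

// newRPCFunc records the parameter types of f and pairs them with the names in args.
func newRPCFunc(f interface{}, args string) (*rpcFunc, error) {
	t := reflect.TypeOf(f)
	if t == nil || t.Kind() != reflect.Func {
		return nil, fmt.Errorf("f must be a function")
	}
	var names []string
	if args != "" {
		names = strings.Split(args, ",")
	}
	if len(names) != t.NumIn() {
		return nil, fmt.Errorf("got %d argument names for %d parameters", len(names), t.NumIn())
	}
	types := make([]reflect.Type, t.NumIn())
	for i := range types {
		types[i] = t.In(i)
	}
	return &rpcFunc{f: reflect.ValueOf(f), argTypes: types, argNames: names}, nil
}

// describe renders a "name type" pair for each parameter.
func (r *rpcFunc) describe() string {
	parts := make([]string, len(r.argNames))
	for i, n := range r.argNames {
		parts[i] = n + " " + r.argTypes[i].String()
	}
	return strings.Join(parts, ", ")
}

// query is a hypothetical RPC function used only as introspection input.
func query(path string, height int64, trusted bool) error { return nil }

func main() {
	rf, err := newRPCFunc(query, "path,height,trusted")
	if err != nil {
		panic(err)
	}
	fmt.Println(rf.describe())
}
```

With the names and types paired up, a server can decode named JSON-RPC or URL parameters into the right Go types before calling the function.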

func NewWSRPCFunc

func NewWSRPCFunc(f interface{}, args string) *RPCFunc

NewWSRPCFunc wraps a function for introspection and use in the websockets.

type Receiver

type Receiver interface {
	ReceiveTx(tx *txs.TxQcp) error
}

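Receiver is the transaction receive interface. It receives cross-chain transactions that the relay forwards from other connected chains.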

type ResponseWriterWrapper

type ResponseWriterWrapper struct {
	Status int
	http.ResponseWriter
}

ResponseWriterWrapper remembers the status for logging.

func (*ResponseWriterWrapper) Hijack

Hijack implements http.Hijacker

func (*ResponseWriterWrapper) WriteHeader

func (w *ResponseWriterWrapper) WriteHeader(status int)

WriteHeader writes the header.
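A wrapper of this shape usually overrides WriteHeader to capture the status code before delegating to the embedded writer. The sketch below shows that pattern with a hypothetical `statusRecorder` type; how the package's own wrapper initializes `Status` is not shown in this listing.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// statusRecorder is a hypothetical wrapper with the same shape as the documented struct.
type statusRecorder struct {
	Status int
	http.ResponseWriter
}

// WriteHeader remembers the status before delegating to the wrapped writer.
func (w *statusRecorder) WriteHeader(status int) {
	w.Status = status
	w.ResponseWriter.WriteHeader(status)
}

// observedStatus runs h and returns the status captured by the wrapper.
func observedStatus(h http.Handler) int {
	rec := &statusRecorder{Status: http.StatusOK, ResponseWriter: httptest.NewRecorder()}
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Status
}

// notFoundStatus captures the status written by the standard 404 handler.
func notFoundStatus() int { return observedStatus(http.NotFoundHandler()) }

func main() {
	fmt.Println(notFoundStatus())
}
```

A logging middleware can read `Status` after the inner handler returns, which a plain `http.ResponseWriter` does not allow.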

type Server

type Server struct {
	BaseService
	// contains filtered or unexported fields
}

Server allows clients to subscribe/unsubscribe for messages, publishing messages with or without tags, and manages internal state.

func NewServer

func NewServer(options ...Option) *Server

NewServer returns a new server. See the commentary on the Option functions for a detailed description of how to configure buffering. If no options are provided, the resulting server's queue is unbuffered.

func (*Server) BufferCapacity

func (s *Server) BufferCapacity() int

BufferCapacity returns capacity of the internal server's queue.

func (*Server) OnReset

func (s *Server) OnReset() error

OnReset implements Service.OnReset

func (*Server) OnStart

func (s *Server) OnStart() error

OnStart implements Service.OnStart by starting the server.

func (*Server) OnStop

func (s *Server) OnStop()

OnStop implements Service.OnStop by shutting down the server.

func (*Server) Publish

func (s *Server) Publish(ctx context.Context, msg interface{}) error

Publish publishes the given message. An error will be returned to the caller if the context is canceled.

func (*Server) PublishWithTags

func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error

PublishWithTags publishes the given message with the set of tags. The set is matched with clients queries. If there is a match, the message is sent to the client.

func (*Server) Subscribe

func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error

Subscribe creates a subscription for the given client. It accepts a channel on which messages matching the given query can be received. An error will be returned to the caller if the context is canceled or if a subscription already exists for the clientID and query pair.

func (*Server) Unsubscribe

func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error

Unsubscribe removes the subscription on the given query. An error will be returned to the caller if the context is canceled or if the subscription does not exist.

func (*Server) UnsubscribeAll

func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error

UnsubscribeAll removes all client subscriptions. An error will be returned to the caller if the context is canceled or if the subscription does not exist.
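The subscribe, publish, and unsubscribe rules above can be sketched with a small single-goroutine stand-in. It is heavily simplified and hypothetical: there is no context, no internal queue, and a query is reduced to a `key=value` string matched against one tag. The output channel must be buffered here because delivery is synchronous.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errAlreadySubscribed    = errors.New("already subscribed")
	errSubscriptionNotFound = errors.New("subscription not found")
)

// server is a hypothetical, single-goroutine stand-in for the documented Server.
type server struct {
	subs map[string]map[string]chan<- interface{} // clientID -> query -> out
}

func newServer() *server {
	return &server{subs: map[string]map[string]chan<- interface{}{}}
}

// subscribe registers out for (clientID, query) and rejects duplicates.
func (s *server) subscribe(clientID, query string, out chan<- interface{}) error {
	if _, ok := s.subs[clientID][query]; ok {
		return errAlreadySubscribed
	}
	if s.subs[clientID] == nil {
		s.subs[clientID] = map[string]chan<- interface{}{}
	}
	s.subs[clientID][query] = out
	return nil
}

// unsubscribe removes one subscription and reports an error if it is unknown.
func (s *server) unsubscribe(clientID, query string) error {
	if _, ok := s.subs[clientID][query]; !ok {
		return errSubscriptionNotFound
	}
	delete(s.subs[clientID], query)
	return nil
}

// publishWithTags delivers msg to every subscription whose query equals
// "key=value" for one of the tags.
func (s *server) publishWithTags(msg interface{}, tags map[string]string) {
	for _, queries := range s.subs {
		for q, out := range queries {
			for k, v := range tags {
				if q == k+"="+v {
					out <- msg
					break
				}
			}
		}
	}
}

func main() {
	s := newServer()
	out := make(chan interface{}, 1) // buffered: delivery is synchronous in this sketch
	fmt.Println(s.subscribe("relay-1", "type=tx", out))
	fmt.Println(s.subscribe("relay-1", "type=tx", out))
	s.publishWithTags("hello", map[string]string{"type": "tx"})
	fmt.Println(<-out)
	fmt.Println(s.unsubscribe("relay-1", "type=tx"))
}
```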

type Service

type Service interface {
	// Start the service.
	// If it's already started or stopped, will return an error.
	// If OnStart() returns an error, it's returned by Start()
	Start() error
	OnStart() error

	// Stop the service.
	// If it's already stopped, will return an error.
	// OnStop must never error.
	Stop() error
	OnStop()

	// Reset the service.
	// Panics by default - must be overwritten to enable reset.
	Reset() error
	OnReset() error

	// Return true if the service is running
	IsRunning() bool

	// Quit returns a channel, which is closed once service is stopped.
	Quit() <-chan struct{}

	// String representation of the service
	String() string
}

Service defines a service that can be started, stopped, and reset.

type TagMap

type TagMap interface {
	// Get returns the value for a key, or nil if no value is present.
	// The ok result indicates whether value was found in the tags.
	Get(key string) (value string, ok bool)
	// Len returns the number of tags.
	Len() int
}

TagMap is used to associate tags with a message. Subscribers can query them to choose the messages they will receive.

func NewTagMap

func NewTagMap(data map[string]string) TagMap

NewTagMap constructs a new immutable tag set from a map.
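To show how TagMap and Query fit together, the sketch below defines a map-backed tag set and a trivial equality query against a local copy of the TagMap interface. `tagMap` and `equalsQuery` are hypothetical, and the tag names are invented for illustration.

```go
package main

import "fmt"

// TagMap copies the documented interface so the sketch compiles on its own.
type TagMap interface {
	Get(key string) (value string, ok bool)
	Len() int
}

// tagMap is a hypothetical TagMap implementation backed by a plain map.
type tagMap map[string]string

// Get returns the value for key and whether the key was present.
func (t tagMap) Get(key string) (value string, ok bool) {
	value, ok = t[key]
	return value, ok
}

// Len returns the number of tags.
func (t tagMap) Len() int { return len(t) }

// equalsQuery is a hypothetical Query that matches when one tag has an exact value.
type equalsQuery struct{ key, want string }

// Matches reports whether the tag named key is present with the wanted value.
func (q equalsQuery) Matches(tags TagMap) bool {
	v, ok := tags.Get(q.key)
	return ok && v == q.want
}

// String renders the query in a readable key='value' form.
func (q equalsQuery) String() string { return fmt.Sprintf("%s='%s'", q.key, q.want) }

func main() {
	// The tag names below are invented for the example.
	tags := tagMap{"tm.event": "Tx", "qcp.to": "chain-b"}
	q := equalsQuery{key: "qcp.to", want: "chain-b"}
	fmt.Println(q.String(), q.Matches(tags))
}
```

Checking `ok` as well as the value keeps a query for an empty string from matching a tag that is simply absent.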

type WebsocketManager

type WebsocketManager struct {
	websocket.Upgrader
	// contains filtered or unexported fields
}

WebsocketManager provides a WS handler for incoming connections and passes a map of functions along with any additional params to new connections. NOTE: The websocket path is defined externally, e.g. in node/node.go

func NewWebsocketManager

func NewWebsocketManager(funcMap map[string]*RPCFunc, cdc *amino.Codec, wsConnOptions ...func(*wsConnection)) *WebsocketManager

NewWebsocketManager returns a new WebsocketManager that passes a map of functions, connection options and logger to new WS connections.

func (*WebsocketManager) WebsocketHandler

func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request)

WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
