Documentation ¶
Overview ¶
Package adapter 按照 QCP 协议规范封装适配SDK,以实现安全、简便和快捷地接入非 Tendermint 技术栈的区块链。
为接入链提供交易发布功能,并为中继通信提供标准接口及相关服务( Http Rpc、Web Socket )。
Index ¶
- Constants
- Variables
- func ABCIQuery(path string, data cmn.HexBytes, height int64, trusted bool) (*ctypes.ResultABCIQuery, error)
- func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection)
- func GetParam(r *http.Request, param string) string
- func GetParamByteSlice(r *http.Request, param string) ([]byte, error)
- func GetParamFloat64(r *http.Request, param string) (float64, error)
- func GetParamInt32(r *http.Request, param string) (int32, error)
- func GetParamInt64(r *http.Request, param string) (int64, error)
- func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error)
- func GetParamUint(r *http.Request, param string) (uint, error)
- func GetParamUint64(r *http.Request, param string) (uint64, error)
- func Health() (*ctypes.ResultHealth, error)
- func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, cdc *amino.Codec, ...) *wsConnection
- func PingPeriod(pingPeriod time.Duration) func(*wsConnection)
- func ReadWait(readWait time.Duration) func(*wsConnection)
- func RecoverAndLogHandler(handler http.Handler) http.Handler
- func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec)
- func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, ...) (listener net.Listener, err error)
- func StartHTTPServer(listenAddr string, handler http.Handler, config Config) (listener net.Listener, err error)
- func StringTx(tx *txs.TxQcp) string
- func WriteChanCapacity(cap int) func(*wsConnection)
- func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse)
- func WriteRPCResponseHTTPError(w http.ResponseWriter, httpCode int, res types.RPCResponse)
- func WriteWait(writeWait time.Duration) func(*wsConnection)
- type Adapter
- type BaseService
- func (bs *BaseService) IsRunning() bool
- func (bs *BaseService) OnReset() error
- func (bs *BaseService) OnStart() error
- func (bs *BaseService) OnStop()
- func (bs *BaseService) Quit() <-chan struct{}
- func (bs *BaseService) Reset() error
- func (bs *BaseService) Start() error
- func (bs *BaseService) Stop() error
- func (bs *BaseService) String() string
- func (bs *BaseService) Wait()
- type Broadcaster
- type Config
- type DefaultBroadcaster
- type DefaultHandlerService
- func (s DefaultHandlerService) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error)
- func (s DefaultHandlerService) CancelTx(tx *txs.TxQcp) error
- func (s DefaultHandlerService) GetCodec() *amino.Codec
- func (s DefaultHandlerService) ID() string
- func (s DefaultHandlerService) Name() string
- func (s DefaultHandlerService) PublishEvent(e *tmtypes.EventDataTx) error
- func (s DefaultHandlerService) PublishTx(tx *txs.TxQcp) error
- func (s DefaultHandlerService) Routes() map[string]*RPCFunc
- func (s DefaultHandlerService) Start() (err error)
- func (s DefaultHandlerService) Stop() error
- func (s DefaultHandlerService) Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error)
- func (s DefaultHandlerService) Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error)
- func (s DefaultHandlerService) UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error)
- type HandlerService
- type MempoolMocker
- func (m MempoolMocker) CheckTx(tx types.Tx, cb func(*abci.Response)) error
- func (m MempoolMocker) EnableTxsAvailable()
- func (m MempoolMocker) Flush()
- func (m MempoolMocker) FlushAppConn() error
- func (m MempoolMocker) Lock()
- func (m MempoolMocker) Reap(n int) types.Txs
- func (m MempoolMocker) Size() int
- func (m MempoolMocker) TxsAvailable() <-chan struct{}
- func (m MempoolMocker) Unlock()
- func (m MempoolMocker) Update(height int64, txs types.Txs) error
- type Option
- type Query
- type RPCFunc
- type Receiver
- type ResponseWriterWrapper
- type Server
- func (s *Server) BufferCapacity() int
- func (s *Server) OnReset() error
- func (s *Server) OnStart() error
- func (s *Server) OnStop()
- func (s *Server) Publish(ctx context.Context, msg interface{}) error
- func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error
- func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error
- func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error
- func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error
- type Service
- type TagMap
- type WebsocketManager
Constants ¶
const ( // MaxBodyBytes controls the maximum number of bytes the // server will read parsing the request body. MaxBodyBytes = int64(1000000) // 1MB )
Variables ¶
var ( // RegexpInt 整数正则 RegexpInt = regexp.MustCompile(`^-?[0-9]+$`) // RegexpHex 16进制正则 RegexpHex = regexp.MustCompile(`^(?i)[a-f0-9]+$`) // RegexpEmail 正则 RegexpEmail = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) // RegexpAddress 正则 RegexpAddress = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) // RegexpHost 正则 RegexpHost = regexp.MustCompile(`^(?i)(` + domain + `)$`) )
var ( // ErrSubscriptionNotFound is returned when a client tries to unsubscribe // from not existing subscription. ErrSubscriptionNotFound = errors.New("subscription not found") // ErrAlreadySubscribed is returned when a client tries to subscribe twice or // more using the same query. ErrAlreadySubscribed = errors.New("already subscribed") )
var ( // ErrAlreadyStarted 已启动错误 ErrAlreadyStarted = errors.New("already started") // ErrAlreadyStopped 已停止错误 ErrAlreadyStopped = errors.New("already stopped") )
Functions ¶
func ABCIQuery ¶
func ABCIQuery(path string, data cmn.HexBytes, height int64, trusted bool) (*ctypes.ResultABCIQuery, error)
ABCIQuery 交易、交易序号查询。
func EventSubscriber ¶
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection)
EventSubscriber sets the object that is used to subscribe / unsubscribe from events - not Goroutine-safe. If none is given, the default node's eventBus will be used.
func GetParamByteSlice ¶
GetParamByteSlice 获取指定字节切片
func GetParamFloat64 ¶
GetParamFloat64 获取指定参数
func GetParamInt32 ¶
GetParamInt32 获取指定参数
func GetParamInt64 ¶
GetParamInt64 获取指定参数
func GetParamRegexp ¶
GetParamRegexp 获取指定参数
func GetParamUint ¶
GetParamUint 获取指定参数
func GetParamUint64 ¶
GetParamUint64 获取指定参数
func NewWSConnection ¶
func NewWSConnection( baseConn *websocket.Conn, funcMap map[string]*RPCFunc, cdc *amino.Codec, options ...func(*wsConnection), ) *wsConnection
NewWSConnection wraps websocket.Conn.
See the commentary on the func(*wsConnection) functions for a detailed description of how to configure ping period and pong wait time. NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect. see https://github.com/gorilla/websocket/issues/97
func PingPeriod ¶
PingPeriod sets the duration for sending websocket pings. It should only be used in the constructor - not Goroutine-safe.
func ReadWait ¶
ReadWait sets the amount of time to wait before a websocket read times out. It should only be used in the constructor - not Goroutine-safe.
func RecoverAndLogHandler ¶
RecoverAndLogHandler wraps an HTTP handler, adding error logging. If the inner function panics, the outer function recovers, logs, and sends an HTTP 500 error response.
func RegisterRPCFuncs ¶
RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. "result" is the interface on which the result objects are registered, and is populated with every RPCResponse.
func StartHTTPAndTLSServer ¶
func StartHTTPAndTLSServer( listenAddr string, handler http.Handler, certFile, keyFile string, config Config, ) (listener net.Listener, err error)
StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given handler. It wraps handler with RecoverAndLogHandler.
func StartHTTPServer ¶
func StartHTTPServer( listenAddr string, handler http.Handler, config Config, ) (listener net.Listener, err error)
StartHTTPServer starts an HTTP server on listenAddr with the given handler. It wraps handler with RecoverAndLogHandler.
func WriteChanCapacity ¶
func WriteChanCapacity(cap int) func(*wsConnection)
WriteChanCapacity sets the capacity of the websocket write channel. It should only be used in the constructor - not Goroutine-safe.
func WriteRPCResponseHTTP ¶
func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse)
WriteRPCResponseHTTP write rpc response
func WriteRPCResponseHTTPError ¶
func WriteRPCResponseHTTPError( w http.ResponseWriter, httpCode int, res types.RPCResponse, )
WriteRPCResponseHTTPError write rpc response error
Types ¶
type Adapter ¶
type Adapter struct { HandlerService Broadcaster Receiver }
Adapter 适配接口封装,封装交易广播接口和交易接收接口。
交易广播接口( Broadcaster )为调用端检测到交易事件时,由调用端调用; 交易接收接口( Receiver )为中继适配服务接收到远端中继广播的交易后,由适配服务回调通知调用方接收到远端跨链交易。
func NewAdapter ¶
func NewAdapter(name, id, listenAddr string, r Receiver, b Broadcaster) (*Adapter, error)
NewAdapter 创建新的适配器( Adapter )实例
type BaseService ¶
type BaseService struct {
// contains filtered or unexported fields
}
BaseService 封装服务基础操作方法
func NewBaseService ¶
func NewBaseService(name string, impl Service) *BaseService
NewBaseService creates a new BaseService.
func (*BaseService) IsRunning ¶
func (bs *BaseService) IsRunning() bool
IsRunning implements Service by returning true or false depending on the service's state.
func (*BaseService) OnReset ¶
func (bs *BaseService) OnReset() error
OnReset implements Service by panicking.
func (*BaseService) OnStart ¶
func (bs *BaseService) OnStart() error
OnStart implements Service by doing nothing. NOTE: Do not put anything in here, that way users don't need to call BaseService.OnStart()
func (*BaseService) OnStop ¶
func (bs *BaseService) OnStop()
OnStop implements Service by doing nothing. NOTE: Do not put anything in here, that way users don't need to call BaseService.OnStop()
func (*BaseService) Quit ¶
func (bs *BaseService) Quit() <-chan struct{}
Quit implements Service by returning a quit channel.
func (*BaseService) Reset ¶
func (bs *BaseService) Reset() error
Reset implements Service by calling OnReset callback (if defined). An error will be returned if the service is running.
func (*BaseService) Start ¶
func (bs *BaseService) Start() error
Start implements Service by calling OnStart (if defined). An error will be returned if the service is already running or stopped. To start a stopped service again, you need to call Reset.
func (*BaseService) Stop ¶
func (bs *BaseService) Stop() error
Stop implements Service by calling OnStop (if defined) and closing quit channel. An error will be returned if the service is already stopped.
func (*BaseService) String ¶
func (bs *BaseService) String() string
String implements Service by returning a string representation of the service.
type Broadcaster ¶
Broadcaster 交易广播接口,通过该接口广播的交易即表示需要通过中继跨链提交交易以最终完成交易。
type DefaultBroadcaster ¶
type DefaultBroadcaster struct {
// contains filtered or unexported fields
}
DefaultBroadcaster 实现内存交易广播器。
作为接入链跨链交易的缓存以提高查询的相关功能的执行效率。 交易在广播器中不会缓存,而是直接转发给中继适配服务。
func (*DefaultBroadcaster) BroadcastTx ¶
func (b *DefaultBroadcaster) BroadcastTx(tx *txs.TxQcp) (err error)
BroadcastTx 实现交易广播接口,调用相应的交易及交易事件发布接口,以通过中继跨链提交交易以最终完成交易。
因为按照 QCP 协议规范定义,中继都是在接收到交易事件后查询交易数据,因此应保证先调用发布交易接口,然后再调用发布事件接口。
type DefaultHandlerService ¶
type DefaultHandlerService struct {
// contains filtered or unexported fields
}
DefaultHandlerService 服务管理结构,提供基本的启动,停止等服务管理功能。
交易事件不会缓存,会直接发送给匹配交易事件订阅条件( Query )的远端中继服务; 交易会被进行缓存(或持久化存储,取决于选用的交易池服务),交易数、占用内存或缓存时间超过一定量会自动清理缓存。
func (DefaultHandlerService) BroadcastTxSync ¶
func (s DefaultHandlerService) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error)
BroadcastTxSync 广播交易。
func (DefaultHandlerService) CancelTx ¶
func (s DefaultHandlerService) CancelTx(tx *txs.TxQcp) error
CancelTx 撤销发布交易
func (DefaultHandlerService) GetCodec ¶
func (s DefaultHandlerService) GetCodec() *amino.Codec
GetCodec 获取amino.Codec 以便 Mock 时修改
func (DefaultHandlerService) ID ¶
func (s DefaultHandlerService) ID() string
ID 获取服务唯一标识,唯一标识设置后不允许修改,提供此方法获取服务唯一标识。
func (DefaultHandlerService) Name ¶
func (s DefaultHandlerService) Name() string
Name 获取服务名称,服务名称设置后不允许修改,提供此方法获取服务名称。
func (DefaultHandlerService) PublishEvent ¶
func (s DefaultHandlerService) PublishEvent(e *tmtypes.EventDataTx) error
PublishEvent 发布交易事件,提供给事件订阅
因为按照 QCP 协议规范定义,中继都是在接收到交易事件后查询交易数据,因此应保证先调用发布交易接口,然后再调用发布事件接口。
func (DefaultHandlerService) PublishTx ¶
func (s DefaultHandlerService) PublishTx(tx *txs.TxQcp) error
PublishTx 发布交易,提供给交易查询
因为按照 QCP 协议规范定义,中继都是在接收到交易事件后查询交易数据,因此应保证先调用发布交易接口,然后再调用发布事件接口。
func (DefaultHandlerService) Routes ¶
func (s DefaultHandlerService) Routes() map[string]*RPCFunc
Routes 创建接口路由映射
func (DefaultHandlerService) Subscribe ¶
func (s DefaultHandlerService) Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error)
Subscribe 指定订阅条件,订阅交易事件。
func (DefaultHandlerService) Unsubscribe ¶
func (s DefaultHandlerService) Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error)
Unsubscribe 根据具体订阅条件,取消交易事件的订阅。
func (DefaultHandlerService) UnsubscribeAll ¶
func (s DefaultHandlerService) UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error)
UnsubscribeAll 取消全部交易事件订阅。
type HandlerService ¶
type HandlerService interface { Start() error Stop() error GetCodec() *amino.Codec PublishTx(tx *txs.TxQcp) error PublishEvent(e *tmtypes.EventDataTx) error CancelTx(tx *txs.TxQcp) error }
HandlerService 中继基础服务封装接口
func NewHandlerService ¶
func NewHandlerService(name, id, listenAddr string) (HandlerService, error)
NewHandlerService 创建新服务管理实例
type MempoolMocker ¶
type MempoolMocker struct { }
MempoolMocker mock 实现,用于事件订阅和RPC 服务模拟
func (MempoolMocker) EnableTxsAvailable ¶
func (m MempoolMocker) EnableTxsAvailable()
EnableTxsAvailable 接口方法实现
func (MempoolMocker) TxsAvailable ¶
func (m MempoolMocker) TxsAvailable() <-chan struct{}
TxsAvailable 接口方法实现
type Option ¶
type Option func(*Server)
Option sets a parameter for the server.
func BufferCapacity ¶
BufferCapacity allows you to specify capacity for the internal server's queue. Since the server, given Y subscribers, could only process X messages, this option could be used to survive spikes (e.g. high amount of transactions during peak hours).
type RPCFunc ¶
type RPCFunc struct {
// contains filtered or unexported fields
}
RPCFunc contains the introspected type information for a function
func NewRPCFunc ¶
NewRPCFunc wraps a function for introspection. f is the function, args are comma separated argument names
func NewWSRPCFunc ¶
NewWSRPCFunc wraps a function for introspection and use in the websockets.
type ResponseWriterWrapper ¶
type ResponseWriterWrapper struct { Status int http.ResponseWriter }
ResponseWriterWrapper Remember the status for logging
func (*ResponseWriterWrapper) Hijack ¶
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error)
Hijack implements http.Hijacker
func (*ResponseWriterWrapper) WriteHeader ¶
func (w *ResponseWriterWrapper) WriteHeader(status int)
WriteHeader write header
type Server ¶
type Server struct { BaseService // contains filtered or unexported fields }
Server allows clients to subscribe/unsubscribe for messages, publishing messages with or without tags, and manages internal state.
func NewServer ¶
NewServer returns a new server. See the commentary on the Option functions for a detailed description of how to configure buffering. If no options are provided, the resulting server's queue is unbuffered.
func (*Server) BufferCapacity ¶
BufferCapacity returns capacity of the internal server's queue.
func (*Server) OnStop ¶
func (s *Server) OnStop()
OnStop implements Service.OnStop by shutting down the server.
func (*Server) Publish ¶
Publish publishes the given message. An error will be returned to the caller if the context is canceled.
func (*Server) PublishWithTags ¶
PublishWithTags publishes the given message with the set of tags. The set is matched with clients queries. If there is a match, the message is sent to the client.
func (*Server) Subscribe ¶
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error
Subscribe creates a subscription for the given client. It accepts a channel on which messages matching the given query can be received. An error will be returned to the caller if the context is canceled or if a subscription already exists for the given clientID and query pair.
func (*Server) Unsubscribe ¶
Unsubscribe removes the subscription on the given query. An error will be returned to the caller if the context is canceled or if subscription does not exist.
type Service ¶
type Service interface { // Start the service. // If it's already started or stopped, will return an error. // If OnStart() returns an error, it's returned by Start() Start() error OnStart() error // Stop the service. // If it's already stopped, will return an error. // OnStop must never error. Stop() error OnStop() // Reset the service. // Panics by default - must be overwritten to enable reset. Reset() error OnReset() error // Return true if the service is running IsRunning() bool // Quit returns a channel, which is closed once service is stopped. Quit() <-chan struct{} // String representation of the service String() string }
Service defines a service that can be started, stopped, and reset.
type TagMap ¶
type TagMap interface { // Get returns the value for a key, or nil if no value is present. // The ok result indicates whether value was found in the tags. Get(key string) (value string, ok bool) // Len returns the number of tags. Len() int }
TagMap is used to associate tags with a message. They can be queried by subscribers to choose the messages they will receive.
type WebsocketManager ¶
WebsocketManager provides a WS handler for incoming connections and passes a map of functions along with any additional params to new connections. NOTE: The websocket path is defined externally, e.g. in node/node.go
func NewWebsocketManager ¶
func NewWebsocketManager(funcMap map[string]*RPCFunc, cdc *amino.Codec, wsConnOptions ...func(*wsConnection)) *WebsocketManager
NewWebsocketManager returns a new WebsocketManager that passes a map of functions, connection options and logger to new WS connections.
func (*WebsocketManager) WebsocketHandler ¶
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request)
WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.