Documentation ¶
Index ¶
- Constants
- func AddListener(name string, ...)
- type BaseJob
- func (e *BaseJob) CreatedAt() time.Time
- func (e *BaseJob) FromChainID() *big.Int
- func (e *BaseJob) GetBackOff() int
- func (e *BaseJob) GetData() []byte
- func (e *BaseJob) GetID() int32
- func (e *BaseJob) GetListener() Listener
- func (e *BaseJob) GetMaxTry() int
- func (e *BaseJob) GetNextTry() int64
- func (e *BaseJob) GetRetryCount() int
- func (e *BaseJob) GetSubscriptionName() string
- func (e *BaseJob) GetTransaction() Transaction
- func (e *BaseJob) GetType() int
- func (e *BaseJob) GetValue() *big.Int
- func (e *BaseJob) Hash() common.Hash
- func (e *BaseJob) IncreaseRetryCount()
- func (e *BaseJob) Process() ([]byte, error)
- func (e *BaseJob) Save(status string) error
- func (e *BaseJob) SetID(id int32)
- func (e *BaseJob) String() string
- func (e *BaseJob) UpdateNextTry(nextTry int64)
- func (e *BaseJob) Utils() utils.Utils
- type Block
- type BridgeWorker
- type Config
- type Controller
- type EmptyTransaction
- type Handler
- type Job
- type JobHandler
- type Listener
- type ListenerStats
- type Log
- type LsConfig
- type Pool
- func (p *Pool) AddWorkers(workers []Worker)
- func (p *Pool) Enqueue(job JobHandler)
- func (p *Pool) IsClosed() bool
- func (p *Pool) PrepareRetryableJob(job JobHandler)
- func (p *Pool) RetryJob(job JobHandler)
- func (p *Pool) SendJobToWorker(workerCh chan JobHandler, job JobHandler)
- func (p *Pool) Start(closeFunc func())
- func (p *Pool) Stats() Stats
- func (p *Pool) Wait()
- type Receipt
- type Secret
- type Stats
- type Subscribe
- type TaskHandler
- type Transaction
- type Worker
Constants ¶
View Source
const ( ListenHandler = iota CallbackHandler )
View Source
const ( TxEvent = iota LogEvent )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseJob ¶
type BaseJob struct {
// contains filtered or unexported fields
}
func NewBaseJob ¶
func (*BaseJob) FromChainID ¶
func (*BaseJob) GetBackOff ¶
func (*BaseJob) GetListener ¶
func (*BaseJob) GetNextTry ¶
func (*BaseJob) GetRetryCount ¶
func (*BaseJob) GetSubscriptionName ¶
func (*BaseJob) GetTransaction ¶
func (e *BaseJob) GetTransaction() Transaction
func (*BaseJob) IncreaseRetryCount ¶
func (e *BaseJob) IncreaseRetryCount()
func (*BaseJob) UpdateNextTry ¶
type BridgeWorker ¶
type BridgeWorker struct {
// contains filtered or unexported fields
}
func (*BridgeWorker) Channel ¶
func (w *BridgeWorker) Channel() chan JobHandler
func (*BridgeWorker) Close ¶
func (w *BridgeWorker) Close()
func (*BridgeWorker) Context ¶
func (w *BridgeWorker) Context() context.Context
func (*BridgeWorker) ProcessJob ¶
func (w *BridgeWorker) ProcessJob(job JobHandler) error
func (*BridgeWorker) Stop ¶
func (w *BridgeWorker) Stop()
func (*BridgeWorker) String ¶
func (w *BridgeWorker) String() string
func (*BridgeWorker) Wait ¶
func (w *BridgeWorker) Wait()
type Config ¶
type Config struct { Listeners map[string]*LsConfig `json:"listeners" mapstructure:"listeners"` NumberOfWorkers int `json:"numberOfWorkers" mapstructure:"numberOfWorkers"` MaxQueueSize int `json:"maxQueueSize" mapstructure:"maxQueueSize"` MaxRetry int32 `json:"maxRetry" mapstructure:"maxRetry"` BackOff int32 `json:"backoff" mapstructure:"backoff"` DB *stores.Database `json:"database" mapstructure:"database"` // this field is used for testing purpose Testing bool }
type Controller ¶
type Controller struct { HandlerABIs map[string]*abi.ABI Pool *Pool // contains filtered or unexported fields }
func NewWithContext ¶ added in v0.1.3
func (*Controller) Close ¶
func (c *Controller) Close()
func (*Controller) LoadABIsFromConfig ¶
func (c *Controller) LoadABIsFromConfig(lsConfig *LsConfig) (err error)
LoadABIsFromConfig loads all ABIPath and add results to Handler.ABI
func (*Controller) Start ¶
func (c *Controller) Start() error
type EmptyTransaction ¶
type EmptyTransaction struct {
// contains filtered or unexported fields
}
func NewEmptyTransaction ¶
func (*EmptyTransaction) GetData ¶
func (b *EmptyTransaction) GetData() []byte
func (*EmptyTransaction) GetFromAddress ¶
func (b *EmptyTransaction) GetFromAddress() string
func (*EmptyTransaction) GetHash ¶
func (b *EmptyTransaction) GetHash() common.Hash
func (*EmptyTransaction) GetToAddress ¶
func (b *EmptyTransaction) GetToAddress() string
func (*EmptyTransaction) GetValue ¶
func (b *EmptyTransaction) GetValue() *big.Int
type Handler ¶
type Handler struct { // Contract Name that will be used to get ABI Contract string `json:"contract" mapstructure:"contract"` // Name is method/event name Name string `json:"name" mapstructure:"name"` // ContractAddress is used in callback case ContractAddress string `json:"contractAddress" mapstructure:"contractAddress"` // Listener who triggers callback event Listener string `json:"listener" mapstructure:"listener"` ABI *abi.ABI `json:"-"` // HandleMethod is used when processing listened job, do nothing if it is empty HandleMethod string `json:"handleMethod" mapstructure:"handleMethod"` }
type Job ¶
type JobHandler ¶
type JobHandler interface { GetID() int32 GetType() int GetRetryCount() int GetNextTry() int64 GetMaxTry() int GetData() []byte GetValue() *big.Int GetBackOff() int Process() ([]byte, error) Hash() common.Hash IncreaseRetryCount() UpdateNextTry(int64) GetListener() Listener GetSubscriptionName() string GetTransaction() Transaction FromChainID() *big.Int Save(string) error CreatedAt() time.Time String() string }
type Listener ¶
type Listener interface { GetName() string GetStore() stores.MainStore Config() *LsConfig Period() time.Duration GetSafeBlockRange() uint64 GetCurrentBlock() Block GetLatestBlock() (Block, error) GetLatestBlockHeight() (uint64, error) GetBlock(height uint64) (Block, error) GetBlockWithLogs(height uint64) (Block, error) GetChainID() (*big.Int, error) GetReceipt(common.Hash) (*types.Receipt, error) Context() context.Context GetSubscriptions() map[string]*Subscribe UpdateCurrentBlock(block Block) error SaveCurrentBlockToDB() error SaveTransactionsToDB(txs []Transaction) error GetListenHandleJob(subscriptionName string, tx Transaction, eventId string, data []byte) JobHandler SendCallbackJobs(listeners map[string]Listener, subscriptionName string, tx Transaction, inputData []byte) NewJobFromDB(job *models.Job) (JobHandler, error) Start() Close() IsDisabled() bool SetInitHeight(uint64) GetInitHeight() uint64 GetEthClient() utils.EthClient GetTasks() []TaskHandler GetTask(index int) TaskHandler AddTask(handler TaskHandler) IsUpTodate() bool GetBridgeOperatorSign() utils.ISign GetVoterSign() utils.ISign GetRelayerSign() utils.ISign GetLegacyBridgeOperatorSign() utils.ISign AddListeners(map[string]Listener) // GetListener returns listener by name GetListener(string) Listener CacheBlocks(blockNumbers map[uint64]struct{}) }
type ListenerStats ¶ added in v0.1.3
type LsConfig ¶
type LsConfig struct { ChainId string `json:"chainId" mapstructure:"chainId"` Name string `json:"-"` RpcUrl string `json:"rpcUrl" mapstructure:"rpcUrl"` LoadInterval time.Duration `json:"blockTime" mapstructure:"blockTime"` SafeBlockRange uint64 `json:"safeBlockRange" mapstructure:"safeBlockRange"` FromHeight uint64 `json:"fromHeight" mapstructure:"fromHeight"` TaskInterval time.Duration `json:"taskInterval" mapstructure:"taskInterval"` Disabled bool `json:"disabled" mapstructure:"disabled"` // TODO: apply more ways to get privatekey. such as: PLAINTEXT, KMS, etc. Secret *Secret `json:"secret" mapstructure:"secret"` Subscriptions map[string]*Subscribe `json:"subscriptions" mapstructure:"subscriptions"` TransactionCheckPeriod time.Duration `json:"transactionCheckPeriod" mapstructure:"transactionCheckPeriod"` Contracts map[string]string `json:"contracts" mapstructure:"contracts"` ProcessWithinBlocks uint64 `json:"processWithinBlocks" mapstructure:"processWithinBlocks"` MaxTasksQuery int `json:"maxTasksQuery" mapstructure:"maxTasksQuery"` MinTasksQuery int `json:"minTasksQuery" mapstructure:"minTasksQuery"` // GetLogsBatchSize is used at batch size when calling processBatchLogs GetLogsBatchSize int `json:"getLogsBatchSize" mapstructure:"getLogsBatchSize"` // MaxProcessingTasks is used to specify max processing tasks allowed while processing tasks // if number of tasks reaches this number, it waits until this number decrease MaxProcessingTasks int `json:"maxProcessingTasks" mapstructure:"maxProcessingTasks"` GasLimitBumpRatio uint64 `json:"gasLimitBumpRatio" mapstructure:"gasLimitBumpRatio"` Stats *ListenerStats `json:"stats" mapstructure:"stats"` }
type Pool ¶
type Pool struct { Workers []Worker // message backoff MaxRetry int32 BackOff int32 // Queue holds a list of worker Queue chan chan JobHandler // JobChan receives new job JobChan chan JobHandler RetryJobChan chan JobHandler FailedJobChan chan JobHandler MaxQueueSize int // contains filtered or unexported fields }
func (*Pool) AddWorkers ¶
func (*Pool) Enqueue ¶
func (p *Pool) Enqueue(job JobHandler)
func (*Pool) PrepareRetryableJob ¶
func (p *Pool) PrepareRetryableJob(job JobHandler)
func (*Pool) RetryJob ¶
func (p *Pool) RetryJob(job JobHandler)
func (*Pool) SendJobToWorker ¶
func (p *Pool) SendJobToWorker(workerCh chan JobHandler, job JobHandler)
type Receipt ¶
type Receipt interface { GetTransaction() Transaction GetStatus() bool GetLogs() []Log }
type Secret ¶
type Secret struct { BridgeOperator *utils.SignMethodConfig `json:"bridgeOperator" mapstructure:"bridgeOperator"` Voter *utils.SignMethodConfig `json:"voter" mapstructure:"voter"` Relayer *utils.SignMethodConfig `json:"relayer" mapstructure:"relayer"` LegacyBridgeOperator *utils.SignMethodConfig `json:"legacyBridgeOperator" mapstructure:"legacyBridgeOperator"` }
type Subscribe ¶
type Subscribe struct { From string `json:"from" mapstructure:"from"` To string `json:"to" mapstructure:"to"` // Type can be either TxEvent or LogEvent Type int `json:"type" mapstructure:"type"` Handler *Handler `json:"handler" mapstructure:"handler"` CallBacks map[string]string `json:"callbacks" mapstructure:"callbacks"` }
type TaskHandler ¶
type Transaction ¶
type Worker ¶
type Worker interface { Context() context.Context Close() ProcessJob(job JobHandler) error Stop() Channel() chan JobHandler Wait() }
Click to show internal directories.
Click to hide internal directories.