Documentation ¶
Index ¶
- Constants
- Variables
- func CollectProposalsForPublish(passed, failed []gov.SimpleProposal) (Proposals, SideProposals)
- func DelistTradingPairForPublish(ctx sdk.Context, dexKeeper *orderPkg.DexKeeper, symbol string)
- func ExpireOrdersForPublish(dexKeeper *orderPkg.DexKeeper, ctx sdk.Context, blockTime time.Time)
- func GetAccountBalances(mapper auth.AccountKeeper, ctx sdk.Context, accSlices ...[]string) (res map[string]Account)
- func GetTradeAndOrdersRelatedAccounts(tradesToPublish []*Trade, orderChanges orderPkg.OrderChanges, ...) []string
- func Publish(publisher MarketDataPublisher, metrics *Metrics, Logger tmlog.Logger, ...)
- func PublishEvent(publisher MarketDataPublisher, Logger tmlog.Logger, ...)
- func Stop(publisher MarketDataPublisher)
- func Timer(logger tmlog.Logger, description string, op func()) (durationMs int64)
- type Account
- type Accounts
- type AggregatedMarketDataPublisher
- type AllocatedAmt
- type AssetBalance
- type AvroOrJsonMsg
- type Block
- type BlockFee
- type BlockInfoToPublish
- type Books
- type BreatheBlockMsg
- type Coin
- type CompletedReDelegation
- type CompletedUnbondingDelegation
- type CrossReceiver
- type CrossTransfer
- type CrossTransfers
- type CryptoBlock
- type DelegateEvent
- type Delegation
- type Distribution
- type DistributionMsg
- type EssMsg
- type ExecutionResults
- type Input
- type KafkaMarketDataPublisher
- type LocalMarketDataPublisher
- type MarketDataPublisher
- type Metrics
- type Mirror
- type Mirrors
- type MockMarketDataPublisher
- type NativeBlockMeta
- type NativeTransaction
- type Order
- type OrderBookDelta
- type OrderSymbolId
- type Orders
- type Output
- type PriceLevel
- type Proposal
- type ProposalStatus
- type Proposals
- type ReDelegation
- type Receiver
- type RedelegateEvent
- type Reward
- type SideProposal
- type SideProposals
- type Slash
- type SlashMsg
- type StakeUpdates
- type StakingMsg
- type Trade
- type Transaction
- type Transfer
- type Transfers
- type UnbondingDelegation
- type UndelegateEvent
- type Validator
Constants ¶
const ( // TODO(#66): revisit the setting / whole thread model here, // do we need better way to make main thread less possibility to block TransferCollectionChannelSize = 4000 ToRemoveOrderIdChannelSize = 1000 MaxOrderBookLevel = 100 )
const (
KafkaBrokerSep = ";"
)
Variables ¶
var ( Logger tmlog.Logger Cfg *config.PublicationConfig ToPublishCh chan BlockInfoToPublish ToRemoveOrderIdCh chan OrderSymbolId // order symbol and ids to remove from keeper.OrderInfoForPublish IsLive bool ToPublishEventCh chan *sub.ToPublishEvent )
var Pool = newPool()
block level pool
Functions ¶
func CollectProposalsForPublish ¶
func CollectProposalsForPublish(passed, failed []gov.SimpleProposal) (Proposals, SideProposals)
func ExpireOrdersForPublish ¶
func GetAccountBalances ¶
func GetTradeAndOrdersRelatedAccounts ¶
func GetTradeAndOrdersRelatedAccounts(tradesToPublish []*Trade, orderChanges orderPkg.OrderChanges, orderInfosForPublish orderPkg.OrderInfoForPublish) []string
func Publish ¶
func Publish( publisher MarketDataPublisher, metrics *Metrics, Logger tmlog.Logger, cfg *config.PublicationConfig, ToPublishCh <-chan BlockInfoToPublish)
func PublishEvent ¶
func PublishEvent( publisher MarketDataPublisher, Logger tmlog.Logger, cfg *config.PublicationConfig, ToPublishEventCh <-chan *sub.ToPublishEvent)
func Stop ¶
func Stop(publisher MarketDataPublisher)
Types ¶
type Account ¶
type Account struct { Owner string // string representation of AccAddress Fee string Sequence int64 Balances []*AssetBalance }
func (*Account) MarshalJSON ¶
func (*Account) ToNativeMap ¶
type Accounts ¶
func (*Accounts) EmptyCopy ¶
func (msg *Accounts) EmptyCopy() AvroOrJsonMsg
func (*Accounts) EssentialMsg ¶
func (*Accounts) ToNativeMap ¶
type AggregatedMarketDataPublisher ¶
type AggregatedMarketDataPublisher struct {
// contains filtered or unexported fields
}
func NewAggregatedMarketDataPublisher ¶
func NewAggregatedMarketDataPublisher(publishers ...MarketDataPublisher) (publisher *AggregatedMarketDataPublisher)
func (*AggregatedMarketDataPublisher) Stop ¶
func (publisher *AggregatedMarketDataPublisher) Stop()
type AllocatedAmt ¶
func (*AllocatedAmt) String ¶
func (msg *AllocatedAmt) String() string
type AssetBalance ¶
func (*AssetBalance) String ¶
func (msg *AssetBalance) String() string
func (*AssetBalance) ToNativeMap ¶
func (msg *AssetBalance) ToNativeMap() map[string]interface{}
type AvroOrJsonMsg ¶
type Block ¶
type Block struct { ChainID string CryptoBlock CryptoBlock }
func GetBlockPublished ¶
func (Block) ToNativeMap ¶
type BlockFee ¶
type BlockFee struct { Height int64 Fee string Validators []string // slice of string wrappers of bytes representation of sdk.AccAddress }
Deliberately does not implement EssMsg.
func (BlockFee) MarshalJSON ¶
func (BlockFee) ToNativeMap ¶
type BlockInfoToPublish ¶
type BlockInfoToPublish struct {
// contains filtered or unexported fields
}
Intermediate data structure for handling concurrent publication between the main thread and the publisher thread.
func NewBlockInfoToPublish ¶
func NewBlockInfoToPublish( height int64, timestamp int64, tradesToPublish []*Trade, proposalsToPublish *Proposals, sideProposalsToPublish *SideProposals, stakeUpdates *StakeUpdates, orderChanges orderPkg.OrderChanges, orderInfos orderPkg.OrderInfoForPublish, accounts map[string]Account, latestPriceLevels orderPkg.ChangedPriceLevelsMap, blockFee BlockFee, feeHolder orderPkg.FeeHolder, transfers *Transfers, block *Block) BlockInfoToPublish
type Books ¶
type Books struct { Height int64 Timestamp int64 NumOfMsgs int Books []OrderBookDelta }
Deliberately does not implement EssMsg.
func (*Books) ToNativeMap ¶
type BreatheBlockMsg ¶
func (*BreatheBlockMsg) EmptyCopy ¶
func (msg *BreatheBlockMsg) EmptyCopy() AvroOrJsonMsg
func (*BreatheBlockMsg) EssentialMsg ¶
func (msg *BreatheBlockMsg) EssentialMsg() string
func (*BreatheBlockMsg) String ¶
func (msg *BreatheBlockMsg) String() string
func (*BreatheBlockMsg) ToNativeMap ¶
func (msg *BreatheBlockMsg) ToNativeMap() map[string]interface{}
type Coin ¶
func (Coin) ToNativeMap ¶
type CompletedReDelegation ¶
type CompletedReDelegation struct { Delegator sdk.AccAddress ValidatorSrc sdk.ValAddress ValidatorDst sdk.ValAddress }
func (*CompletedReDelegation) String ¶
func (msg *CompletedReDelegation) String() string
type CompletedUnbondingDelegation ¶
type CompletedUnbondingDelegation struct { Validator sdk.ValAddress Delegator sdk.AccAddress Amount Coin }
func (*CompletedUnbondingDelegation) String ¶
func (msg *CompletedUnbondingDelegation) String() string
type CrossReceiver ¶
func (CrossReceiver) String ¶
func (msg CrossReceiver) String() string
func (CrossReceiver) ToNativeMap ¶
func (msg CrossReceiver) ToNativeMap() map[string]interface{}
type CrossTransfer ¶
type CrossTransfer struct { TxHash string ChainId string RelayerFee int64 Type string From string Denom string Contract string Decimals int To []CrossReceiver }
func (CrossTransfer) String ¶
func (msg CrossTransfer) String() string
func (CrossTransfer) ToNativeMap ¶
func (msg CrossTransfer) ToNativeMap() map[string]interface{}
type CrossTransfers ¶
type CrossTransfers struct { Height int64 Num int Timestamp int64 Transfers []CrossTransfer }
Deliberately does not implement EssMsg.
func (CrossTransfers) String ¶
func (msg CrossTransfers) String() string
func (CrossTransfers) ToNativeMap ¶
func (msg CrossTransfers) ToNativeMap() map[string]interface{}
type CryptoBlock ¶
type CryptoBlock struct { BlockHash string ParentHash string BlockHeight int64 Timestamp string TxTotal int64 BlockMeta NativeBlockMeta Transactions []Transaction }
func (CryptoBlock) String ¶
func (msg CryptoBlock) String() string
func (CryptoBlock) ToNativeMap ¶
func (msg CryptoBlock) ToNativeMap() map[string]interface{}
type DelegateEvent ¶
type DelegateEvent struct { Delegator sdk.AccAddress Validator sdk.ValAddress Amount Coin TxHash string }
func (*DelegateEvent) String ¶
func (msg *DelegateEvent) String() string
type Delegation ¶
type Delegation stake.Delegation
func (*Delegation) String ¶
func (msg *Delegation) String() string
type Distribution ¶
type Distribution struct { Validator sdk.ValAddress SelfDelegator sdk.AccAddress DistributeAddr sdk.AccAddress ValTokens int64 TotalReward int64 Commission int64 Rewards []*Reward }
func (*Distribution) String ¶
func (msg *Distribution) String() string
type DistributionMsg ¶
type DistributionMsg struct { NumOfMsgs int Height int64 Timestamp int64 Distributions map[string][]*Distribution }
distribution message
func (*DistributionMsg) EmptyCopy ¶
func (msg *DistributionMsg) EmptyCopy() AvroOrJsonMsg
func (*DistributionMsg) EssentialMsg ¶
func (msg *DistributionMsg) EssentialMsg() string
func (*DistributionMsg) String ¶
func (msg *DistributionMsg) String() string
func (*DistributionMsg) ToNativeMap ¶
func (msg *DistributionMsg) ToNativeMap() map[string]interface{}
type EssMsg ¶
type EssMsg interface { AvroOrJsonMsg // a string that carry essential msg used to make up downstream service on kafka issue // this string would be persisted into file EssentialMsg() string // an empty message of original `AvroOrJsonMsg` to make downstream logic not broken EmptyCopy() AvroOrJsonMsg }
EssMsg is the message type used when an AvroOrJsonMsg fails to publish. Not all AvroOrJsonMsg types implement EssMsg, because:
for transfer:
1. qs doesn't subscribe to its topic (risk control is relying on that) 2. risk control can recover from explorer indexed transfers (pull mode) 3. we don't have a unique representation of transfer like order-id (we didn't save txhash in message)
for trade: the problem is the same as point 3 above (the trade id is only generated during publication and is not persisted anywhere). If we kept qty, price, sid, and bid for each trade, it would be too much data; in this case we should recover from the local publisher.
type ExecutionResults ¶
type ExecutionResults struct { Height int64 Timestamp int64 // milli seconds since Epoch NumOfMsgs int // number of individual messages we published, consumer can verify messages they received against this field to make sure they does not miss messages Trades trades Orders Orders Proposals Proposals StakeUpdates StakeUpdates }
func (*ExecutionResults) EmptyCopy ¶
func (msg *ExecutionResults) EmptyCopy() AvroOrJsonMsg
func (*ExecutionResults) EssentialMsg ¶
func (msg *ExecutionResults) EssentialMsg() string
func (*ExecutionResults) String ¶
func (msg *ExecutionResults) String() string
func (*ExecutionResults) ToNativeMap ¶
func (msg *ExecutionResults) ToNativeMap() map[string]interface{}
type Input ¶
func (Input) ToNativeMap ¶
type KafkaMarketDataPublisher ¶
type KafkaMarketDataPublisher struct {
// contains filtered or unexported fields
}
func NewKafkaMarketDataPublisher ¶
func NewKafkaMarketDataPublisher( logger log.Logger, dbDir string, failFast bool) (publisher *KafkaMarketDataPublisher)
func (*KafkaMarketDataPublisher) Stop ¶
func (publisher *KafkaMarketDataPublisher) Stop()
type LocalMarketDataPublisher ¶
type LocalMarketDataPublisher struct {
// contains filtered or unexported fields
}
Publishes market data to the local marketdata dir in the bnbchaind home. Each message is written in JSON format, one message per line. The file can be compressed and auto-rotated.
func NewLocalMarketDataPublisher ¶
func NewLocalMarketDataPublisher( dataPath string, tmLogger tmLogger.Logger, config *config.PublicationConfig) (publisher *LocalMarketDataPublisher)
func (*LocalMarketDataPublisher) Stop ¶
func (publisher *LocalMarketDataPublisher) Stop()
type MarketDataPublisher ¶
type MarketDataPublisher interface { Stop() // contains filtered or unexported methods }
type Metrics ¶
type Metrics struct { // Height of last published message PublicationHeight metricsPkg.Gauge // Size of publication queue PublicationQueueSize metricsPkg.Gauge // Time between publish this and the last block. // Should be (approximate) blocking + abci + publication time PublicationBlockIntervalMs metricsPkg.Gauge // Time used to collect block information CollectBlockTimeMs metricsPkg.Gauge // Time used to collect orderbook information CollectOrderBookTimeMs metricsPkg.Gauge // Time used to publish everything in a block // Should be (approximate) sum of following times PublishTotalTimeMs metricsPkg.Gauge // Time used to publish order & trade PublishTradeAndOrderTimeMs metricsPkg.Gauge // Time used to publish orderbook PublishOrderbookTimeMs metricsPkg.Gauge // Time used to publish accounts PublishAccountTimeMs metricsPkg.Gauge // Time used to publish blockfee PublishBlockfeeTimeMs metricsPkg.Gauge // Time used to publish transfer PublishTransfersTimeMs metricsPkg.Gauge // Time used to publish block PublishBlockTimeMs metricsPkg.Gauge // Time used to publish sideProposal PublishSideProposalTimeMs metricsPkg.Gauge // num of trade NumTrade metricsPkg.Gauge // num of order NumOrder metricsPkg.Gauge // num of orderbook levels NumOrderBook metricsPkg.Gauge // num of account balance changes NumAccounts metricsPkg.Gauge // num of transfer NumTransfers metricsPkg.Gauge NumOrderInfoForPublish metricsPkg.Gauge }
Metrics contains metrics exposed by this package.
func PrometheusMetrics ¶
func PrometheusMetrics() *Metrics
PrometheusMetrics returns Metrics built using the Prometheus client library.
type Mirror ¶
type Mirror struct { TxHash string ChainId string Type string RelayerFee int64 Sender string Contract string BEP20Name string BEP20Symbol string BEP2Symbol string OldTotalSupply int64 TotalSupply int64 Decimals int Fee int64 }
func (Mirror) ToNativeMap ¶
type MockMarketDataPublisher ¶
type MockMarketDataPublisher struct { AccountPublished []*Accounts BooksPublished []*Books ExecutionResultsPublished []*ExecutionResults BlockFeePublished []BlockFee TransferPublished []Transfers BlockPublished []*Block Lock *sync.Mutex // as mock publisher is only used in testing, its no harm to have this granularity Lock MessagePublished uint32 // atomic integer used to determine the published messages }
func NewMockMarketDataPublisher ¶
func NewMockMarketDataPublisher() (publisher *MockMarketDataPublisher)
func (*MockMarketDataPublisher) Stop ¶
func (publisher *MockMarketDataPublisher) Stop()
type NativeBlockMeta ¶
type NativeBlockMeta struct { LastCommitHash string DataHash string ValidatorsHash string NextValidatorsHash string ConsensusHash string AppHash string LastResultsHash string EvidenceHash string ProposerAddress string }
func (NativeBlockMeta) String ¶
func (msg NativeBlockMeta) String() string
func (NativeBlockMeta) ToNativeMap ¶
func (msg NativeBlockMeta) ToNativeMap() map[string]interface{}
type NativeTransaction ¶
type NativeTransaction struct { Source int64 TxType string TxAsset string OrderId string Code uint32 Data string ProposalId int64 }
func (NativeTransaction) String ¶
func (msg NativeTransaction) String() string
func (NativeTransaction) ToNativeMap ¶
func (msg NativeTransaction) ToNativeMap() map[string]interface{}
type Order ¶
type Order struct { Symbol string Status orderPkg.ChangeType OrderId string TradeId string Owner string Side int8 OrderType int8 Price int64 Qty int64 LastExecutedPrice int64 LastExecutedQty int64 CumQty int64 Fee string // DEPRECATING(Galileo): total fee for Owner in this block, should use SingleFee in future OrderCreationTime int64 TransactionTime int64 TimeInForce int8 CurrentExecutionType orderPkg.ExecutionType TxHash string SingleFee string // fee for this order update - ADDED Galileo }
type OrderBookDelta ¶
type OrderBookDelta struct { Symbol string Buys []PriceLevel Sells []PriceLevel }
func (*OrderBookDelta) String ¶
func (msg *OrderBookDelta) String() string
func (*OrderBookDelta) ToNativeMap ¶
func (msg *OrderBookDelta) ToNativeMap() map[string]interface{}
type OrderSymbolId ¶
type Output ¶
func (Output) ToNativeMap ¶
type PriceLevel ¶
func (*PriceLevel) String ¶
func (msg *PriceLevel) String() string
func (*PriceLevel) ToNativeMap ¶
func (msg *PriceLevel) ToNativeMap() map[string]interface{}
type Proposal ¶
type Proposal struct { Id int64 Status ProposalStatus }
type ProposalStatus ¶
type ProposalStatus uint8
const ( Succeed ProposalStatus = iota Failed )
func (ProposalStatus) String ¶
func (this ProposalStatus) String() string
type Proposals ¶
func (*Proposals) ToNativeMap ¶
type ReDelegation ¶
type ReDelegation stake.Redelegation
func (*ReDelegation) String ¶
func (msg *ReDelegation) String() string
type Receiver ¶
func (Receiver) ToNativeMap ¶
type RedelegateEvent ¶
type RedelegateEvent struct { Delegator sdk.AccAddress ValidatorSrc sdk.ValAddress ValidatorDst sdk.ValAddress Amount Coin TxHash string }
func (*RedelegateEvent) String ¶
func (msg *RedelegateEvent) String() string
type Reward ¶
type Reward struct { Validator sdk.ValAddress Delegator sdk.AccAddress Tokens int64 Amount int64 }
type SideProposal ¶
type SideProposal struct { Id int64 ChainId string Status ProposalStatus }
func (*SideProposal) String ¶
func (msg *SideProposal) String() string
type SideProposals ¶
type SideProposals struct { Height int64 Timestamp int64 NumOfMsgs int Proposals []*SideProposal }
func (*SideProposals) String ¶
func (msg *SideProposals) String() string
func (*SideProposals) ToNativeMap ¶
func (msg *SideProposals) ToNativeMap() map[string]interface{}
type Slash ¶
type Slash struct { Validator sdk.ValAddress InfractionType byte InfractionHeight int64 JailUtil int64 SlashAmount int64 ToFeePool int64 Submitter sdk.AccAddress SubmitterReward int64 ValidatorsCompensation []*AllocatedAmt }
type SlashMsg ¶
slash message
func (*SlashMsg) EmptyCopy ¶
func (msg *SlashMsg) EmptyCopy() AvroOrJsonMsg
func (*SlashMsg) EssentialMsg ¶
func (*SlashMsg) ToNativeMap ¶
type StakeUpdates ¶
type StakeUpdates struct { NumOfMsgs int CompletedUnbondingDelegations []*CompletedUnbondingDelegation }
func CollectStakeUpdatesForPublish ¶
func CollectStakeUpdatesForPublish(unbondingDelegations []stake.UnbondingDelegation) StakeUpdates
func (*StakeUpdates) String ¶
func (msg *StakeUpdates) String() string
func (*StakeUpdates) ToNativeMap ¶
func (msg *StakeUpdates) ToNativeMap() map[string]interface{}
type StakingMsg ¶
type StakingMsg struct { NumOfMsgs int Height int64 Timestamp int64 Validators []*Validator RemovedValidators map[string][]sdk.ValAddress Delegations map[string][]*Delegation UnbondingDelegations map[string][]*UnbondingDelegation ReDelegations map[string][]*ReDelegation CompletedUBDs map[string][]*CompletedUnbondingDelegation CompletedREDs map[string][]*CompletedReDelegation DelegateEvents map[string][]*DelegateEvent UndelegateEvents map[string][]*UndelegateEvent RedelegateEvents map[string][]*RedelegateEvent ElectedValidators map[string][]*Validator }
staking message
func (*StakingMsg) EmptyCopy ¶
func (msg *StakingMsg) EmptyCopy() AvroOrJsonMsg
func (*StakingMsg) EssentialMsg ¶
func (msg *StakingMsg) EssentialMsg() string
func (*StakingMsg) String ¶
func (msg *StakingMsg) String() string
func (*StakingMsg) ToNativeMap ¶
func (msg *StakingMsg) ToNativeMap() map[string]interface{}
type Trade ¶
type Trade struct { Id string Symbol string Price int64 Qty int64 Sid string Bid string Sfee string // DEPRECATING(Galileo): seller's total fee in this block, in future we should use SSingleFee which is more precise Bfee string // DEPRECATING(Galileo): buyer's total fee in this block, in future we should use BSingleFee which is more precise SAddr string // string representation of AccAddress BAddr string // string representation of AccAddress SSrc int64 // sell order source - ADDED Galileo BSrc int64 // buy order source - ADDED Galileo SSingleFee string // seller's fee for this trade - ADDED Galileo BSingleFee string // buyer's fee for this trade - ADDED Galileo TickType int // ADDED Galileo }
func (*Trade) MarshalJSON ¶
type Transaction ¶
type Transaction struct { TxHash string Fee string Timestamp string Inputs []Input Outputs []Output NativeTransaction NativeTransaction }
func (Transaction) String ¶
func (msg Transaction) String() string
func (Transaction) ToNativeMap ¶
func (msg Transaction) ToNativeMap() map[string]interface{}
type Transfer ¶
func (Transfer) ToNativeMap ¶
type Transfers ¶
Deliberately does not implement EssMsg.
func GetTransferPublished ¶
func (Transfers) ToNativeMap ¶
type UnbondingDelegation ¶
type UnbondingDelegation stake.UnbondingDelegation
func (*UnbondingDelegation) String ¶
func (msg *UnbondingDelegation) String() string
type UndelegateEvent ¶
type UndelegateEvent struct { Delegator sdk.AccAddress Validator sdk.ValAddress Amount Coin TxHash string }
func (*UndelegateEvent) String ¶
func (msg *UndelegateEvent) String() string