Documentation ¶
Index ¶
- Constants
- Variables
- func BeginBlocker(ctx sdk.Context, keeper Keeper)
- func CreateGenAccounts(numAccs int, genCoins sdk.Coins) (addrKeysSlice mock.AddrKeysSlice, genAccs []auth.Account)
- func EndBlocker(ctx sdk.Context, k Keeper)
- func MockApplyBlock(app *MockApp, blockHeight int64, txs []auth.StdTx)
- func NewKafkaEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
- func NewMySQLEngine(url string, log log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
- func NewPulsarEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
- func NewRedisEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
- func ParseStreamEngineConfig(logger log.Logger, cfg *appCfg.StreamConfig) (map[EngineKind]types.IStreamEngine, error)
- func ProduceOrderTxs(app *MockApp, ctx sdk.Context, numToGenerate int, addrKeys mock.AddrKeys, ...) []auth.StdTx
- type AppModule
- func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock)
- func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate
- func (am AppModule) ExportGenesis(ctx sdk.Context) json.RawMessage
- func (am AppModule) InitGenesis(ctx sdk.Context, data json.RawMessage) []abci.ValidatorUpdate
- func (AppModule) Name() string
- func (am AppModule) NewHandler() sdk.Handler
- func (am AppModule) NewQuerierHandler() sdk.Querier
- func (am AppModule) QuerierRoute() string
- func (am AppModule) RegisterInvariants(ir sdk.InvariantRegistry)
- func (am AppModule) Route() string
- type AppModuleBasic
- func (AppModuleBasic) DefaultGenesis() json.RawMessage
- func (AppModuleBasic) GetQueryCmd(cdc *codec.Codec) *cobra.Command
- func (AppModuleBasic) GetTxCmd(cdc *codec.Codec) *cobra.Command
- func (AppModuleBasic) Name() string
- func (AppModuleBasic) RegisterCodec(cdc *codec.Codec)
- func (AppModuleBasic) RegisterRESTRoutes(ctx context.CLIContext, rtr *mux.Router)
- func (AppModuleBasic) ValidateGenesis(bz json.RawMessage) error
- type AtomTaskResult
- type AtomTaskRunner
- type BaseMarketKeeper
- type CacheQueue
- type Context
- type Coordinator
- type DexKeeper
- type EngineCreator
- type EngineKind
- type KafkaEngine
- type Keeper
- func (k Keeper) AnalysisEnable() bool
- func (k Keeper) GetMarketKeeper() MarketKeeper
- func (k Keeper) OnAccountUpdated(acc auth.Account)
- func (k Keeper) OnAddNewTokenPair(ctx sdk.Context, tokenPair *dex.TokenPair)
- func (k Keeper) OnFarmClaim(ctx sdk.Context, address sdk.AccAddress, poolName string, ...)
- func (k Keeper) OnSwapCreateExchange(ctx sdk.Context, swapTokenPair ammswap.SwapTokenPair)
- func (k Keeper) OnSwapToken(ctx sdk.Context, address sdk.AccAddress, swapTokenPair ammswap.SwapTokenPair, ...)
- func (k Keeper) OnTokenPairUpdated(ctx sdk.Context)
- func (k Keeper) SyncTx(ctx sdk.Context, tx *auth.StdTx, txHash string, timestamp int64)
- type Kind
- type MarketKeeper
- type MockApp
- type MySQLEngine
- type PulsarEngine
- type RedisEngine
- type RedisMarketKeeper
- type Stream
- type Task
- type TaskConst
- type TaskWithData
Constants ¶
View Source
const ( ModuleName = types.ModuleName QuerierRoute = types.QuerierRoute RouterKey = types.RouterKey )
View Source
const ( StreamNilKind Kind = 0x00 StreamMysqlKind Kind = 0x01 StreamRedisKind Kind = 0x02 StreamPulsarKind Kind = 0x03 StreamWebSocketKind Kind = 0x04 StreamKafkaKind Kind = 0x05 EngineNilKind EngineKind = 0x00 EngineAnalysisKind EngineKind = 0x01 EngineNotifyKind EngineKind = 0x02 EngineKlineKind EngineKind = 0x03 EngineWebSocketKind EngineKind = 0x04 )
View Source
const ( NacosTmrpcUrls = "stream.tmrpc_nacos_urls" NacosTmrpcNamespaceID = "stream.tmrpc_nacos_namespace_id" NacosTmrpcAppName = "stream.tmrpc_application_name" RpcExternalAddr = "rpc.external_laddr" )
Variables ¶
View Source
var EngineKind2StreamKindMap = map[EngineKind]Kind{ EngineAnalysisKind: StreamMysqlKind, EngineNotifyKind: StreamRedisKind, EngineWebSocketKind: StreamWebSocketKind, }
View Source
var StreamKind2EngineKindMap = map[Kind]EngineKind{ StreamMysqlKind: EngineAnalysisKind, StreamRedisKind: EngineNotifyKind, StreamPulsarKind: EngineKlineKind, StreamKafkaKind: EngineKlineKind, StreamWebSocketKind: EngineWebSocketKind, }
View Source
var TaskConstDesc = map[TaskConst]string{ TaskStatusInvalid: "STREAM_TASK_STATUS_INVALID", TaskStatusSuccess: "STREAM_TASK_STATUS_SUCCESS", TaskStatusStatusFail: "STREAM_TASK_STATUS_FAIL", TaskStatusPartialSuccess: "STREAM_TASK_STATUS_PARTITIAL_SUCCESS", TaskPhase1NextActionRestart: "STREAM_TASK_PHRASE1_NEXT_ACTION_RESTART", TaskPhase1NextActionJumpNextBlock: "STREAM_TASK_PHRASE1_NEXT_ACTION_JUMP_NEXT_BLK", TaskPhase1NextActionNewTask: "STREAM_TASK_PHRASE1_NEXT_ACTION_NEW_TASK", TaskPhase1NextActionReturnTask: "STREAM_TASK_PHRASE1_NEXT_ACTION_RERUN_TASK", TaskPhase1NextActionUnknown: "STREAM_TASK_PHRASE1_NEXT_ACTION_UNKNOWN", TaskPhase2NextActionRestart: "STREAM_TASK_PHRASE2_NEXT_ACTION_RESTART", TaskPhase2NextActionJumpNextBlock: "STREAM_TASK_PHRASE2_NEXT_ACTION_JUMP_NEXT_BLK", }
Functions ¶
func BeginBlocker ¶
BeginBlocker runs the BeginBlocker logic for version 0. It resets the keeper cache.
func CreateGenAccounts ¶
func EndBlocker ¶
func NewKafkaEngine ¶
func NewKafkaEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
func NewMySQLEngine ¶
func NewMySQLEngine(url string, log log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
func NewPulsarEngine ¶
func NewPulsarEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
func NewRedisEngine ¶
func NewRedisEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
func ParseStreamEngineConfig ¶
func ParseStreamEngineConfig(logger log.Logger, cfg *appCfg.StreamConfig) (map[EngineKind]types.IStreamEngine, error)
func ProduceOrderTxs ¶
func ProduceOrderTxs(app *MockApp, ctx sdk.Context, numToGenerate int, addrKeys mock.AddrKeys, orderMsg *ordertypes.MsgNewOrders) []auth.StdTx
Types ¶
type AppModule ¶
type AppModule struct { AppModuleBasic // contains filtered or unexported fields }
func NewAppModule ¶
NewAppModule creates a new AppModule object.
func (AppModule) BeginBlock ¶
func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock)
func (AppModule) EndBlock ¶
func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate
func (AppModule) ExportGenesis ¶
func (am AppModule) ExportGenesis(ctx sdk.Context) json.RawMessage
func (AppModule) InitGenesis ¶
func (am AppModule) InitGenesis(ctx sdk.Context, data json.RawMessage) []abci.ValidatorUpdate
func (AppModule) NewHandler ¶
func (AppModule) NewQuerierHandler ¶
func (AppModule) QuerierRoute ¶
func (AppModule) RegisterInvariants ¶
func (am AppModule) RegisterInvariants(ir sdk.InvariantRegistry)
type AppModuleBasic ¶
type AppModuleBasic struct{}
AppModuleBasic is the basic app module object.
func (AppModuleBasic) DefaultGenesis ¶
func (AppModuleBasic) DefaultGenesis() json.RawMessage
func (AppModuleBasic) GetQueryCmd ¶
func (AppModuleBasic) GetQueryCmd(cdc *codec.Codec) *cobra.Command
GetQueryCmd returns the root query command of this module.
func (AppModuleBasic) GetTxCmd ¶
func (AppModuleBasic) GetTxCmd(cdc *codec.Codec) *cobra.Command
GetTxCmd returns the root tx command of this module.
func (AppModuleBasic) Name ¶
func (AppModuleBasic) Name() string
func (AppModuleBasic) RegisterCodec ¶
func (AppModuleBasic) RegisterCodec(cdc *codec.Codec)
func (AppModuleBasic) RegisterRESTRoutes ¶
func (AppModuleBasic) RegisterRESTRoutes(ctx context.CLIContext, rtr *mux.Router)
RegisterRESTRoutes registers the REST routes.
func (AppModuleBasic) ValidateGenesis ¶
func (AppModuleBasic) ValidateGenesis(bz json.RawMessage) error
ValidateGenesis performs a validation check of the genesis data.
type AtomTaskResult ¶
type AtomTaskResult struct {
// contains filtered or unexported fields
}
type AtomTaskRunner ¶
type AtomTaskRunner struct {
// contains filtered or unexported fields
}
type BaseMarketKeeper ¶
type BaseMarketKeeper struct { }
type CacheQueue ¶
type CacheQueue struct {
// contains filtered or unexported fields
}
func (*CacheQueue) Enqueue ¶
func (cq *CacheQueue) Enqueue(sc Context)
func (*CacheQueue) Start ¶
func (cq *CacheQueue) Start()
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(logger log.Logger, taskCh chan *TaskWithData, resultCh chan Task, timeout int, engineMap map[EngineKind]types.IStreamEngine) *Coordinator
type EngineCreator ¶
type EngineCreator func(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)
func GetEngineCreator ¶
func GetEngineCreator(eKind EngineKind, sKind Kind) (EngineCreator, error)
type EngineKind ¶
type EngineKind byte
func StringToEngineKind ¶
func StringToEngineKind(kind string) EngineKind
type KafkaEngine ¶
type KafkaEngine struct {
// contains filtered or unexported fields
}
func (*KafkaEngine) URL ¶
func (ke *KafkaEngine) URL() string
func (*KafkaEngine) Write ¶
func (ke *KafkaEngine) Write(data types.IStreamData, success *bool)
type Keeper ¶
type Keeper struct {
// contains filtered or unexported fields
}
nolint
func NewKeeper ¶
func NewKeeper(orderKeeper types.OrderKeeper, tokenKeeper types.TokenKeeper, dexKeeper types.DexKeeper, accountKeeper types.AccountKeeper, swapKeeper types.SwapKeeper, farmKeeper types.FarmKeeper, cdc *codec.Codec, logger log.Logger, cfg *config.Config, metrics *monitor.StreamMetrics) Keeper
nolint
func (Keeper) AnalysisEnable ¶
AnalysisEnable returns true when analysis is enabled.
func (Keeper) GetMarketKeeper ¶
func (k Keeper) GetMarketKeeper() MarketKeeper
GetMarketKeeper returns the market keeper.
func (Keeper) OnAccountUpdated ¶
OnAccountUpdated is called by auth when an account is updated.
func (Keeper) OnAddNewTokenPair ¶
OnAddNewTokenPair is called by dex when a new token pair is listed.
func (Keeper) OnFarmClaim ¶
func (Keeper) OnSwapCreateExchange ¶
func (k Keeper) OnSwapCreateExchange(ctx sdk.Context, swapTokenPair ammswap.SwapTokenPair)
func (Keeper) OnSwapToken ¶
func (k Keeper) OnSwapToken(ctx sdk.Context, address sdk.AccAddress, swapTokenPair ammswap.SwapTokenPair, sellAmount sdk.SysCoin, buyAmount sdk.SysCoin)
OnSwapToken is called by swap.
func (Keeper) OnTokenPairUpdated ¶
OnTokenPairUpdated is called by dex when a token pair is updated.
type MarketKeeper ¶
type MarketKeeper backend.MarketKeeper
type MockApp ¶
type MockApp struct { *mock.App BankKeeper bank.Keeper OrderKeeper keeper.Keeper DexKeeper dex.Keeper TokenKeeper token.Keeper // contains filtered or unexported fields }
func GetMockApp ¶
type MySQLEngine ¶
type MySQLEngine struct {
// contains filtered or unexported fields
}
func (*MySQLEngine) URL ¶
func (e *MySQLEngine) URL() string
func (*MySQLEngine) Write ¶
func (e *MySQLEngine) Write(data types.IStreamData, success *bool)
type PulsarEngine ¶
type PulsarEngine struct {
// contains filtered or unexported fields
}
func (*PulsarEngine) URL ¶
func (e *PulsarEngine) URL() string
func (*PulsarEngine) Write ¶
func (e *PulsarEngine) Write(data types.IStreamData, success *bool)
type RedisEngine ¶
type RedisEngine struct {
// contains filtered or unexported fields
}
func (*RedisEngine) URL ¶
func (e *RedisEngine) URL() string
func (*RedisEngine) Write ¶
func (e *RedisEngine) Write(data types.IStreamData, success *bool)
type RedisMarketKeeper ¶
type RedisMarketKeeper struct { *BaseMarketKeeper // contains filtered or unexported fields }
func NewRedisMarketKeeper ¶
func NewRedisMarketKeeper(client *conn.Client, logger log.Logger) *RedisMarketKeeper
func (*RedisMarketKeeper) GetKlineByProductID ¶
func (k *RedisMarketKeeper) GetKlineByProductID(productID uint64, granularity, size int) ([][]string, error)
func (*RedisMarketKeeper) GetTickerByProducts ¶
func (k *RedisMarketKeeper) GetTickerByProducts(products []string) ([]map[string]string, error)
type Stream ¶
type Stream struct { Cache *common.Cache AnalysisEnable bool // contains filtered or unexported fields }
Stream maintains the engines
func NewStream ¶
func NewStream(orderKeeper types.OrderKeeper, tokenKeeper types.TokenKeeper, dexKeeper types.DexKeeper, swapKeeper types.SwapKeeper, farmKeeper types.FarmKeeper, cdc *codec.Codec, logger log.Logger, cfg *appCfg.Config) *Stream
func (Stream) NacosTmRpcAppName ¶
func (Stream) NacosTmRpcNamespaceID ¶
func (Stream) NacosTmRpcUrls ¶
func (Stream) RestExternalAddr ¶
func (Stream) RpcExternalAddr ¶
type Task ¶
type TaskWithData ¶
type TaskWithData struct { *Task // contains filtered or unexported fields }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.