stream

package
v0.19.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2021 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Index

Constants

View Source
// Module identifiers re-exported from the types package, so they can be
// referenced through this package as well.
const (
	ModuleName   = types.ModuleName
	QuerierRoute = types.QuerierRoute
	RouterKey    = types.RouterKey
)
View Source
// Enumerated values of Kind and EngineKind. Both enumerations reserve 0x00
// as their "nil" member and number the remaining members consecutively.
const (
	// Kind values.
	// NOTE(review): presumably each names the backend a stream writes to
	// (MySQL, Redis, Pulsar, WebSocket, Kafka) — confirm against the engines.
	StreamNilKind       Kind = 0x00
	StreamMysqlKind     Kind = 0x01
	StreamRedisKind     Kind = 0x02
	StreamPulsarKind    Kind = 0x03
	StreamWebSocketKind Kind = 0x04
	StreamKafkaKind     Kind = 0x05

	// EngineKind values.
	// NOTE(review): presumably each names the purpose an engine serves
	// (analysis, notify, kline, websocket) — confirm against GetEngineCreator.
	EngineNilKind       EngineKind = 0x00
	EngineAnalysisKind  EngineKind = 0x01
	EngineNotifyKind    EngineKind = 0x02
	EngineKlineKind     EngineKind = 0x03
	EngineWebSocketKind EngineKind = 0x04
)
View Source
// Dotted string keys in the "stream." and "rpc." namespaces.
// NOTE(review): presumably configuration option names looked up at startup —
// confirm against the code that reads them.
const (
	NacosTmrpcUrls        = "stream.tmrpc_nacos_urls"
	NacosTmrpcNamespaceID = "stream.tmrpc_nacos_namespace_id"
	NacosTmrpcAppName     = "stream.tmrpc_application_name"
	RpcExternalAddr       = "rpc.external_laddr"
)

Variables

View Source
// TaskConstDesc maps each TaskConst value to a descriptive string.
//
// NOTE(review): the strings are runtime values and are intentionally left
// untouched here. They spell "PHRASE" where the identifiers say "Phase",
// "PARTITIAL" for the partial-success status, and
// TaskPhase1NextActionReturnTask is described as "..._RERUN_TASK". Check who
// consumes these strings before normalizing any of them.
var TaskConstDesc = map[TaskConst]string{
	TaskStatusInvalid:                 "STREAM_TASK_STATUS_INVALID",
	TaskStatusSuccess:                 "STREAM_TASK_STATUS_SUCCESS",
	TaskStatusStatusFail:              "STREAM_TASK_STATUS_FAIL",
	TaskStatusPartialSuccess:          "STREAM_TASK_STATUS_PARTITIAL_SUCCESS",
	TaskPhase1NextActionRestart:       "STREAM_TASK_PHRASE1_NEXT_ACTION_RESTART",
	TaskPhase1NextActionJumpNextBlock: "STREAM_TASK_PHRASE1_NEXT_ACTION_JUMP_NEXT_BLK",
	TaskPhase1NextActionNewTask:       "STREAM_TASK_PHRASE1_NEXT_ACTION_NEW_TASK",
	TaskPhase1NextActionReturnTask:    "STREAM_TASK_PHRASE1_NEXT_ACTION_RERUN_TASK",
	TaskPhase1NextActionUnknown:       "STREAM_TASK_PHRASE1_NEXT_ACTION_UNKNOWN",
	TaskPhase2NextActionRestart:       "STREAM_TASK_PHRASE2_NEXT_ACTION_RESTART",
	TaskPhase2NextActionJumpNextBlock: "STREAM_TASK_PHRASE2_NEXT_ACTION_JUMP_NEXT_BLK",
}

Functions

func BeginBlocker

func BeginBlocker(ctx sdk.Context, keeper Keeper)

BeginBlocker runs the logic of BeginBlocker with version 0. BeginBlocker resets keeper cache.

func CreateGenAccounts

func CreateGenAccounts(numAccs int, genCoins sdk.Coins) (addrKeysSlice mock.AddrKeysSlice, genAccs []auth.Account)

func EndBlocker

func EndBlocker(ctx sdk.Context, k Keeper)

func MockApplyBlock

func MockApplyBlock(app *MockApp, blockHeight int64, txs []auth.StdTx)

func NewKafkaEngine

func NewKafkaEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)

func NewMySQLEngine

func NewMySQLEngine(url string, log log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)

func NewPulsarEngine

func NewPulsarEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)

func NewRedisEngine

func NewRedisEngine(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)

func ParseStreamEngineConfig

func ParseStreamEngineConfig(logger log.Logger, cfg *appCfg.StreamConfig) (map[EngineKind]types.IStreamEngine, error)

func ProduceOrderTxs

func ProduceOrderTxs(app *MockApp, ctx sdk.Context, numToGenerate int, addrKeys mock.AddrKeys,
	orderMsg *ordertypes.MsgNewOrders) []auth.StdTx

Types

type AppModule

type AppModule struct {
	AppModuleBasic
	// contains filtered or unexported fields
}

func NewAppModule

func NewAppModule(k Keeper) AppModule

NewAppModule creates a new AppModule object.

func (AppModule) BeginBlock

func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock)

func (AppModule) EndBlock

func (AppModule) ExportGenesis

func (am AppModule) ExportGenesis(ctx sdk.Context) json.RawMessage

func (AppModule) InitGenesis

func (am AppModule) InitGenesis(ctx sdk.Context, data json.RawMessage) []abci.ValidatorUpdate

func (AppModule) Name

func (AppModule) Name() string

func (AppModule) NewHandler

func (am AppModule) NewHandler() sdk.Handler

func (AppModule) NewQuerierHandler

func (am AppModule) NewQuerierHandler() sdk.Querier

func (AppModule) QuerierRoute

func (am AppModule) QuerierRoute() string

func (AppModule) RegisterInvariants

func (am AppModule) RegisterInvariants(ir sdk.InvariantRegistry)

func (AppModule) Route

func (am AppModule) Route() string

type AppModuleBasic

type AppModuleBasic struct{}

AppModuleBasic is the app module basics object.

func (AppModuleBasic) DefaultGenesis

func (AppModuleBasic) DefaultGenesis() json.RawMessage

func (AppModuleBasic) GetQueryCmd

func (AppModuleBasic) GetQueryCmd(cdc *codec.Codec) *cobra.Command

Get the root query command of this module

func (AppModuleBasic) GetTxCmd

func (AppModuleBasic) GetTxCmd(cdc *codec.Codec) *cobra.Command

Get the root tx command of this module

func (AppModuleBasic) Name

func (AppModuleBasic) Name() string

func (AppModuleBasic) RegisterCodec

func (AppModuleBasic) RegisterCodec(cdc *codec.Codec)

func (AppModuleBasic) RegisterRESTRoutes

func (AppModuleBasic) RegisterRESTRoutes(ctx context.CLIContext, rtr *mux.Router)

RegisterRESTRoutes registers the module's REST routes.

func (AppModuleBasic) ValidateGenesis

func (AppModuleBasic) ValidateGenesis(bz json.RawMessage) error

Validation check of the Genesis

type AtomTaskResult

type AtomTaskResult struct {
	// contains filtered or unexported fields
}

type AtomTaskRunner

type AtomTaskRunner struct {
	// contains filtered or unexported fields
}

type BaseMarketKeeper

type BaseMarketKeeper struct {
}

type CacheQueue

type CacheQueue struct {
	// contains filtered or unexported fields
}

func (*CacheQueue) Enqueue

func (cq *CacheQueue) Enqueue(sc Context)

func (*CacheQueue) Start

func (cq *CacheQueue) Start()

type Context

type Context struct {
	// contains filtered or unexported fields
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(logger log.Logger, taskCh chan *TaskWithData, resultCh chan Task, timeout int, engineMap map[EngineKind]types.IStreamEngine) *Coordinator

type DexKeeper

type DexKeeper = types.DexKeeper

type EngineCreator

type EngineCreator func(url string, logger log.Logger, cfg *appCfg.StreamConfig) (types.IStreamEngine, error)

func GetEngineCreator

func GetEngineCreator(eKind EngineKind, sKind Kind) (EngineCreator, error)

type EngineKind

type EngineKind byte

func StringToEngineKind

func StringToEngineKind(kind string) EngineKind

type KafkaEngine

type KafkaEngine struct {
	// contains filtered or unexported fields
}

func (*KafkaEngine) URL

func (ke *KafkaEngine) URL() string

func (*KafkaEngine) Write

func (ke *KafkaEngine) Write(data types.IStreamData, success *bool)

type Keeper

type Keeper struct {
	// contains filtered or unexported fields
}

nolint

func NewKeeper

func NewKeeper(orderKeeper types.OrderKeeper, tokenKeeper types.TokenKeeper, dexKeeper types.DexKeeper, accountKeeper types.AccountKeeper,
	swapKeeper types.SwapKeeper, farmKeeper types.FarmKeeper, cdc *codec.Codec, logger log.Logger, cfg *config.Config, metrics *monitor.StreamMetrics) Keeper

nolint

func (Keeper) AnalysisEnable

func (k Keeper) AnalysisEnable() bool

AnalysisEnable returns true when analysis is enabled.

func (Keeper) GetMarketKeeper

func (k Keeper) GetMarketKeeper() MarketKeeper

GetMarketKeeper returns market keeper

func (Keeper) OnAccountUpdated

func (k Keeper) OnAccountUpdated(acc auth.Account)

OnAccountUpdated is called by auth when an account is updated.

func (Keeper) OnAddNewTokenPair

func (k Keeper) OnAddNewTokenPair(ctx sdk.Context, tokenPair *dex.TokenPair)

OnAddNewTokenPair is called by dex when a new token pair is listed.

func (Keeper) OnFarmClaim

func (k Keeper) OnFarmClaim(ctx sdk.Context, address sdk.AccAddress, poolName string, claimedCoins sdk.SysCoins)

func (Keeper) OnSwapCreateExchange

func (k Keeper) OnSwapCreateExchange(ctx sdk.Context, swapTokenPair ammswap.SwapTokenPair)

func (Keeper) OnSwapToken

func (k Keeper) OnSwapToken(ctx sdk.Context, address sdk.AccAddress, swapTokenPair ammswap.SwapTokenPair, sellAmount sdk.SysCoin, buyAmount sdk.SysCoin)

OnSwapToken is called by swap.

func (Keeper) OnTokenPairUpdated

func (k Keeper) OnTokenPairUpdated(ctx sdk.Context)

OnTokenPairUpdated is called by dex when a token pair is updated.

func (Keeper) SyncTx

func (k Keeper) SyncTx(ctx sdk.Context, tx *auth.StdTx, txHash string, timestamp int64)

nolint

type Kind

type Kind byte

func StringToStreamKind

func StringToStreamKind(kind string) Kind

type MarketKeeper

type MarketKeeper backend.MarketKeeper

type MockApp

type MockApp struct {
	*mock.App

	BankKeeper  bank.Keeper
	OrderKeeper keeper.Keeper
	DexKeeper   dex.Keeper
	TokenKeeper token.Keeper
	// contains filtered or unexported fields
}

func GetMockApp

func GetMockApp(t *testing.T, numGenAccs int, cfg *appCfg.Config) (mockApp *MockApp, addrKeysSlice mock.AddrKeysSlice)

type MySQLEngine

type MySQLEngine struct {
	// contains filtered or unexported fields
}

func (*MySQLEngine) URL

func (e *MySQLEngine) URL() string

func (*MySQLEngine) Write

func (e *MySQLEngine) Write(data types.IStreamData, success *bool)

type PulsarEngine

type PulsarEngine struct {
	// contains filtered or unexported fields
}

func (*PulsarEngine) URL

func (e *PulsarEngine) URL() string

func (*PulsarEngine) Write

func (e *PulsarEngine) Write(data types.IStreamData, success *bool)

type RedisEngine

type RedisEngine struct {
	// contains filtered or unexported fields
}

func (*RedisEngine) URL

func (e *RedisEngine) URL() string

func (*RedisEngine) Write

func (e *RedisEngine) Write(data types.IStreamData, success *bool)

type RedisMarketKeeper

type RedisMarketKeeper struct {
	*BaseMarketKeeper
	// contains filtered or unexported fields
}

func NewRedisMarketKeeper

func NewRedisMarketKeeper(client *conn.Client, logger log.Logger) *RedisMarketKeeper

func (*RedisMarketKeeper) GetKlineByProductID

func (k *RedisMarketKeeper) GetKlineByProductID(productID uint64, granularity, size int) ([][]string, error)

func (*RedisMarketKeeper) GetTickerByProducts

func (k *RedisMarketKeeper) GetTickerByProducts(products []string) ([]map[string]string, error)

type Stream

type Stream struct {
	Cache          *common.Cache
	AnalysisEnable bool
	// contains filtered or unexported fields
}

Stream maintains the engines

func NewStream

func NewStream(orderKeeper types.OrderKeeper, tokenKeeper types.TokenKeeper, dexKeeper types.DexKeeper, swapKeeper types.SwapKeeper, farmKeeper types.FarmKeeper, cdc *codec.Codec, logger log.Logger, cfg *appCfg.Config) *Stream

func (Stream) NacosTmRpcAppName

func (s Stream) NacosTmRpcAppName() string

func (Stream) NacosTmRpcNamespaceID

func (s Stream) NacosTmRpcNamespaceID() string

func (Stream) NacosTmRpcUrls

func (s Stream) NacosTmRpcUrls() string

func (Stream) RestExternalAddr

func (s Stream) RestExternalAddr() string

func (Stream) RpcExternalAddr

func (s Stream) RpcExternalAddr() string

type Task

// Task carries a height, a per-Kind completion map and an UpdatedAt value.
// Its JSON keys are capitalized and match the Go field names exactly.
type Task struct {
	// NOTE(review): presumably the block height the task belongs to
	// (NewTask takes a blockHeight) — confirm.
	Height    int64         `json:"Height"`
	// One boolean per stream Kind.
	// NOTE(review): presumably true once that kind has been handled — confirm.
	DoneMap   map[Kind]bool `json:"DoneMap"`
	// NOTE(review): presumably a timestamp; its unit is not visible here — confirm.
	UpdatedAt int64         `json:"UpdatedAt"`
}

func NewTask

func NewTask(blockHeight int64) *Task

func (*Task) GetStatus

func (t *Task) GetStatus() TaskConst

type TaskConst

// TaskConst is the integer type shared by the task status codes and the
// next-action codes declared below.
type TaskConst int
// Task status codes. iota starts at 0 here, so the values are 0 through 3.
const (
	TaskStatusInvalid TaskConst = 0 + iota
	TaskStatusSuccess
	TaskStatusStatusFail
	TaskStatusPartialSuccess
)
// Next-action codes for phase 1 and phase 2.
//
// Both phases live in ONE const group, and iota counts every constant
// specification in the group. Phase 1 therefore takes 100 through 104, and
// iota is already 5 when the phase-2 constants are declared: they evaluate
// to 205 and 206, not 200 and 201.
const (
	// Phase 1 (values 100..104)
	TaskPhase1NextActionRestart TaskConst = 100 + iota
	TaskPhase1NextActionJumpNextBlock
	TaskPhase1NextActionNewTask
	TaskPhase1NextActionReturnTask
	TaskPhase1NextActionUnknown

	// Phase 2 (values 205 and 206, because iota continues from phase 1)
	TaskPhase2NextActionRestart TaskConst = 200 + iota
	TaskPhase2NextActionJumpNextBlock
)

type TaskWithData

type TaskWithData struct {
	*Task
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL