Documentation ¶
Index ¶
- Constants
- Variables
- func AddDeviceInfoHours(ctx context.Context, upsertDevice []*model.DeviceInfoHour) error
- func GetDeviceUserId(ctx context.Context, deviceId string) string
- func GetSchedulerConfigs(ctx context.Context, key string) ([]*types.SchedulerCfg, error)
- func OnlineIPCount(ctx context.Context) error
- func QueryDataByDate(DateFrom, DateTo string) []map[string]string
- func RegisterFetcher(fetcher func() Fetcher)
- func SetSchedulerConfigs(ctx context.Context, key string, val interface{}) error
- func SumAllNodes() error
- func SumDailyReward(ctx context.Context, sumTime time.Time, devices []*model.DeviceInfo) error
- func SumDeviceInfoProfit() error
- func SumUserDailyReward(ctx context.Context) error
- func ToDeviceInfo(node types.NodeInfo, areaId string) *model.DeviceInfo
- func ToDeviceInfoHour(device *model.DeviceInfo, t time.Time) *model.DeviceInfoHour
- func UpdateUserRewardFields(ctx context.Context) error
- type AssertFetcher
- type BaseFetcher
- type EtcdClient
- type Fetcher
- type Job
- type NodeFetcher
- type Scheduler
- type Statistic
- type StorageFetcher
- type SystemInfoFetcher
Constants ¶
const ( DeviceStatusOffline = "offline" DeviceStatusOnline = "online" DeviceStatusAbnormal = "abnormal" DeviceStatusCodeOffline = 3 DeviceStatusCodeOnline = 1 DeviceStatusCodeAbnormal = 2 )
const (
DefaultCommissionPercent = 5
)
const LockerTTL = 30 * time.Second
Variables ¶
var ( DefaultAreaId = "Asia-China-Guangdong-Shenzhen" SchedulerConfigKeyPrefix = "TITAN::SCHEDULERCFG" )
var FetcherRegistry []func() Fetcher
FetcherRegistry to keep track of registered fetchers
var SumDevicesInterval = time.Second * 5
Functions ¶
func AddDeviceInfoHours ¶
func AddDeviceInfoHours(ctx context.Context, upsertDevice []*model.DeviceInfoHour) error
AddDeviceInfoHours 写入 device_info_hour 表
func GetSchedulerConfigs ¶
func OnlineIPCount ¶
func QueryDataByDate ¶
func RegisterFetcher ¶
func RegisterFetcher(fetcher func() Fetcher)
RegisterFetcher allows registering new fetchers
func SetSchedulerConfigs ¶
func SumAllNodes ¶
func SumAllNodes() error
func SumDailyReward ¶
SumDailyReward 写入或更新 device_info_daily表
func SumDeviceInfoProfit ¶
func SumDeviceInfoProfit() error
func SumUserDailyReward ¶
SumUserDailyReward 奖励规则 普通用户:
- 邀请人, 可得5%的佣金
- 受邀人, 无津贴
KOL:
- 邀请人, 可得 cli 端 5% 佣金, 移动端 10%-15%-20% 的对应等级比例佣金
- 受邀人, 5%-7%-10% 的津贴
func ToDeviceInfo ¶
func ToDeviceInfo(node types.NodeInfo, areaId string) *model.DeviceInfo
func ToDeviceInfoHour ¶
func ToDeviceInfoHour(device *model.DeviceInfo, t time.Time) *model.DeviceInfoHour
func UpdateUserRewardFields ¶
UpdateUserRewardFields update users table reward, referral_reward, referrer_commission_reward, from_kol_bonus_reward, device_count fields
Types ¶
type AssertFetcher ¶
type AssertFetcher struct {
BaseFetcher
}
AssertFetcher represents a fetcher for asset information.
func (AssertFetcher) Fetch ¶
func (a AssertFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
Fetch fetches asset information.
func (AssertFetcher) Finalize ¶
func (a AssertFetcher) Finalize() error
type BaseFetcher ¶
type BaseFetcher struct {
// contains filtered or unexported fields
}
BaseFetcher is a basic implementation of the Fetcher interface.
func (BaseFetcher) GetJobQueue ¶
func (b BaseFetcher) GetJobQueue() chan Job
GetJobQueue returns the job queue channel.
type EtcdClient ¶
type EtcdClient struct {
// contains filtered or unexported fields
}
func NewEtcdClient ¶
func NewEtcdClient(addresses []string) (*EtcdClient, error)
type Fetcher ¶
type Fetcher interface { Fetch(ctx context.Context, scheduler *Scheduler) error Push(ctx context.Context, job Job) GetJobQueue() chan Job Finalize() error }
Fetcher is an interface for fetching and processing data.
type NodeFetcher ¶
type NodeFetcher struct {
BaseFetcher
}
NodeFetcher handles fetching information about all nodes
func (*NodeFetcher) Fetch ¶
func (n *NodeFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
Fetch fetches information about all nodes 流程如下: 1. 遍历拉取节点的数据, 每次上限为 1000 个(调度器那边设置上限也是1000) 2. 区分在线和离线的节点, 创建一个任务, 任务步骤: 2.1 更新 device_info表, 使用的是 INSERT INTO ... ON DUPLICATE KEY UPDATE ... , 在线的需要更新多个字段, 离线的只更新在线状态为离线 2.2 写入 device_info_hour 表, 每次拉取都会记录到这个表, 5分钟一条记录 2.3 统计每个节点当天的 收益,在线等数据, 并写到 device_info_daily 表, 唯一主键为 device_id 和 time, 每个节点每天增加一条记录 3. 把任务 Push 到队列等待执行 4. Finalize 任务, 执行以下统计 4.1 统计每个节点的每日收益,昨日收益,七天收益和月收益等, 更新到 device_info 表 4.2 统计所有节点的总收益,总内存和总的存储等总览页面数据的统计
func (*NodeFetcher) Finalize ¶
func (n *NodeFetcher) Finalize() error
type Scheduler ¶
func FetchSchedulersFromEtcd ¶
func FetchSchedulersFromEtcd(etcdClient *EtcdClient) ([]*Scheduler, error)
type Statistic ¶
type Statistic struct {
// contains filtered or unexported fields
}
Statistic represents the statistics manager.
func New ¶
func New(cfg config.StatisticsConfig, client *EtcdClient) *Statistic
New creates a new Statistic instance.
func (*Statistic) Once ¶
Once 使用 redis 分布式锁, 当部署多个服务时,保证只有一个服务进行数据拉取和统计,避免重复执行任务,获得锁的服务会执行任务,获取不到锁的则跳过.
func (*Statistic) SumDeviceInfoDaily ¶
func (*Statistic) UpdateSchedulers ¶
type StorageFetcher ¶
type StorageFetcher struct {
BaseFetcher
}
func (*StorageFetcher) Fetch ¶
func (c *StorageFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
Fetch fetches storage information and processes the data.
func (*StorageFetcher) Finalize ¶
func (c *StorageFetcher) Finalize() error
type SystemInfoFetcher ¶
type SystemInfoFetcher struct {
BaseFetcher
}
func (*SystemInfoFetcher) Fetch ¶
func (s *SystemInfoFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
func (*SystemInfoFetcher) Finalize ¶
func (s *SystemInfoFetcher) Finalize() error