Documentation ¶
Index ¶
- Constants
- Variables
- func AddDeviceInfoHours(ctx context.Context, startTime time.Time, upsertDevice []*model.DeviceInfoHour) error
- func GetDeviceUserId(ctx context.Context, deviceId string) string
- func GetSchedulerConfigs(ctx context.Context, key string) ([]*types.SchedulerCfg, error)
- func LoadSchedulerConfigs() (map[string][]*types.SchedulerCfg, error)
- func OnlineIPCount(ctx context.Context) error
- func QueryDataByDate(DateFrom, DateTo string) []map[string]string
- func RegisterFetcher(fetcher func() Fetcher)
- func SetSchedulerConfigs(ctx context.Context, key string, val interface{}) error
- func SumAllNodes() error
- func SumDailyReward(ctx context.Context, sumTime time.Time, devices []*model.DeviceInfo) error
- func SumDeviceInfoProfit() error
- func SumUserReward(ctx context.Context) error
- func ToDeviceInfo(node types.NodeInfo, areaId string) *model.DeviceInfo
- func ToDeviceInfoHour(device *model.DeviceInfo, t time.Time) *model.DeviceInfoHour
- type AssertFetcher
- type BaseFetcher
- type EtcdClient
- type Fetcher
- type Job
- type NodeFetcher
- type Scheduler
- type Statistic
Constants ¶
const ( DeviceStatusOffline = "offline" DeviceStatusOnline = "online" DeviceStatusAbnormal = "abnormal" DeviceStatusCodeOffline = 3 DeviceStatusCodeOnline = 1 DeviceStatusCodeAbnormal = 2 )
const (
DefaultCommissionPercent = 5
)
const LockerTTL = 30 * time.Second
Variables ¶
var ( DefaultAreaId = "Asia-China-Guangdong-Shenzhen" SchedulerConfigKeyPrefix = "TITAN::SCHEDULERCFG" )
var AllL1NodesMap = make(map[string]*model.DeviceInfo)
var FetcherRegistry []func() Fetcher
FetcherRegistry to keep track of registered fetchers
var Schedulers []*Scheduler
var SumDevicesInterval = time.Second * 5
Functions ¶
func AddDeviceInfoHours ¶
func AddDeviceInfoHours(ctx context.Context, startTime time.Time, upsertDevice []*model.DeviceInfoHour) error
AddDeviceInfoHours 写入 device_info_hour 表
func GetSchedulerConfigs ¶
func LoadSchedulerConfigs ¶
func LoadSchedulerConfigs() (map[string][]*types.SchedulerCfg, error)
func OnlineIPCount ¶
func QueryDataByDate ¶
func RegisterFetcher ¶
func RegisterFetcher(fetcher func() Fetcher)
RegisterFetcher allows registering new fetchers
func SetSchedulerConfigs ¶
func SumAllNodes ¶
func SumAllNodes() error
func SumDailyReward ¶
SumDailyReward 写入或更新 device_info_daily表
func SumDeviceInfoProfit ¶
func SumDeviceInfoProfit() error
func SumUserReward ¶
SumUserReward 奖励规则 - 普通人: 邀请好友获得积分时,您将获得他们积分的10%作为奖励。 - KOL-1级: 邀请好友获得积分时,您将获得他们积分的15%作为奖励。 - KOL-2级: 邀请好友获得积分时,您将获得他们积分的20%作为奖励。 - KOL-3级: 邀请好友获得积分时,您将获得他们积分的30%作为奖励。 二级邀请奖励 - 当您的好友(被您邀请的人)邀请其他人并获得积分时,您也将获得该二级好友积分的部分奖励: - 普通人: 获得二级好友积分的5%。 - KOL-1级: 获得二级好友积分的7.5%。 - KOL-2级: 获得二级好友积分的10%。 - KOL-3级: 获得二级好友积分的15%。
func ToDeviceInfo ¶
func ToDeviceInfo(node types.NodeInfo, areaId string) *model.DeviceInfo
func ToDeviceInfoHour ¶
func ToDeviceInfoHour(device *model.DeviceInfo, t time.Time) *model.DeviceInfoHour
Types ¶
type AssertFetcher ¶
type AssertFetcher struct {
BaseFetcher
}
AssertFetcher represents a fetcher for asset information.
func (AssertFetcher) Fetch ¶
func (a AssertFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
Fetch fetches asset information.
func (AssertFetcher) Finalize ¶
func (a AssertFetcher) Finalize() error
type BaseFetcher ¶
type BaseFetcher struct {
// contains filtered or unexported fields
}
BaseFetcher is a basic implementation of the Fetcher interface.
func (BaseFetcher) GetJobQueue ¶
func (b BaseFetcher) GetJobQueue() chan Job
GetJobQueue returns the job queue channel.
type EtcdClient ¶
type EtcdClient struct {
// contains filtered or unexported fields
}
func NewEtcdClient ¶
func NewEtcdClient(addresses []string) (*EtcdClient, error)
type Fetcher ¶
type Fetcher interface { Fetch(ctx context.Context, scheduler *Scheduler) error Push(ctx context.Context, job Job) GetJobQueue() chan Job Finalize() error }
Fetcher is an interface for fetching and processing data.
type NodeFetcher ¶
type NodeFetcher struct {
BaseFetcher
}
NodeFetcher handles fetching information about all nodes
func (*NodeFetcher) Fetch ¶
func (n *NodeFetcher) Fetch(ctx context.Context, scheduler *Scheduler) error
Fetch fetches information about all nodes 流程如下: 1. 遍历拉取节点的数据, 每次上限为 1000 个(调度器那边设置上限也是1000) 2. 区分在线和离线的节点, 创建一个任务, 任务步骤: 2.1 更新 device_info表, 使用的是 INSERT INTO ... ON DUPLICATE KEY UPDATE ... , 在线的需要更新多个字段, 离线的只更新在线状态为离线 2.2 写入 device_info_hour 表, 每次拉取都会记录到这个表, 5分钟一条记录 2.3 统计每个节点当天的 收益,在线等数据, 并写到 device_info_daily 表, 唯一主键为 device_id 和 time, 每个节点每天增加一条记录 3. 把任务 Push 到队列等待执行 4. Finalize 任务, 执行以下统计 4.1 统计每个节点的每日收益,昨日收益,七天收益和月收益等, 更新到 device_info 表 4.2 统计所有节点的总收益,总内存和总的存储等总览页面数据的统计
func (*NodeFetcher) Finalize ¶
func (n *NodeFetcher) Finalize() error
type Scheduler ¶
func FetchSchedulersFromEtcd ¶
func FetchSchedulersFromEtcd(etcdClient *EtcdClient) ([]*Scheduler, error)
type Statistic ¶
type Statistic struct {
// contains filtered or unexported fields
}
Statistic represents the statistics manager.
func New ¶
func New(cfg config.StatisticsConfig, client *EtcdClient) *Statistic
New creates a new Statistic instance.
func (*Statistic) Once ¶
Once 使用 redis 分布式锁, 当部署多个服务时,保证只有一个服务进行数据拉取和统计,避免重复执行任务,获得锁的服务会执行任务,获取不到锁的则跳过.