Documentation ¶
Index ¶
- Constants
- Variables
- func AutoClearExpirePending()
- func AutoSplitLog(logType string)
- func ClearConsumerPending(consumer string)
- func CopyFile(src, dst string) (int64, error)
- func ExitWithNotice(notice Notice)
- func FileExists(filePath string) bool
- func GenerateId(i int64, number int64, count int64) uint64
- func GetBucketName(bucket string, pointName string) string
- func GetClientIP(r *http.Request) string
- func GetConfigCmdKey(cmd string) string
- func GetConsumerFullUrl(host string, path string, msgId uint64) string
- func GetIdBaseNumber(count int64) int64
- func GetMessageDetailKey(msgId uint64) string
- func GetMessageIdListName(bucketName string) string
- func GetMessagePendingKey(consumer string) string
- func GetMessageStatusHashField(consumerName string) string
- func GetMessageStatusHashName(id uint64) string
- func GetPointGroup(project string) string
- func GetPointName(project string, timestamp uint64) string
- func GetYamlConfig(configFile string, mc interface{}) error
- func LoadLuaScript() error
- func RecordError(err error)
- func StringHash(data string) string
- type ConsumerConfig
- type Message
- func (m *Message) GetBucketMessages(bucket string) []Message
- func (m *Message) GetMessageDetail() error
- func (m *Message) GetPendingMessageIdList(consumer string, start string, end string) (map[string]interface{}, error)
- func (m *Message) GetPointBuckets(point string) ([]string, error)
- func (m *Message) GetTimePoint() (string, error)
- func (m *Message) Save() error
- func (m *Message) SetMessageStatus(consumerName string, status int, removeFromPending bool) bool
- func (m *Message) Status(consumerName string) (string, error)
- type Notice
- type ProductConfig
- type ThrowAble
Constants ¶
View Source
const (
	// redis key 前缀
	RedisKeyPrefix = "dmq"
	// 自增ID的key
	IdIncrKey = "message:id:generate:incr"
	MessageStatusWaiting = 1 //默认状态
	MessageStatusDoing   = 2 //正在消费
	MessageStatusDone    = 3 //消费完
	MessageStatusFailed  = 4 //消费失败
)
View Source
const ( ErrorCodeDefault = 100 ErrorCodeParseParamsFailed = ErrorCodeDefault + iota ErrorCodeValidateFailed ErrorCodeUnknownProduct ErrorCodeUnknownCommand ErrorCodeJsonMarshal ErrorCodeRedisSave ErrorCodeFoundBucketsFailed ErrorCodeFoundPointFailed ErrorCodeRemoveBucketsFailed ErrorCodeResponseCodeNot200 ErrorCodePreRequestFailed ErrorCodeRequestFailed ErrorCodeGetStatusFailed ErrorCodeGetMessageFailed ErrorCodeRedisLoadLuaFailed ErrorCodeGetPendingFailed )
View Source
const (
	// 获取时间点的lua脚本
	GetTimePointLuaScript = `` /* 365-byte string literal not displayed */
	// 保存消息的lua脚本
	SaveMessageLuaScript = `` /* 1026-byte string literal not displayed */
)
Variables ¶
View Source
var ( RedisCli *redis.Client GetTimePointSha string SaveMessageSha string )
View Source
var (
//全局变量 Config配置信息
Config baseConfig
)
Functions ¶
func GetConsumerFullUrl ¶
获取消费者下游接口的完整 URL
func GetMessageStatusHashField ¶
获取消息状态的hash field key
func GetMessageStatusHashName ¶
获取消息状态的 hash key(按小时区分)
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct { Name string `yaml:"name"` Host string `yaml:"host"` Path string `yaml:"path"` Timeout uint `yaml:"timeout"` RetryTimes uint `yaml:"retry_times"` Interval uint `yaml:"interval"` }
消费者配置
type Message ¶
type Message struct {
	Id         uint64 `json:"id"`          // 消息ID
	Cmd        string `json:"cmd"`         // command
	Timestamp  uint64 `json:"timestamp"`   // 执行时间
	Params     string `json:"params"`      // 命令参数
	Project    string `json:"project"`     // 项目
	Bucket     string `json:"bucket"`      // 消息桶
	CreateTime uint64 `json:"create_time"` // 创建时间
	RequestId  string `json:"request_id"`  // 请求ID
}
func (*Message) GetBucketMessages ¶
从redis获取bucket对应的任务
func (*Message) GetPendingMessageIdList ¶
func (m *Message) GetPendingMessageIdList(consumer string, start string, end string) (map[string]interface{}, error)
查看尚未消费的消息 ID 列表
func (*Message) GetPointBuckets ¶
获取时间点的buckets
func (*Message) GetTimePoint ¶
获取最近的时间点并将其删除,通过 lua script 保证原子性
func (*Message) SetMessageStatus ¶
设置消息消费状态
type ProductConfig ¶
生产者配置
Click to show internal directories.
Click to hide internal directories.