stgcommon

package
v0.0.0-...-0a980e4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2019 License: Apache-2.0 Imports: 23 Imported by: 0

README

smartgocommon

smartgocommon is ...

Read the docs

Documentation

Index

Constants

View Source
// Package-wide string and numeric constants: environment-variable names,
// property keys, well-known topic/group names and filesystem paths.
const (
	SMARTGO_HOME_ENV                = "SMARTGO_HOME"            // smartgo home directory
	CLOUDMQ_HOME_PROPERTY           = "smartgo.home.dir"        // default smartgo home location
	NAMESRV_ADDR_ENV                = "NAMESRV_ADDR"            // environment variable for the namesrv address
	NAMESRV_PORT_ENV                = "NAMESRV_PORT"            // environment variable for the namesrv port
	NAMESRV_ADDR_PROPERTY           = "cloudmq.namesrv.addr"    // default namesrv_addr address
	SMARTGO_DATA_PATH_ENV           = "SMARTGO_DATA_PATH"       // directory where modules such as broker and store read and write data
	SMARTGO_REGISTRY_CONFIG_ENV     = "SMARTGO_REGISTRY_CONFIG" // path of the registry module's log configuration file
	BLOTMQ_WEB_CONFIG_ENV           = "BLOTMQ_WEB_CONFIG"       // configuration file for the console web UI
	MESSAGE_COMPRESS_LEVEL          = "cloudmq.message.compressLevel"
	WS_DOMAIN_NAME                  = "jmenv.tbsite.net"
	WS_DOMAIN_SUBGROUP              = "nsaddr"
	WS_ADDR                         = "http://" + WS_DOMAIN_NAME + ":8080/smartgo/" + WS_DOMAIN_SUBGROUP // evaluates to http://jmenv.tbsite.net:8080/smartgo/nsaddr
	DEFAULT_TOPIC                   = "MY_DEFAULT_TOPIC"
	BENCHMARK_TOPIC                 = "BenchmarkTest"
	DEFAULT_PRODUCER_GROUP          = "DEFAULT_PRODUCER"
	DEFAULT_CONSUMER_GROUP          = "DEFAULT_CONSUMER"
	TOOLS_CONSUMER_GROUP            = "TOOLS_CONSUMER"
	FILTERSRV_CONSUMER_GROUP        = "FILTERSRV_CONSUMER"
	MONITOR_CONSUMER_GROUP          = "__MONITOR_CONSUMER"
	CLIENT_INNER_PRODUCER_GROUP     = "CLIENT_INNER_PRODUCER"
	SELF_TEST_PRODUCER_GROUP        = "SELF_TEST_P_GROUP"
	SELF_TEST_CONSUMER_GROUP        = "SELF_TEST_C_GROUP"
	SELF_TEST_TOPIC                 = "SELF_TEST_TOPIC"
	OFFSET_MOVED_EVENT              = "OFFSET_MOVED_EVENT"
	DEFAULT_CHARSET                 = "UTF-8"
	MASTER_ID                       = 0
	RETRY_GROUP_TOPIC_PREFIX        = "%RETRY%" // each ConsumerGroup gets a default topic named prefix+GroupName that holds messages whose processing failed and must be retried
	DLQ_GROUP_TOPIC_PREFIX          = "%DLQ%"   // each ConsumerGroup gets a default topic named prefix+GroupName that holds messages that still failed after repeated retries and will not be retried again
	// NOTE(review): the identifier is spelled "REBLANCE" while the value says
	// "rebalance"; renaming it would break existing callers.
	BROKER_REBLANCE_LOCKMAXLIVETIME = "smartgo.broker.rebalance.lockMaxLiveTime"
	SMARTGO_CONF_DIR                = "/github.com/ttstringiot/golangiot/conf/"
	MSG_BODY_DIR                    = "/tmp/blotmq/msgbodys/" // path on the server hosting the stgweb site where message body content is stored
)

mix_all: 大杂烩 Author: yintongqiang Since: 2017/8/10

View Source
// Default queue counts, both 16.
// NOTE(review): presumably the defaults for TopicConfig.ReadQueueNums and
// TopicConfig.WriteQueueNums — confirm against NewTopicConfig.
const (
	DefaultReadQueueNums  = 16
	DefaultWriteQueueNums = 16
)
View Source
// Miscellaneous platform, numeric and time-format constants.
const (
	WINDOWS    = "windows" // windows operating system
	// MAX_VALUE is the largest value a signed 64-bit integer can hold
	// (numerically equal to math.MaxInt64).
	MAX_VALUE  = 0x7fffffffffffffff
	// TIMEFORMAT is a Go reference-time layout: year-month-day
	// hour:minute:second on a 24-hour clock.
	TIMEFORMAT = "2006-01-02 15:04:05"
)

Variables

This section is empty.

Functions

func CheckIpAndPort

func CheckIpAndPort(addr string) bool

CheckIpAndPort 校验ip:port是否有效 Author: tianyuliang Since: 2017/9/27

func CompareAndIncreaseOnly

func CompareAndIncreaseOnly(target *int64, value int64) bool

func Compress

func Compress(src []byte) []byte

压缩

func ComputNextHourTimeMillis

func ComputNextHourTimeMillis() int64

ComputNextHourTimeMillis 下一整点小时(分、秒、毫秒置为0) Author rongzhihong Since 2017/9/5

func ComputNextMinutesTimeMillis

func ComputNextMinutesTimeMillis() int64

ComputNextMinutesTimeMillis 下一整点分钟(秒、毫秒置为0) Author rongzhihong Since 2017/9/5

func ComputNextMorningTimeMillis

func ComputNextMorningTimeMillis() int64

ComputNextMorningTimeMillis 下一整点天(时、分、秒、毫秒置为0) Author rongzhihong Since 2017/9/5

func Crc32

func Crc32(p []byte) (int32, error)

Crc32 获得循环冗余校验码 Author: rongzhihong Since: 2017/10/26

func CreateDir

func CreateDir(dir string) (bool, error)

CreateDir 创建文件夹

func CreateFile

func CreateFile(fileFullName string) (bool, error)

CreateFile 创建文件

func Decode

func Decode(data []byte, v interface{}) error

Decode Json Decode Author: rongzhihong Since: 2017/9/19

func Encode

func Encode(v interface{}) []byte

Encode Json Encode Author: rongzhihong Since: 2017/9/19

func ExistsDir

func ExistsDir(fileFullPath string) (bool, error)

ExistsDir 校验文件夹是否存在 Author: tianyuliang Since: 2017/9/15

func ExistsFile

func ExistsFile(fileFullPath string) (bool, error)

ExistsFile 校验文件是否存在 Author: tianyuliang Since: 2017/9/15

func File2String

func File2String(filePath string) (data string, err error)

File2String 读取文件内容

func FormatTimestamp

func FormatTimestamp(stamp int64) string

FormatTimestamp 转化时间戳为字符串(自动适配毫秒数)

使用示例 (1)FormatTimestamp(1505716870) ==> 2017/9/18 14:41:10

(2)FormatTimestamp(1505716870921) ==> 2017/9/18 14:41:10.921

Author: tianyuliang Since: 2017/9/18

func GetCurrentTimeMillis

func GetCurrentTimeMillis() (currentTimeMillis int64)

GetCurrentTimeMillis 得到当前时间的毫秒数 Author: tianyuliang Since: 2017/9/6

func GetDLQTopic

func GetDLQTopic(consumerGroup string) string

func GetDefaultBrokerName

func GetDefaultBrokerName() string

GetDefaultBrokerName 获取默认broker名称 Author: tianyuliang Since: 2017/9/29

func GetDiskPartitionSpaceUsedPercent

func GetDiskPartitionSpaceUsedPercent(path string) (percent float64)

GetDiskPartitionSpaceUsedPercent 获取磁盘分区空间使用率 Author rongzhihong Since 2017/9/5

func GetGoPath

func GetGoPath() string

GetGoPath 获取GoPath路径 Author: tianyuliang Since: 2017/9/27

func GetNamesrvAddr

func GetNamesrvAddr() string

GetNamesrvAddr 获取环境变量“NAMESRV_ADDR”的值 Author: tianyuliang Since: 2017/9/27

func GetNamesrvPort

func GetNamesrvPort() string

GetNamesrvPort 获取环境变量“NAMESRV_PORT”的值 Author: tianyuliang Since: 2017/9/27

func GetRetryTopic

func GetRetryTopic(consumerGroup string) string

func GetSmartGoHome

func GetSmartGoHome() string

GetSmartGoHome 获取环境变量“SMARTGO_HOME”的值 Author: tianyuliang Since: 2017/9/27

func GetSmartRegistryConfig

func GetSmartRegistryConfig() string

GetSmartRegistryConfig 获取环境变量“SMARTGO_REGISTRY_CONFIG”的值 Author: tianyuliang Since: 2017/9/27

func GetSmartgoConfigDir

func GetSmartgoConfigDir(config ...interface{}) string

GetSmartgoConfigDir 为了IDEA开发调试,得到当前项目conf配置项路径,路径末尾带上"/"字符 Author: tianyuliang Since: 2017/9/27

func GetUserHomeDir

func GetUserHomeDir() string

GetUserHomeDir 获取当前操作系统登陆用户的Home目录

注意:

	(1)一台服务器,启动多个broker,因此就需要环境变量“SMARTGO_DATA_PATH”来区别每个broker的数据目录
	(2)如果配置了环境变量“SMARTGO_DATA_PATH”,那么user.Current().HomeDir的路径会被覆盖

Author: tianyuliang Since: 2017/9/27

func HashCode

func HashCode(s string) int64

func IsBlank

func IsBlank(content string) bool

IsBlank 是否为空:false:不为空, true:为空 Author: rongzhihong Since: 2017/9/19

func IsEmpty

func IsEmpty(content string) bool

IsEmpty 校验字符串是否为空 Author: tianyuliang Since: 2017/10/20

func IsItTimeToDo

func IsItTimeToDo(when string) bool

IsItTimeToDo Author: zhoufei Since: 2017/10/13

func IsNumber

func IsNumber(content string) bool

IsNumber 是否是数字:true:是, false:否 Author: rongzhihong Since: 2017/9/19

func IsWindowsOS

func IsWindowsOS() bool

IsWindowsOS reports whether the current operating system is Windows: it returns true on Windows and false otherwise. Author rongzhihong Since 2017/9/8

func MilliTime2String

func MilliTime2String(millisecond int64) string

MilliTime2String 将毫秒时间转为字符时间 Author: rongzhihong Since: 2017/9/19

func ParseClientAddr

func ParseClientAddr(clientAddr string) (ip string, pid int)

ParseClientAddr 转化客户端地址 Author: tianyuliang Since: 2017/9/27

func String2File

func String2File(data []byte, fileName string)

写文件 2017/8/28 Add by yintongjiang,windows"\\"需改成"/"

func UnCompress

func UnCompress(src []byte) []byte

解压

func UnixNano

func UnixNano() int64

UnixNano returns the current Unix time (in nanoseconds, judging by the name).

Types

type AtomicInt64

// AtomicInt64 is an int64 counter type whose state is unexported; it is
// manipulated only through its methods (Get, Set, AddAndGet, GetAndAdd,
// IncrementAndGet, GetAndIncrement, DecrementAndGet, GetAndDecrement).
// NOTE(review): the name suggests atomic (concurrency-safe) updates — confirm
// against the implementation.
type AtomicInt64 struct {
	// contains filtered or unexported fields
}

func NewAtomicIn64

func NewAtomicIn64(initialValue int) *AtomicInt64

func (*AtomicInt64) AddAndGet

func (self *AtomicInt64) AddAndGet(value int) int64

func (*AtomicInt64) DecrementAndGet

func (self *AtomicInt64) DecrementAndGet() int64

func (*AtomicInt64) Get

func (self *AtomicInt64) Get() int64

func (*AtomicInt64) GetAndAdd

func (self *AtomicInt64) GetAndAdd(value int) int64

func (*AtomicInt64) GetAndDecrement

func (self *AtomicInt64) GetAndDecrement() int64

func (*AtomicInt64) GetAndIncrement

func (self *AtomicInt64) GetAndIncrement() int64

func (*AtomicInt64) IncrementAndGet

func (self *AtomicInt64) IncrementAndGet() int64

func (*AtomicInt64) Set

func (self *AtomicInt64) Set(value int)

type BrokerConfig

// BrokerConfig holds the broker's configuration items; every field is
// serialized under the JSON key given in its tag.
type BrokerConfig struct {
	SmartGoHome                        string `json:"smartgoHome"`                        // environment variable "SMARTGO_HOME" (smartgo project directory)
	StorePathRootDir                   string `json:"storePathRootDir"`                   // data storage directory for modules such as broker and store
	NamesrvAddr                        string `json:"namesrvAddr"`                        // namesrv address
	BrokerIP1                          string `json:"brokerIP1"`                          // local machine IP address 1
	BrokerIP2                          string `json:"brokerIP2"`                          // local machine IP address 2
	BrokerName                         string `json:"brokerName"`                         // host name of the current machine (usually overridden by the broker-a.toml setting)
	BrokerClusterName                  string `json:"brokerClusterName"`                  // cluster name (usually overridden by the broker-a.toml setting)
	BrokerId                           int64  `json:"brokerId"`                           // defaults to MasterId
	BrokerPort                         int    `json:"brokerPort"`                         // port on which the broker serves external clients
	BrokerPermission                   int    `json:"brokerPermission"`                   // broker permission
	DefaultTopicQueueNums              int32  `json:"defaultTopicQueueNums"`              // default number of queues per topic
	AutoCreateTopicEnable              bool   `json:"autoCreateTopicEnable"`              // whether automatic topic creation is enabled (recommended off in production)
	ClusterTopicEnable                 bool   `json:"clusterTopicEnable"`                 // whether a topic named after the cluster is created automatically
	BrokerTopicEnable                  bool   `json:"brokerTopicEnable"`                  // whether a topic named after the server is created automatically
	AutoCreateSubscriptionGroup        bool   `json:"autoCreateSubscriptionGroup"`        // whether automatic subscription-group creation is enabled (recommended off in production)
	SendMessageThreadPoolNums          int    `json:"sendMessageThreadPoolNums"`          // number of SendMessageProcessor worker threads
	PullMessageThreadPoolNums          int    `json:"pullMessageThreadPoolNums"`          // number of PullMessageProcessor worker threads
	AdminBrokerThreadPoolNums          int    `json:"adminBrokerThreadPoolNums"`          // number of AdminBrokerProcessor worker threads
	ClientManageThreadPoolNums         int    `json:"clientManageThreadPoolNums"`         // number of ClientManageProcessor worker threads
	FlushConsumerOffsetInterval        int    `json:"flushConsumerOffsetInterval"`        // interval of the periodic ConsumerOffset flush
	FlushConsumerOffsetHistoryInterval int    `json:"flushConsumerOffsetHistoryInterval"` // this field is currently unused
	RejectTransactionMessage           bool   `json:"rejectTransactionMessage"`           // whether transactional messages are rejected
	FetchNamesrvAddrByAddressServer    bool   `json:"fetchNamesrvAddrByAddressServer"`    // whether to look up the NameServer address via the address server; defaults to false after official release
	SendThreadPoolQueueCapacity        int    `json:"sendThreadPoolQueueCapacity"`        // size of the blocking queue of the thread pool used for sending messages
	PullThreadPoolQueueCapacity        int    `json:"pullThreadPoolQueueCapacity"`        // size of the blocking queue of the thread pool used for subscribing to messages
	FilterServerNums                   int32  `json:"filterServerNums"`                   // number of filter servers
	LongPollingEnable                  bool   `json:"longPollingEnable"`                  // whether the broker uses long polling when a consumer subscribes to messages
	ShortPollingTimeMills              int    `json:"shortPollingTimeMills"`              // server suspend time when short polling is used
	NotifyConsumerIdsChangedEnable     bool   `json:"notifyConsumerIdsChangedEnable"`     // switch for "consumerId changed" notifications
	OffsetCheckInSlave                 bool   `json:"offsetCheckInSlave"`                 // whether the slave needs to correct the offset
	HaMasterAddress                    string `json:"haMasterAddress"`                    // use case: HA configuration (points the slave role's HA address at the master role)
}

BrokerConfig Broker配置项 Author gaoyanlei Since 2017/8/8

func NewBrokerConfig

func NewBrokerConfig(brokerName, brokerClusterName string) *BrokerConfig

NewBrokerConfig 初始化BrokerConfig(默认AutoCreateTopicEnable=true) Author: tianyuliang Since: 2017/9/28

func NewCustomBrokerConfig

func NewCustomBrokerConfig(cfg *SmartgoBrokerConfig) *BrokerConfig

NewCustomBrokerConfig 初始化BrokerConfig(根据传入参数autoCreateTopicEnable来标记:是否自动创建Topic) Author: tianyuliang Since: 2017/9/28

func NewDefaultBrokerConfig

func NewDefaultBrokerConfig() *BrokerConfig

NewDefaultBrokerConfig 初始化默认BrokerConfig(默认AutoCreateTopicEnable=true) Author gaoyanlei Since 2017/8/9

func (*BrokerConfig) CheckBrokerConfigAttr

func (self *BrokerConfig) CheckBrokerConfigAttr(smartgoBrokerFilePath ...string) bool

CheckBrokerConfigAttr 校验broker启动的所必需的SmartGoHome、namesrv配置 Author: tianyuliang Since: 2017/9/22

func (*BrokerConfig) HasReadable

func (self *BrokerConfig) HasReadable() bool

HasReadable 校验Broker是否有读权限 Author: tianyuliang Since: 2017/9/29

func (*BrokerConfig) HasWriteable

func (self *BrokerConfig) HasWriteable() bool

HasWriteable 校验Broker是否有写权限 Author: tianyuliang Since: 2017/9/29

type ConfigManager

// ConfigManager embeds the IConfigManager interface, so the embedded value's
// Encode, Decode and ConfigFilePath methods are promoted onto ConfigManager.
type ConfigManager struct {
	IConfigManager
}

type DataVersion

// DataVersion pairs a timestamp with a counter; both are serialized under the
// JSON keys given in their tags.
// NOTE(review): units of Timestamp and how NextVersion advances Counter are
// not visible here — confirm against the implementation.
type DataVersion struct {
	Timestamp int64 `json:"timestamp"`
	Counter   int64 `json:"counter"`
}

func NewDataVersion

func NewDataVersion(timestamp ...int64) *DataVersion

func (*DataVersion) AssignNewOne

func (self *DataVersion) AssignNewOne(dataVersion DataVersion)

func (*DataVersion) Equals

func (this *DataVersion) Equals(dataVersion *DataVersion) bool

func (*DataVersion) NextVersion

func (self *DataVersion) NextVersion()

func (*DataVersion) ToJson

func (self *DataVersion) ToJson() string

func (*DataVersion) ToString

func (self *DataVersion) ToString() string

type IConfigManager

// IConfigManager is the three-method interface embedded by ConfigManager.
type IConfigManager interface {
	// Encode returns a string form of the configuration.
	// NOTE(review): presumably JSON, with prettyFormat selecting indented
	// output — confirm against implementations.
	Encode(prettyFormat bool) string

	// Decode consumes the given bytes; the parameter name indicates they are
	// expected to be JSON. It returns nothing, so any failure is not reported
	// to the caller through this signature.
	Decode(jsonString []byte)

	// ConfigFilePath returns a path string.
	// NOTE(review): presumably the file the configuration is persisted to —
	// confirm against implementations.
	ConfigFilePath() string
}

type RunningStats

// RunningStats is an int-based enumeration; its String method is listed
// separately on this page.
type RunningStats int
// RunningStats values, numbered consecutively from 0 via iota.
// NOTE(review): judging by the names these identify commit-log, consume-queue
// and schedule-message statistics — confirm against String().
const (
	COMMIT_LOG_MAX_OFFSET RunningStats = iota
	COMMIT_LOG_MIN_OFFSET
	COMMIT_LOG_DISK_RATIO
	CONSUME_QUEUE_DISK_RATIO
	SCHEDULE_MESSAGE_OFFSET
)

func (RunningStats) String

func (state RunningStats) String() string

type Service

// Service is an interface whose methods are all unexported (none are shown on
// this page), so it cannot be implemented from outside this package.
type Service interface {
	// contains filtered or unexported methods
}

type ServiceState

type ServiceState int

ServiceState: 服务状态枚举 Author: yintongqiang Since: 2017/8/10

// ServiceState values, numbered consecutively from 0 via iota.
const (
	CREATE_JUST      ServiceState = iota // service has just been created and has not been started
	RUNNING                              // service is running
	SHUTDOWN_ALREADY                     // service has been shut down
	START_FAILED                         // service failed to start
)

func (ServiceState) String

func (state ServiceState) String() string

type SmartgoBrokerConfig

// SmartgoBrokerConfig lists the configuration items required to start a
// smartgo broker.
type SmartgoBrokerConfig struct {
	BrokerClusterName     string // cluster name
	BrokerName            string // broker name
	BrokerId              int64  // broker id
	BrokerIP              string // IP address the broker itself starts on
	BrokerPort            int    // port of the broker's service
	DeleteWhen            int    // when to trigger "delete invalid messages"
	FileReservedTime      int    // message retention time
	BrokerRole            string // broker role: master/slave
	FlushDiskType         string // disk flush mode
	AutoCreateTopicEnable bool   // whether clients may create topics automatically
	StorePathRootDir      string // data storage directory for modules such as broker and store
	HaMasterAddress       string // use case: HA configuration (points the slave role's HA address at the master role)
}

SmartgoBrokerConfig 启动smartgoBroker所必需的配置项 Author: tianyuliang Since: 2017/9/26

func (*SmartgoBrokerConfig) IsBlank

func (self *SmartgoBrokerConfig) IsBlank() bool

IsBlank 判断配置项是否读取成功 Author: tianyuliang Since: 2017/9/26

func (*SmartgoBrokerConfig) ToString

func (self *SmartgoBrokerConfig) ToString() string

ToString 打印smartgoBroker配置项 Author: tianyuliang Since: 2017/9/26

type SystemClock

// SystemClock has only unexported state; it is created with NewSystemClock
// and driven through its Start, Now and Shutdown methods.
// NOTE(review): the meaning and unit of NewSystemClock's precision argument
// and of Now's int64 result are not visible here — confirm before relying on
// them.
type SystemClock struct {
	// contains filtered or unexported fields
}

func NewSystemClock

func NewSystemClock(precision int64) *SystemClock

func (*SystemClock) Now

func (self *SystemClock) Now() int64

func (*SystemClock) Shutdown

func (self *SystemClock) Shutdown()

func (*SystemClock) Start

func (self *SystemClock) Start()

type TopicConfig

// TopicConfig describes one topic: its name, queue counts, permission,
// filter type, system flag and ordering flag.
type TopicConfig struct {
	// SEPARATOR is the only field without a json tag.
	// NOTE(review): presumably the field delimiter used by ToString — confirm.
	SEPARATOR       string
	// TopicName is the topic's name.
	TopicName       string          `json:"topicName"`
	// ReadQueueNums and WriteQueueNums are the topic's read and write queue counts.
	ReadQueueNums   int32           `json:"readQueueNums"`
	WriteQueueNums  int32           `json:"writeQueueNums"`
	// Perm is an integer permission value; ToPermString renders it as a string.
	Perm            int             `json:"perm"`
	// TopicFilterType selects the tag-filtering mode (SINGLE_TAG or MULTI_TAG).
	TopicFilterType TopicFilterType `json:"topicFilterType"`
	// TopicSysFlag is an integer system flag; its bit meanings are not shown here.
	TopicSysFlag    int             `json:"topicSysFlag"`
	// Order is a boolean flag.
	// NOTE(review): presumably marks an ordered topic — confirm.
	Order           bool            `json:"order"`
}

func NewCustomTopicConfig

func NewCustomTopicConfig(topicName string, readQueueNums, writeQueueNums int32, topicSysFlag int, filterType ...TopicFilterType) *TopicConfig

func NewDefaultTopicConfig

func NewDefaultTopicConfig(topicName string, readQueueNums, writeQueueNums int32, perm int, filterType TopicFilterType) *TopicConfig

func NewTopicConfig

func NewTopicConfig(topicName string) *TopicConfig

func (*TopicConfig) ToPermString

func (self *TopicConfig) ToPermString() string

func (*TopicConfig) ToString

func (self *TopicConfig) ToString() string

type TopicFilterType

type TopicFilterType int

TopicFilterType Topic过滤方式,默认为单Tag过滤 Author gaoyanlei Since 2017/8/9

// TopicFilterType values, numbered from 0 via iota.
const (
	SINGLE_TAG TopicFilterType = iota // each message may carry only one tag
	MULTI_TAG                         // (1) each message may carry multiple tags (not supported for now; may be supported later as needed) (2) why is it not supported for now? (3) the feature could confuse users and the design is not ideal, so it is not supported yet
)

func (TopicFilterType) ToString

func (self TopicFilterType) ToString() string

Directories

Path Synopsis
Package doublylinkedlist implements the doubly-linked list.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL