Documentation ¶
Index ¶
- Constants
- func CheckIpAndPort(addr string) bool
- func CompareAndIncreaseOnly(target *int64, value int64) bool
- func Compress(src []byte) []byte
- func ComputNextHourTimeMillis() int64
- func ComputNextMinutesTimeMillis() int64
- func ComputNextMorningTimeMillis() int64
- func Crc32(p []byte) (int32, error)
- func CreateDir(dir string) (bool, error)
- func CreateFile(fileFullName string) (bool, error)
- func Decode(data []byte, v interface{}) error
- func Encode(v interface{}) []byte
- func ExistsDir(fileFullPath string) (bool, error)
- func ExistsFile(fileFullPath string) (bool, error)
- func File2String(filePath string) (data string, err error)
- func FormatTimestamp(stamp int64) string
- func GetCurrentTimeMillis() (currentTimeMillis int64)
- func GetDLQTopic(consumerGroup string) string
- func GetDefaultBrokerName() string
- func GetDiskPartitionSpaceUsedPercent(path string) (percent float64)
- func GetGoPath() string
- func GetNamesrvAddr() string
- func GetNamesrvPort() string
- func GetRetryTopic(consumerGroup string) string
- func GetSmartGoHome() string
- func GetSmartRegistryConfig() string
- func GetSmartgoConfigDir(config ...interface{}) string
- func GetUserHomeDir() string
- func HashCode(s string) int64
- func IsBlank(content string) bool
- func IsEmpty(content string) bool
- func IsItTimeToDo(when string) bool
- func IsNumber(content string) bool
- func IsWindowsOS() bool
- func MilliTime2String(millisecond int64) string
- func ParseClientAddr(clientAddr string) (ip string, pid int)
- func String2File(data []byte, fileName string)
- func UnCompress(src []byte) []byte
- func UnixNano() int64
- type AtomicInt64
- func (self *AtomicInt64) AddAndGet(value int) int64
- func (self *AtomicInt64) DecrementAndGet() int64
- func (self *AtomicInt64) Get() int64
- func (self *AtomicInt64) GetAndAdd(value int) int64
- func (self *AtomicInt64) GetAndDecrement() int64
- func (self *AtomicInt64) GetAndIncrement() int64
- func (self *AtomicInt64) IncrementAndGet() int64
- func (self *AtomicInt64) Set(value int)
- type BrokerConfig
- type ConfigManager
- type DataVersion
- type IConfigManager
- type RunningStats
- type Service
- type ServiceState
- type SmartgoBrokerConfig
- type SystemClock
- type TopicConfig
- type TopicFilterType
Constants ¶
const ( SMARTGO_HOME_ENV = "SMARTGO_HOME" // smartgo home目录 CLOUDMQ_HOME_PROPERTY = "smartgo.home.dir" // 默认smartgo home地址 NAMESRV_ADDR_ENV = "NAMESRV_ADDR" // namesrv地址环境变量 NAMESRV_PORT_ENV = "NAMESRV_PORT" // namesrv端口环境变量 NAMESRV_ADDR_PROPERTY = "cloudmq.namesrv.addr" // 默认namesrv_addr地址 SMARTGO_DATA_PATH_ENV = "SMARTGO_DATA_PATH" // broker、store等模块,存取数据的目录 SMARTGO_REGISTRY_CONFIG_ENV = "SMARTGO_REGISTRY_CONFIG" // registry模块的日志配置文件路径 BLOTMQ_WEB_CONFIG_ENV = "BLOTMQ_WEB_CONFIG" // console控制台web界面的配置文件 MESSAGE_COMPRESS_LEVEL = "cloudmq.message.compressLevel" WS_DOMAIN_NAME = "jmenv.tbsite.net" WS_DOMAIN_SUBGROUP = "nsaddr" WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/smartgo/" + WS_DOMAIN_SUBGROUP // http://jmenv.tbsite.net:8080/smartgo/nsaddr DEFAULT_TOPIC = "MY_DEFAULT_TOPIC" BENCHMARK_TOPIC = "BenchmarkTest" DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER" DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER" TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER" FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER" MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER" CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP" SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP" SELF_TEST_TOPIC = "SELF_TEST_TOPIC" OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT" DEFAULT_CHARSET = "UTF-8" MASTER_ID = 0 RETRY_GROUP_TOPIC_PREFIX = "%RETRY%" // 为每个ConsumerGroup建立一个默认的Topic,前缀+GroupName,用来保存处理失败需要重试的消息 DLQ_GROUP_TOPIC_PREFIX = "%DLQ%" // 为每个ConsumerGroup建立一个默认的Topic,前缀+GroupName,用来保存重试多次都失败,接下来不再重试的消息 BROKER_REBLANCE_LOCKMAXLIVETIME = "smartgo.broker.rebalance.lockMaxLiveTime" SMARTGO_CONF_DIR = "/git.oschina.net/cloudzone/smartgo/conf/" MSG_BODY_DIR = "/tmp/blotmq/msgbodys/" // 消息body内容存储在stgweb站点所在服务器路径 )
mix_all: 大杂烩 Author: yintongqiang Since: 2017/8/10
const ( DefaultReadQueueNums = 16 DefaultWriteQueueNums = 16 )
const ( WINDOWS = "windows" // windows operating system MAX_VALUE = 0x7fffffffffffffff TIMEFORMAT = "2006-01-02 15:04:05" )
Variables ¶
This section is empty.
Functions ¶
func CheckIpAndPort ¶
CheckIpAndPort 校验ip:port是否有效 Author: tianyuliang Since: 2017/9/27
func ComputNextHourTimeMillis ¶
func ComputNextHourTimeMillis() int64
ComputNextHourTimeMillis 下一整点小时(分、秒、毫秒置为0) Author rongzhihong Since 2017/9/5
func ComputNextMinutesTimeMillis ¶
func ComputNextMinutesTimeMillis() int64
ComputNextMinutesTimeMillis 下一整点分钟(秒、毫秒置为0) Author rongzhihong Since 2017/9/5
func ComputNextMorningTimeMillis ¶
func ComputNextMorningTimeMillis() int64
ComputNextMorningTimeMillis 下一整点天(时、分、秒、毫秒置为0) Author rongzhihong Since 2017/9/5
func Crc32 ¶
Crc32 获得循环冗余校验码 Author: rongzhihong Since: 2017/10/26
func Decode ¶
Decode Json Decode Author: rongzhihong Since: 2017/9/19
func Encode ¶
func Encode(v interface{}) []byte
Encode Json Encode Author: rongzhihong Since: 2017/9/19
func ExistsDir ¶
ExistsDir 校验目录是否存在 Author: tianyuliang Since: 2017/9/15
func ExistsFile ¶
ExistsFile 校验文件是否存在 Author: tianyuliang Since: 2017/9/15
func FormatTimestamp ¶
FormatTimestamp 转化时间戳为字符串(自动适配毫秒数)
使用示例 (1)FormatTimestamp(1505716870) ==> 2017/9/18 14:41:10
(2)FormatTimestamp(1505716870921) ==> 2017/9/18 14:41:10.921
Author: tianyuliang Since: 2017/9/18
func GetCurrentTimeMillis ¶
func GetCurrentTimeMillis() (currentTimeMillis int64)
GetCurrentTimeMillis 得到当前时间的毫秒数 Author: tianyuliang Since: 2017/9/6
func GetDefaultBrokerName ¶
func GetDefaultBrokerName() string
GetDefaultBrokerName 获取默认broker名称 Author: tianyuliang Since: 2017/9/29
func GetDiskPartitionSpaceUsedPercent ¶
GetDiskPartitionSpaceUsedPercent 获取磁盘分区空间使用率 Author rongzhihong Since 2017/9/5
func GetNamesrvAddr ¶
func GetNamesrvAddr() string
GetNamesrvAddr 获取环境变量“NAMESRV_ADDR”的值 Author: tianyuliang Since: 2017/9/27
func GetNamesrvPort ¶
func GetNamesrvPort() string
GetNamesrvPort 获取环境变量“NAMESRV_PORT”的值 Author: tianyuliang Since: 2017/9/27
func GetSmartGoHome ¶
func GetSmartGoHome() string
GetSmartGoHome 获取环境变量“SMARTGO_HOME”的值 Author: tianyuliang Since: 2017/9/27
func GetSmartRegistryConfig ¶
func GetSmartRegistryConfig() string
GetSmartRegistryConfig 获取环境变量“SMARTGO_REGISTRY_CONFIG”的值 Author: tianyuliang Since: 2017/9/27
func GetSmartgoConfigDir ¶
func GetSmartgoConfigDir(config ...interface{}) string
GetSmartgoConfigDir 为了IDEA开发调试,得到当前项目conf配置项路径,路径末尾带上"/"字符 Author: tianyuliang Since: 2017/9/27
func GetUserHomeDir ¶
func GetUserHomeDir() string
GetUserHomeDir 获取当前操作系统登陆用户的Home目录
注意:
(1)一台服务器,启动多个broker,因此就需要环境变量“SMARTGO_DATA_PATH”来区别每个broker的数据目录 (2)如果配置了环境变量“SMARTGO_DATA_PATH”,那么user.Current().HomeDir的路径会被覆盖
Author: tianyuliang Since: 2017/9/27
func IsBlank ¶
IsBlank 是否为空:false:不为空, true:为空 Author: rongzhihong Since: 2017/9/19
func IsEmpty ¶
IsEmpty 校验字符串是否为空 Author: tianyuliang Since: 2017/10/20
func IsItTimeToDo ¶
IsItTimeToDo Author: zhoufei Since: 2017/10/13
func IsNumber ¶
IsNumber 是否是数字:true:是, false:否 Author: rongzhihong Since: 2017/9/19
func IsWindowsOS ¶
func IsWindowsOS() bool
IsWindowsOS checks whether the current operating system is Windows. It returns true on Windows and false otherwise. Author rongzhihong Since 2017/9/8
func MilliTime2String ¶
MilliTime2String 将毫秒时间转为字符时间 Author: rongzhihong Since: 2017/9/19
func ParseClientAddr ¶
ParseClientAddr 转化客户端地址 Author: tianyuliang Since: 2017/9/27
func String2File ¶
写文件 2017/8/28 Add by yintongjiang,windows"\\"需改成"/"
Types ¶
type AtomicInt64 ¶
type AtomicInt64 struct {
// contains filtered or unexported fields
}
func NewAtomicIn64 ¶
func NewAtomicIn64(initialValue int) *AtomicInt64
func (*AtomicInt64) AddAndGet ¶
func (self *AtomicInt64) AddAndGet(value int) int64
func (*AtomicInt64) DecrementAndGet ¶
func (self *AtomicInt64) DecrementAndGet() int64
func (*AtomicInt64) Get ¶
func (self *AtomicInt64) Get() int64
func (*AtomicInt64) GetAndAdd ¶
func (self *AtomicInt64) GetAndAdd(value int) int64
func (*AtomicInt64) GetAndDecrement ¶
func (self *AtomicInt64) GetAndDecrement() int64
func (*AtomicInt64) GetAndIncrement ¶
func (self *AtomicInt64) GetAndIncrement() int64
func (*AtomicInt64) IncrementAndGet ¶
func (self *AtomicInt64) IncrementAndGet() int64
func (*AtomicInt64) Set ¶
func (self *AtomicInt64) Set(value int)
type BrokerConfig ¶
type BrokerConfig struct { SmartGoHome string `json:"smartgoHome"` // 环境变量"SMARTGO_HOME"(smartgo工程目录) StorePathRootDir string `json:"storePathRootDir"` // broker、store等模块的数据存储目录 NamesrvAddr string `json:"namesrvAddr"` // namesrv地址 BrokerIP1 string `json:"brokerIP1"` // 本机ip1地址 BrokerIP2 string `json:"brokerIP2"` // 本机ip2地址 BrokerName string `json:"brokerName"` // 当前机器hostName(通常来说被broker-a.toml配置项覆盖) BrokerClusterName string `json:"brokerClusterName"` // 集群名称(通常来说被broker-a.toml配置项覆盖) BrokerId int64 `json:"brokerId"` // 默认值MasterId BrokerPort int `json:"brokerPort"` // broker对外提供服务端口 BrokerPermission int `json:"brokerPermission"` // Broker权限 DefaultTopicQueueNums int32 `json:"defaultTopicQueueNums"` // 默认topic队列数 AutoCreateTopicEnable bool `json:"autoCreateTopicEnable"` // 自动创建Topic功能是否开启(生产环境建议关闭) ClusterTopicEnable bool `json:"clusterTopicEnable"` // 自动创建以集群名字命名的Topic功能是否开启 BrokerTopicEnable bool `json:"brokerTopicEnable"` // 自动创建以服务器名字命名的Topic功能是否开启 AutoCreateSubscriptionGroup bool `json:"autoCreateSubscriptionGroup"` // 自动创建订阅组功能是否开启(线上建议关闭) SendMessageThreadPoolNums int `json:"sendMessageThreadPoolNums"` // SendMessageProcessor处理线程数 PullMessageThreadPoolNums int `json:"pullMessageThreadPoolNums"` // PullMessageProcessor处理线程数 AdminBrokerThreadPoolNums int `json:"adminBrokerThreadPoolNums"` // AdminBrokerProcessor处理线程数 ClientManageThreadPoolNums int `json:"clientManageThreadPoolNums"` // ClientManageProcessor处理线程数 FlushConsumerOffsetInterval int `json:"flushConsumerOffsetInterval"` // 刷新ConsumerOffset定时间隔 FlushConsumerOffsetHistoryInterval int `json:"flushConsumerOffsetHistoryInterval"` // 此字段目前没有使用 RejectTransactionMessage bool `json:"rejectTransactionMessage"` // 是否拒绝接收事务消息 FetchNamesrvAddrByAddressServer bool `json:"fetchNamesrvAddrByAddressServer"` // 是否从地址服务器寻找NameServer地址,正式发布后,默认值为false SendThreadPoolQueueCapacity int `json:"sendThreadPoolQueueCapacity"` // 发送消息对应的线程池阻塞队列size PullThreadPoolQueueCapacity int `json:"pullThreadPoolQueueCapacity"` // 订阅消息对应的线程池阻塞队列size FilterServerNums int32 `json:"filterServerNums"` // 过滤服务器数量 LongPollingEnable bool `json:"longPollingEnable"` // Consumer订阅消息时,Broker是否开启长轮询 ShortPollingTimeMills int `json:"shortPollingTimeMills"` // 如果是短轮询,服务器挂起时间 NotifyConsumerIdsChangedEnable bool `json:"notifyConsumerIdsChangedEnable"` // notify consumerId changed 开关 OffsetCheckInSlave bool `json:"offsetCheckInSlave"` // slave 是否需要纠正位点 HaMasterAddress string `json:"haMasterAddress"` // 适用场景:HA功能配置(将slave角色的 ha地址,指向master角色) }
BrokerConfig Broker配置项 Author gaoyanlei Since 2017/8/8
func NewBrokerConfig ¶
func NewBrokerConfig(brokerName, brokerClusterName string) *BrokerConfig
NewBrokerConfig 初始化BrokerConfig(默认AutoCreateTopicEnable=true) Author: tianyuliang Since: 2017/9/28
func NewCustomBrokerConfig ¶
func NewCustomBrokerConfig(cfg *SmartgoBrokerConfig) *BrokerConfig
NewCustomBrokerConfig 初始化BrokerConfig(根据传入参数autoCreateTopicEnable来标记:是否自动创建Topic) Author: tianyuliang Since: 2017/9/28
func NewDefaultBrokerConfig ¶
func NewDefaultBrokerConfig() *BrokerConfig
NewDefaultBrokerConfig 初始化默认BrokerConfig(默认AutoCreateTopicEnable=true) Author gaoyanlei Since 2017/8/9
func (*BrokerConfig) CheckBrokerConfigAttr ¶
func (self *BrokerConfig) CheckBrokerConfigAttr(smartgoBrokerFilePath ...string) bool
CheckBrokerConfigAttr 校验broker启动所必需的SmartGoHome、namesrv配置 Author: tianyuliang Since: 2017/9/22
func (*BrokerConfig) HasReadable ¶
func (self *BrokerConfig) HasReadable() bool
HasReadable 校验Broker是否有读权限 Author: tianyuliang Since: 2017/9/29
func (*BrokerConfig) HasWriteable ¶
func (self *BrokerConfig) HasWriteable() bool
HasWriteable 校验Broker是否有写权限 Author: tianyuliang Since: 2017/9/29
type ConfigManager ¶
type ConfigManager struct {
IConfigManager
}
type DataVersion ¶
func NewDataVersion ¶
func NewDataVersion(timestamp ...int64) *DataVersion
func (*DataVersion) AssignNewOne ¶
func (self *DataVersion) AssignNewOne(dataVersion DataVersion)
func (*DataVersion) Equals ¶
func (this *DataVersion) Equals(dataVersion *DataVersion) bool
func (*DataVersion) NextVersion ¶
func (self *DataVersion) NextVersion()
func (*DataVersion) ToJson ¶
func (self *DataVersion) ToJson() string
func (*DataVersion) ToString ¶
func (self *DataVersion) ToString() string
type IConfigManager ¶
type RunningStats ¶
type RunningStats int
const ( COMMIT_LOG_MAX_OFFSET RunningStats = iota COMMIT_LOG_MIN_OFFSET COMMIT_LOG_DISK_RATIO CONSUME_QUEUE_DISK_RATIO SCHEDULE_MESSAGE_OFFSET )
func (RunningStats) String ¶
func (state RunningStats) String() string
type Service ¶
type Service interface {
// contains filtered or unexported methods
}
type ServiceState ¶
type ServiceState int
ServiceState: 服务状态枚举 Author: yintongqiang Since: 2017/8/10
const ( CREATE_JUST ServiceState = iota // Service just created, not yet started RUNNING // Service running SHUTDOWN_ALREADY // Service shut down START_FAILED // Service failed to start )
func (ServiceState) String ¶
func (state ServiceState) String() string
type SmartgoBrokerConfig ¶
type SmartgoBrokerConfig struct { BrokerClusterName string // 集群名称 BrokerName string // broker名称 BrokerId int64 // broker id BrokerIP string // broker自身启动的IP地址 BrokerPort int // broker对应服务的端口 DeleteWhen int // 何时触发“删除无效Message” FileReservedTime int // 消息保存时间 BrokerRole string // broker角色 主/备 FlushDiskType string // 刷盘方式 AutoCreateTopicEnable bool // 是否允许客户端自动创建Topic StorePathRootDir string // broker、store等模块的数据存储目录 HaMasterAddress string // 适用场景:HA功能配置(将slave角色的 ha地址,指向master角色) }
SmartgoBrokerConfig 启动smartgoBroker所必需的配置项 Author: tianyuliang Since: 2017/9/26
func (*SmartgoBrokerConfig) IsBlank ¶
func (self *SmartgoBrokerConfig) IsBlank() bool
IsBlank 判断配置项是否读取成功 Author: tianyuliang Since: 2017/9/26
func (*SmartgoBrokerConfig) ToString ¶
func (self *SmartgoBrokerConfig) ToString() string
ToString 打印smartgoBroker配置项 Author: tianyuliang Since: 2017/9/26
type SystemClock ¶
type SystemClock struct {
// contains filtered or unexported fields
}
func NewSystemClock ¶
func NewSystemClock(precision int64) *SystemClock
func (*SystemClock) Now ¶
func (self *SystemClock) Now() int64
func (*SystemClock) Shutdown ¶
func (self *SystemClock) Shutdown()
func (*SystemClock) Start ¶
func (self *SystemClock) Start()
type TopicConfig ¶
type TopicConfig struct { SEPARATOR string TopicName string `json:"topicName"` ReadQueueNums int32 `json:"readQueueNums"` WriteQueueNums int32 `json:"writeQueueNums"` Perm int `json:"perm"` TopicFilterType TopicFilterType `json:"topicFilterType"` TopicSysFlag int `json:"topicSysFlag"` Order bool `json:"order"` }
func NewCustomTopicConfig ¶
func NewCustomTopicConfig(topicName string, readQueueNums, writeQueueNums int32, topicSysFlag int, filterType ...TopicFilterType) *TopicConfig
func NewDefaultTopicConfig ¶
func NewDefaultTopicConfig(topicName string, readQueueNums, writeQueueNums int32, perm int, filterType TopicFilterType) *TopicConfig
func NewTopicConfig ¶
func NewTopicConfig(topicName string) *TopicConfig
func (*TopicConfig) ToPermString ¶
func (self *TopicConfig) ToPermString() string
func (*TopicConfig) ToString ¶
func (self *TopicConfig) ToString() string
type TopicFilterType ¶
type TopicFilterType int
TopicFilterType Topic过滤方式,默认为单Tag过滤 Author gaoyanlei Since 2017/8/9
const ( SINGLE_TAG TopicFilterType = iota // 每个消息只能有一个Tag MULTI_TAG // (1)每个消息可以有多个Tag(暂时不支持,后续视情况支持)(2)为什么暂时不支持?(3)此功能可能会对用户造成困扰,且方案并不完美,所以暂不支持 )
func (TopicFilterType) ToString ¶
func (self TopicFilterType) ToString() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package doublylinkedlist implements the doubly-linked list.
|
Package doublylinkedlist implements the doubly-linked list. |