Documentation ¶
Overview ¶
Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/7
Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/6
Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/5
Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/7
Copyright (c) 2015-2018 All rights reserved. 本软件源代码版权归 my.oschina.net/tantexian 所有,允许复制与学习借鉴. Author: tantexian, <tantexian@qq.com> Since: 2017/8/6
Index ¶
- Constants
- Variables
- func GetHome() string
- func GetParentDirectory(dir string) string
- func GetPathSeparator() string
- func GetUnixHome() string
- func GetWindowsHome() string
- func HashCode(s string) int64
- func PathExists(path string) (bool, error)
- func TagsString2tagsCode(filterType stgcommon.TopicFilterType, tags string) int64
- type AcceptSocketService
- type AllocateMapedFileService
- type AllocateRequest
- type AppendMessageCallback
- type AppendMessageResult
- type AppendMessageStatus
- type CallSnapshot
- type CleanCommitLogService
- type CleanConsumeQueueService
- type CleanReferenceResource
- type CommitLog
- type ConsumeQueue
- type ConsumeQueueTable
- type DefaultAppendMessageCallback
- type DefaultMessageFilter
- type DefaultMessageStore
- func (self *DefaultMessageStore) AppendToCommitLog(startOffset int64, data []byte) bool
- func (self *DefaultMessageStore) CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool
- func (self *DefaultMessageStore) CleanExpiredConsumerQueue()
- func (self *DefaultMessageStore) CleanUnusedTopic(topics []string) int32
- func (self *DefaultMessageStore) Destroy()
- func (self *DefaultMessageStore) GetCommitLogData(offset int64) *SelectMapedBufferResult
- func (self *DefaultMessageStore) GetEarliestMessageTime(topic string, queueId int32) int64
- func (self *DefaultMessageStore) GetMaxOffsetInQueue(topic string, queueId int32) int64
- func (self *DefaultMessageStore) GetMaxPhyOffset() int64
- func (self *DefaultMessageStore) GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, ...) *GetMessageResult
- func (self *DefaultMessageStore) GetMessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64
- func (self *DefaultMessageStore) GetMessageStoreTimeStamp(topic string, queueId int32, offset int64) int64
- func (self *DefaultMessageStore) GetMinOffsetInQueue(topic string, queueId int32) int64
- func (self *DefaultMessageStore) GetOffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64
- func (self *DefaultMessageStore) GetRuntimeInfo() map[string]string
- func (self *DefaultMessageStore) Load() bool
- func (self *DefaultMessageStore) LookMessageByOffset(commitLogOffset int64) *message.MessageExt
- func (self *DefaultMessageStore) Now() int64
- func (self *DefaultMessageStore) PutMessage(msg *MessageExtBrokerInner) *PutMessageResult
- func (self *DefaultMessageStore) QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult
- func (self *DefaultMessageStore) SelectOneMessageByOffset(commitLogOffset int64) *SelectMapedBufferResult
- func (self *DefaultMessageStore) SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) *SelectMapedBufferResult
- func (self *DefaultMessageStore) Shutdown()
- func (self *DefaultMessageStore) SlaveFallBehindMuch() int64
- func (self *DefaultMessageStore) Start() error
- func (self *DefaultMessageStore) UpdateHaMasterAddress(newAddr string)
- type DispatchMessageService
- type DispatchRequest
- type Files
- type FlushCommitLogService
- type FlushConsumeQueueService
- type FlushRealTimeService
- type GetMessageResult
- type GetMessageStatus
- type GroupCommitRequest
- type GroupCommitService
- type GroupTransferService
- type HAClient
- type HAConnection
- type HAService
- type IndexFile
- type IndexHeader
- type IndexService
- type MapedFile
- type MapedFileQueue
- type MappedByteBuffer
- func (m *MappedByteBuffer) Bytes() []byte
- func (m *MappedByteBuffer) Read(data []byte) (n int, err error)
- func (self *MappedByteBuffer) ReadInt16() (i int16)
- func (self *MappedByteBuffer) ReadInt32() (i int32)
- func (self *MappedByteBuffer) ReadInt64() (i int64)
- func (self *MappedByteBuffer) ReadInt8() (i int8)
- func (m *MappedByteBuffer) Write(data []byte) (n int, err error)
- func (self *MappedByteBuffer) WriteInt16(i int16) (mappedByteBuffer *MappedByteBuffer)
- func (self *MappedByteBuffer) WriteInt32(i int32) (mappedByteBuffer *MappedByteBuffer)
- func (self *MappedByteBuffer) WriteInt64(i int64) (mappedByteBuffer *MappedByteBuffer)
- func (self *MappedByteBuffer) WriteInt8(i int8) (mappedByteBuffer *MappedByteBuffer)
- type MessageExtBrokerInner
- type MessageFilter
- type MessageStore
- type MessageStoreConfig
- type PutMessageResult
- type PutMessageStatus
- type QueryMessageResult
- type ReadSocketService
- type ReferenceResource
- type ReputMessageService
- type RunningFlags
- type ScheduleMessageService
- type SelectMapedBufferResult
- type StoreCheckpoint
- type StoreStatsService
- type TransactionCheckExecuter
- type TransactionStateService
- type WaitNotifyObject
- type WriteSocketService
Constants ¶
const ( WaitTimeOut = 1000 * 5 DEFAULT_INITIAL_CAPACITY = 11 )
const ( MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8 BlankMagicCode = 0xBBCCDDEE ^ 1880681586 + 8 )
const ( END_FILE_MIN_BLANK_LENGTH = 4 + 4 TOTALSIZE = 4 // 1 TOTALSIZE MAGICCODE = 4 // 2 MAGICCODE BODYCRC = 4 // 3 BODYCRC QUEUE_ID = 4 // 4 QUEUEID FLAG = 4 // 5 FLAG QUEUE_OFFSET = 8 // 6 QUEUEOFFSET PHYSICAL_OFFSET = 8 // 7 PHYSICALOFFSET SYSFLAG = 4 // 8 SYSFLAG BORN_TIMESTAMP = 8 // 9 BORNTIMESTAMP BORN_HOST = 8 // 10 BORNHOST STORE_TIMESTAMP = 8 // 11 STORETIMESTAMP STORE_HOST_ADDRESS = 8 // 12 STOREHOSTADDRESS RE_CONSUME_TIMES = 4 // 13 RECONSUMETIMES PREPARED_TRANSACTION_OFFSET = 8 // 14 Prepared Transaction Offset BODY_LENGTH = 4 TOPIC_LENGTH = 1 PROPERTIES_LENGTH = 2 )
const ( TotalPhysicalMemorySize = 1024 * 1024 * 1024 * 24 LongMinValue = -9223372036854775808 )
const ( OS_PAGE_SIZE = 1024 * 4 MMAPED_ENTIRE_FILE = -1 )
const ( NotReadableBit = 1 // 禁止读权限 NotWriteableBit = 1 << 1 // 禁止写权限 WriteLogicsQueueErrorBit = 1 << 2 // 逻辑队列是否发生错误 WriteIndexFileErrorBit = 1 << 3 // 索引文件是否发生错误 DiskFullBit = 1 << 4 // 磁盘空间不足 )
const ( SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXX" FIRST_DELAY_TIME = int64(1000) DELAY_FOR_A_WHILE = int64(100) DELAY_FOR_A_PERIOD = int64(10000) )
const ( FrequencyOfSampling = 1000 MaxRecordsOfSampling = 60 * 10 PrintTPSInterval = 60 * 1 )
const (
CQStoreUnitSize = 20 // 存储单元大小
)
ConsumeQueue 消费队列实现 Author zhoufei Since 2017/9/7
const (
FlushRetryTimesOver = 3
)
const (
GroupCommitRequestHighWater = 600000
)
const (
MaxManualDeleteFileTimes = 20 // 手工触发一次最多删除次数
)
const (
ReadSocketMaxBufferSize = 1024 * 1024
)
const (
RetryTimesOver = 3
)
const (
SUBSCRIPTION_ALL = "*"
)
Variables ¶
var ( HASH_SLOT_SIZE int32 = 4 INDEX_SIZE int32 = 20 INVALID_INDEX int32 = 0 )
Functions ¶
func GetParentDirectory ¶
func GetPathSeparator ¶
func GetPathSeparator() string
func GetUnixHome ¶
func GetUnixHome() string
func GetWindowsHome ¶
func GetWindowsHome() string
func PathExists ¶
func TagsString2tagsCode ¶
func TagsString2tagsCode(filterType stgcommon.TopicFilterType, tags string) int64
Types ¶
type AcceptSocketService ¶
type AcceptSocketService struct {
// contains filtered or unexported fields
}
AcceptSocketService Author zhoufei Since 2017/10/19
func NewAcceptSocketService ¶
func NewAcceptSocketService(port int32, haService *HAService) *AcceptSocketService
func (*AcceptSocketService) Shutdown ¶
func (self *AcceptSocketService) Shutdown(interrupt bool)
type AllocateMapedFileService ¶
type AllocateMapedFileService struct {
// contains filtered or unexported fields
}
func NewAllocateMapedFileService ¶
func NewAllocateMapedFileService() *AllocateMapedFileService
func (*AllocateMapedFileService) Shutdown ¶
func (self *AllocateMapedFileService) Shutdown()
func (*AllocateMapedFileService) Start ¶
func (self *AllocateMapedFileService) Start()
type AllocateRequest ¶
type AllocateRequest struct {
// contains filtered or unexported fields
}
func NewAllocateRequest ¶
func NewAllocateRequest(filePath string, fileSize int64) *AllocateRequest
type AppendMessageCallback ¶
type AppendMessageCallback interface {
// contains filtered or unexported methods
}
AppendMessageCallback 写消息回调接口 Author: tantexian, <tantexian@qq.com> Since: 2017/8/6
type AppendMessageResult ¶
type AppendMessageResult struct { Status AppendMessageStatus WroteOffset int64 WroteBytes int64 MsgId string StoreTimestamp int64 LogicsOffset int64 }
AppendMessageResult 写入commitlog返回结果集 Author gaoyanlei Since 2017/8/16
type AppendMessageStatus ¶
type AppendMessageStatus int
AppendMessageStatus 写入commitlog 返回code Author gaoyanlei Since 2017/8/16
const ( APPENDMESSAGE_PUT_OK AppendMessageStatus = iota END_OF_FILE MESSAGE_SIZE_EXCEEDED APPENDMESSAGE_UNKNOWN_ERROR )
func (AppendMessageStatus) AppendMessageString ¶
func (status AppendMessageStatus) AppendMessageString() string
type CallSnapshot ¶
type CallSnapshot struct {
// contains filtered or unexported fields
}
func NewCallSnapshot ¶
func NewCallSnapshot(timestamp, callTimesTotal int64) *CallSnapshot
type CleanCommitLogService ¶
type CleanCommitLogService struct {
// contains filtered or unexported fields
}
CleanCommitLogService 清理物理文件服务 Author zhoufei Since 2017/10/13
func NewCleanCommitLogService ¶
func NewCleanCommitLogService(defaultMessageStore *DefaultMessageStore) *CleanCommitLogService
type CleanConsumeQueueService ¶
type CleanConsumeQueueService struct {
// contains filtered or unexported fields
}
CleanConsumeQueueService 清理逻辑文件服务 Author zhoufei Since 2017/10/13
func NewCleanConsumeQueueService ¶
func NewCleanConsumeQueueService(defaultMessageStore *DefaultMessageStore) *CleanConsumeQueueService
type CleanReferenceResource ¶
type CleanReferenceResource interface {
// contains filtered or unexported methods
}
type CommitLog ¶
type CommitLog struct { MapedFileQueue *MapedFileQueue DefaultMessageStore *DefaultMessageStore GroupCommitService *GroupCommitService FlushRealTimeService *FlushRealTimeService AppendMessageCallback *DefaultAppendMessageCallback TopicQueueTable map[string]int64 // contains filtered or unexported fields }
func NewCommitLog ¶
func NewCommitLog(defaultMessageStore *DefaultMessageStore) *CommitLog
type ConsumeQueue ¶
type ConsumeQueue struct {
// contains filtered or unexported fields
}
func NewConsumeQueue ¶
func NewConsumeQueue(topic string, queueId int32, storePath string, mapedFileSize int64, defaultMessageStore *DefaultMessageStore) *ConsumeQueue
type ConsumeQueueTable ¶
type ConsumeQueueTable struct {
// contains filtered or unexported fields
}
func NewConsumeQueueTable ¶
func NewConsumeQueueTable() *ConsumeQueueTable
type DefaultAppendMessageCallback ¶
type DefaultAppendMessageCallback struct {
// contains filtered or unexported fields
}
func NewDefaultAppendMessageCallback ¶
func NewDefaultAppendMessageCallback(size int32, commitLog *CommitLog) *DefaultAppendMessageCallback
type DefaultMessageFilter ¶
type DefaultMessageFilter struct { }
DefaultMessageFilter 消息过滤规则实现 Author zhoufei Since 2017/9/6
func (*DefaultMessageFilter) IsMessageMatched ¶
func (df *DefaultMessageFilter) IsMessageMatched(subscriptionData *heartbeat.SubscriptionData, tagsCode int64) bool
type DefaultMessageStore ¶
type DefaultMessageStore struct { MessageFilter *DefaultMessageFilter // 消息过滤 MessageStoreConfig *MessageStoreConfig // 存储配置 CommitLog *CommitLog FlushConsumeQueueService *FlushConsumeQueueService // 逻辑队列刷盘服务 CleanCommitLogService *CleanCommitLogService // 清理物理文件服务 CleanConsumeQueueService *CleanConsumeQueueService // 清理逻辑文件服务 DispatchMessageService *DispatchMessageService // 分发消息索引服务 IndexService *IndexService // 消息索引服务 AllocateMapedFileService *AllocateMapedFileService // 从物理队列解析消息重新发送到逻辑队列 ReputMessageService *ReputMessageService // 从物理队列解析消息重新发送到逻辑队列 HAService *HAService // HA服务 ScheduleMessageService *ScheduleMessageService // 定时服务 TransactionStateService *TransactionStateService // 分布式事务服务 TransactionCheckExecuter *TransactionCheckExecuter // 事务回查接口 StoreStatsService *StoreStatsService // 运行时数据统计 RunningFlags *RunningFlags // 运行过程标志位 SystemClock *stgcommon.SystemClock // 优化获取时间性能,精度1ms ShutdownFlag bool // 存储服务是否启动 StoreCheckpoint *StoreCheckpoint BrokerStatsManager *stats.BrokerStatsManager // contains filtered or unexported fields }
DefaultMessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6
func NewDefaultMessageStore ¶
func NewDefaultMessageStore(messageStoreConfig *MessageStoreConfig, brokerStatsManager *stats.BrokerStatsManager) *DefaultMessageStore
func (*DefaultMessageStore) AppendToCommitLog ¶
func (self *DefaultMessageStore) AppendToCommitLog(startOffset int64, data []byte) bool
AppendToCommitLog 向CommitLog追加数据,并分发至各个Consume Queue Author: zhoufei Since: 2017/10/24
func (*DefaultMessageStore) CheckInDiskByConsumeOffset ¶
func (self *DefaultMessageStore) CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool
CheckInDiskByConsumeOffset 判断消息是否在磁盘 Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) CleanExpiredConsumerQueue ¶
func (self *DefaultMessageStore) CleanExpiredConsumerQueue()
CleanExpiredConsumerQueue 清除失效的消费队列 Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) CleanUnusedTopic ¶
func (self *DefaultMessageStore) CleanUnusedTopic(topics []string) int32
CleanUnusedTopic 清除未使用Topic
func (*DefaultMessageStore) Destroy ¶
func (self *DefaultMessageStore) Destroy()
func (*DefaultMessageStore) GetCommitLogData ¶
func (self *DefaultMessageStore) GetCommitLogData(offset int64) *SelectMapedBufferResult
GetCommitLogData 数据复制使用:获取CommitLog数据 Author: zhoufei Since: 2017/10/23
func (*DefaultMessageStore) GetEarliestMessageTime ¶
func (self *DefaultMessageStore) GetEarliestMessageTime(topic string, queueId int32) int64
GetEarliestMessageTime 获取队列中最早的消息时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) GetMaxOffsetInQueue ¶
func (self *DefaultMessageStore) GetMaxOffsetInQueue(topic string, queueId int32) int64
GetMaxOffsetInQueue 获取指定队列最大Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) GetMaxPhyOffset ¶
func (self *DefaultMessageStore) GetMaxPhyOffset() int64
GetMaxPhyOffset 获取物理队列最大offset Author: zhoufei Since: 2017/10/24
func (*DefaultMessageStore) GetMessage ¶
func (self *DefaultMessageStore) GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *GetMessageResult
func (*DefaultMessageStore) GetMessageIds ¶
func (self *DefaultMessageStore) GetMessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64
GetMessageIds 批量获取MessageId Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) GetMessageStoreTimeStamp ¶
func (self *DefaultMessageStore) GetMessageStoreTimeStamp(topic string, queueId int32, offset int64) int64
GetMessageStoreTimeStamp 获取队列中存储时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) GetMinOffsetInQueue ¶
func (self *DefaultMessageStore) GetMinOffsetInQueue(topic string, queueId int32) int64
GetMinOffsetInQueue 获取指定队列最小Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) GetOffsetInQueueByTime ¶
func (self *DefaultMessageStore) GetOffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64
GetOffsetInQueueByTime 根据消息时间获取某个队列中对应的offset 1、如果指定时间(包含之前之后)有对应的消息,则获取距离此时间最近的offset(优先选择之前) 2、如果指定时间无对应消息,则返回0 Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) GetRuntimeInfo ¶
func (self *DefaultMessageStore) GetRuntimeInfo() map[string]string
GetRuntimeInfo 获取运行时统计数据 Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) Load ¶
func (self *DefaultMessageStore) Load() bool
func (*DefaultMessageStore) LookMessageByOffset ¶
func (self *DefaultMessageStore) LookMessageByOffset(commitLogOffset int64) *message.MessageExt
LookMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) Now ¶
func (self *DefaultMessageStore) Now() int64
func (*DefaultMessageStore) PutMessage ¶
func (self *DefaultMessageStore) PutMessage(msg *MessageExtBrokerInner) *PutMessageResult
func (*DefaultMessageStore) QueryMessage ¶
func (self *DefaultMessageStore) QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult
func (*DefaultMessageStore) SelectOneMessageByOffset ¶
func (self *DefaultMessageStore) SelectOneMessageByOffset(commitLogOffset int64) *SelectMapedBufferResult
SelectOneMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) SelectOneMessageByOffsetAndSize ¶
func (self *DefaultMessageStore) SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) *SelectMapedBufferResult
SelectOneMessageByOffsetAndSize 通过物理队列Offset、size,查询消息。 如果发生错误,则返回null Author: zhoufei Since: 2017/9/20
func (*DefaultMessageStore) Shutdown ¶
func (self *DefaultMessageStore) Shutdown()
func (*DefaultMessageStore) SlaveFallBehindMuch ¶
func (self *DefaultMessageStore) SlaveFallBehindMuch() int64
SlaveFallBehindMuch Slave落后Master多少byte Author: zhoufei Since: 2017/9/21
func (*DefaultMessageStore) Start ¶
func (self *DefaultMessageStore) Start() error
func (*DefaultMessageStore) UpdateHaMasterAddress ¶
func (self *DefaultMessageStore) UpdateHaMasterAddress(newAddr string)
UpdateHaMasterAddress 更新HaMaster地址 Author: zhoufei Since: 2017/9/21
type DispatchMessageService ¶
type DispatchMessageService struct {
// contains filtered or unexported fields
}
func NewDispatchMessageService ¶
func NewDispatchMessageService(putMsgIndexHightWater int32, defaultMessageStore *DefaultMessageStore) *DispatchMessageService
func (*DispatchMessageService) Shutdown ¶
func (self *DispatchMessageService) Shutdown()
func (*DispatchMessageService) Start ¶
func (self *DispatchMessageService) Start()
type DispatchRequest ¶
type DispatchRequest struct {
// contains filtered or unexported fields
}
type FlushCommitLogService ¶
type FlushCommitLogService interface {
// contains filtered or unexported methods
}
type FlushConsumeQueueService ¶
type FlushConsumeQueueService struct {
// contains filtered or unexported fields
}
func NewFlushConsumeQueueService ¶
func NewFlushConsumeQueueService(defaultMessageStore *DefaultMessageStore) *FlushConsumeQueueService
func (*FlushConsumeQueueService) Shutdown ¶
func (self *FlushConsumeQueueService) Shutdown()
func (*FlushConsumeQueueService) Start ¶
func (self *FlushConsumeQueueService) Start()
type FlushRealTimeService ¶
type FlushRealTimeService struct {
// contains filtered or unexported fields
}
func NewFlushRealTimeService ¶
func NewFlushRealTimeService(commitLog *CommitLog) *FlushRealTimeService
type GetMessageResult ¶
type GetMessageResult struct { // 多个连续的消息集合 MessageMapedList list.List // 用来向Consumer传送消息 MessageBufferList list.List // 枚举变量,取消息结果 Status GetMessageStatus // 当被过滤后,返回下一次开始的Offset NextBeginOffset int64 // 逻辑队列中的最小Offset MinOffset int64 // 逻辑队列中的最大Offset MaxOffset int64 // ByteBuffer 总字节数 BufferTotalSize int // 是否建议从slave拉消息 SuggestPullingFromSlave bool }
GetMessageResult 访问消息返回结果 Author gaoyanlei Since 2017/8/17
func (*GetMessageResult) GetMessageCount ¶
func (self *GetMessageResult) GetMessageCount() int
GetMessageCount 获取message个数 Author gaoyanlei Since 2017/8/17
func (*GetMessageResult) Release ¶
func (self *GetMessageResult) Release()
type GetMessageStatus ¶
type GetMessageStatus int
GetMessageStatus 访问消息返回的状态码 Author gaoyanlei Since 2017/8/17
const ( // 找到消息 FOUND GetMessageStatus = iota // offset正确,但是过滤后没有匹配的消息 NO_MATCHED_MESSAGE // offset正确,但是物理队列消息正在被删除 MESSAGE_WAS_REMOVING // offset正确,但是从逻辑队列没有找到,可能正在被删除 OFFSET_FOUND_NULL // offset错误,严重溢出 OFFSET_OVERFLOW_BADLY // offset错误,溢出1个 OFFSET_OVERFLOW_ONE // offset错误,太小了 OFFSET_TOO_SMALL // 没有对应的逻辑队列 NO_MATCHED_LOGIC_QUEUE // 队列中一条消息都没有 NO_MESSAGE_IN_QUEUE )
func (GetMessageStatus) String ¶
func (self GetMessageStatus) String() string
type GroupCommitRequest ¶
type GroupCommitRequest struct {
// contains filtered or unexported fields
}
GroupCommitRequest Author zhoufei Since 2017/10/18
func NewGroupCommitRequest ¶
func NewGroupCommitRequest(nextOffset int64) *GroupCommitRequest
type GroupCommitService ¶
type GroupCommitService struct {
// contains filtered or unexported fields
}
GroupCommitService Author zhoufei Since 2017/10/18
func NewGroupCommitService ¶
func NewGroupCommitService(commitLog *CommitLog) *GroupCommitService
type GroupTransferService ¶
type GroupTransferService struct {
// contains filtered or unexported fields
}
GroupTransferService 同步进度监听服务,如果达到应用层的写入偏移量,则通知应用层该同步已经完成。 Author zhoufei Since 2017/10/18
func NewGroupTransferService ¶
func NewGroupTransferService(haService *HAService) *GroupTransferService
type HAClient ¶
type HAClient struct {
// contains filtered or unexported fields
}
HAClient HA高可用客户端 Author zhoufei Since 2017/10/18
func NewHAClient ¶
type HAConnection ¶
type HAConnection struct {
// contains filtered or unexported fields
}
HAConnection Author zhoufei Since 2017/10/19
func NewHAConnection ¶
func NewHAConnection(haService *HAService, connection *net.TCPConn) *HAConnection
type HAService ¶
type HAService struct {
// contains filtered or unexported fields
}
HAService HA高可用服务 Author zhoufei Since 2017/10/18
func NewHAService ¶
func NewHAService(defaultMessageStore *DefaultMessageStore) *HAService
type IndexHeader ¶
type IndexHeader struct {
// contains filtered or unexported fields
}
func NewIndexHeader ¶
func NewIndexHeader(mappedByteBuffer *MappedByteBuffer) *IndexHeader
type IndexService ¶
type IndexService struct {
// contains filtered or unexported fields
}
func NewIndexService ¶
func NewIndexService(messageStore *DefaultMessageStore) *IndexService
func (*IndexService) Load ¶
func (self *IndexService) Load(lastExitOK bool) bool
func (*IndexService) Shutdown ¶
func (self *IndexService) Shutdown()
func (*IndexService) Start ¶
func (self *IndexService) Start()
type MapedFile ¶
type MapedFile struct { ReferenceResource // 当前映射的虚拟内存总大小 TotalMapedVitualMemory int64 // 当前JVM中mmap句柄数量 TotalMapedFiles int32 // contains filtered or unexported fields }
MapedFile 封装mapedfile类用于操作commitlog文件及consumelog文件 Author: tantexian, <tantexian@qq.com> Since: 2017/8/5
func NewMapedFile ¶
NewMapedFile 根据文件名新建mapedfile Author: tantexian, <tantexian@qq.com> Since: 2017/8/5
func (*MapedFile) AppendMessageWithCallBack ¶
func (self *MapedFile) AppendMessageWithCallBack(msg interface{}, appendMessageCallback AppendMessageCallback) *AppendMessageResult
AppendMessageWithCallBack 向MapedBuffer追加消息 Return: appendNums 成功添加消息字节数 Author: tantexian, <tantexian@qq.com> Since: 2017/8/5
type MapedFileQueue ¶
type MapedFileQueue struct { // 每次触发删除文件,最多删除多少个文件 DeleteFilesBatchMax int // contains filtered or unexported fields }
func NewMapedFileQueue ¶
func NewMapedFileQueue(storePath string, mapedFileSize int64, allocateMapedFileService *AllocateMapedFileService) *MapedFileQueue
type MappedByteBuffer ¶
type MappedByteBuffer struct { MMapBuf mmap.MMap ReadPos int // read at &buf[ReadPos] WritePos int // write at &buf[WritePos] Limit int // MMapBuf's max Size }
func NewMappedByteBuffer ¶
func NewMappedByteBuffer(mMap mmap.MMap) *MappedByteBuffer
func (*MappedByteBuffer) Bytes ¶
func (m *MappedByteBuffer) Bytes() []byte
func (*MappedByteBuffer) Read ¶
func (m *MappedByteBuffer) Read(data []byte) (n int, err error)
Read reads the next len(data) bytes from the buffer or until the buffer is drained. The return value n is the number of bytes read. If the buffer has no data to return, err is io.EOF (unless len(data) is zero); otherwise it is nil.
func (*MappedByteBuffer) ReadInt16 ¶
func (self *MappedByteBuffer) ReadInt16() (i int16)
func (*MappedByteBuffer) ReadInt32 ¶
func (self *MappedByteBuffer) ReadInt32() (i int32)
func (*MappedByteBuffer) ReadInt64 ¶
func (self *MappedByteBuffer) ReadInt64() (i int64)
func (*MappedByteBuffer) ReadInt8 ¶
func (self *MappedByteBuffer) ReadInt8() (i int8)
func (*MappedByteBuffer) Write ¶
func (m *MappedByteBuffer) Write(data []byte) (n int, err error)
Write appends the contents of data to the buffer
func (*MappedByteBuffer) WriteInt16 ¶
func (self *MappedByteBuffer) WriteInt16(i int16) (mappedByteBuffer *MappedByteBuffer)
func (*MappedByteBuffer) WriteInt32 ¶
func (self *MappedByteBuffer) WriteInt32(i int32) (mappedByteBuffer *MappedByteBuffer)
func (*MappedByteBuffer) WriteInt64 ¶
func (self *MappedByteBuffer) WriteInt64(i int64) (mappedByteBuffer *MappedByteBuffer)
func (*MappedByteBuffer) WriteInt8 ¶
func (self *MappedByteBuffer) WriteInt8(i int8) (mappedByteBuffer *MappedByteBuffer)
type MessageExtBrokerInner ¶
type MessageExtBrokerInner struct { message.MessageExt PropertiesString string TagsCode int64 }
MessageExtBrokerInner 存储内部使用的Message对象 Author gaoyanlei Since 2017/8/16
type MessageFilter ¶
type MessageFilter interface {
IsMessageMatched(subscriptionData *heartbeat.SubscriptionData, tagsCode int64) bool
}
MessageFilter 消息过滤接口 Author zhoufei Since 2017/9/6
type MessageStore ¶
type MessageStore interface { Load() bool Start() error Shutdown() // 关闭存储服务 Destroy() PutMessage(msg *MessageExtBrokerInner) *PutMessageResult GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *GetMessageResult GetMaxOffsetInQueue(topic string, queueId int32) int64 // 获取指定队列最大Offset 如果队列不存在,返回-1 GetMinOffsetInQueue(topic string, queueId int32) int64 // 获取指定队列最小Offset 如果队列不存在,返回-1 GetCommitLogOffsetInQueue(topic string, queueId int32, cqOffset int64) int64 GetOffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64 // 根据消息时间获取某个队列中对应的offset LookMessageByOffset(commitLogOffset int64) *message.MessageExt // 通过物理队列Offset,查询消息。 如果发生错误,则返回null SelectOneMessageByOffset(commitLogOffset int64) *SelectMapedBufferResult // 通过物理队列Offset,查询消息。 如果发生错误,则返回null SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) *SelectMapedBufferResult // 通过物理队列Offset、size,查询消息。 如果发生错误,则返回null GetRunningDataInfo() string GetRuntimeInfo() map[string]string // 取运行时统计数据 GetMaxPhyOffset() int64 //获取物理队列最大offset GetMinPhyOffset() int64 GetEarliestMessageTime(topic string, queueId int32) int64 // 获取队列中最早的消息时间 GetMessageStoreTimeStamp(topic string, queueId int32, offset int64) int64 GetMessageTotalInQueue(topic string, queueId int32) int64 GetCommitLogData(offset int64) *SelectMapedBufferResult // 数据复制使用:获取CommitLog数据 AppendToCommitLog(startOffset int64, data []byte) bool // 数据复制使用:向CommitLog追加数据,并分发至各个Consume Queue ExcuteDeleteFilesManualy() QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *QueryMessageResult UpdateHaMasterAddress(newAddr string) SlaveFallBehindMuch() int64 // Slave落后Master多少,单位字节 Now() int64 CleanUnusedTopic(topics []string) int32 CleanExpiredConsumerQueue() // 清除失效的消费队列 GetMessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64 // 批量获取 messageId CheckInDiskByConsumeOffset(topic string, queueId 
int32, consumeOffset int64) bool //判断消息是否在磁盘 }
MessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6
type MessageStoreConfig ¶
type MessageStoreConfig struct { StorePathRootDir string `json:"StorePathRootDir"` // 存储跟目录 StorePathCommitLog string `json:"StorePathCommitLog"` // CommitLog存储目录 StorePathConsumeQueue string `json:"StorePathConsumeQueue"` // ConsumeQueue存储目录 StorePathIndex string `json:"StorePathIndex"` // 索引文件存储目录 StoreCheckpoint string `json:"StoreCheckpoint"` // 异常退出产生的文件 AbortFile string `json:"AbortFile"` // 异常退出产生的文件 TranStateTableStorePath string `json:"TranStateTableStorePath"` // 分布式事务配置 TranStateTableMapedFileSize int32 `json:"TranStateTableMapedFileSize"` TranRedoLogStorePath string `json:"TranRedoLogStorePath"` TranRedoLogMapedFileSize int32 `json:"TranRedoLogMapedFileSize"` CheckTransactionMessageAtleastInterval int64 `json:"CheckTransactionMessageAtleastInterval"` // 事务回查至少间隔时间 CheckTransactionMessageTimerInterval int64 `json:"CheckTransactionMessageTimerInterval"` // 事务回查定时间隔时间 CheckTransactionMessageEnable bool `json:"CheckTransactionMessageEnable"` // 是否开启事务Check过程,双十一时,可以关闭 MapedFileSizeCommitLog int32 `json:"MapedFileSizeCommitLog"` // CommitLog每个文件大小 1G MapedFileSizeConsumeQueue int32 `json:"MapedFileSizeConsumeQueue"` // ConsumeQueue每个文件大小 默认存储30W条消息 FlushIntervalCommitLog int32 `json:"FlushIntervalCommitLog"` // CommitLog刷盘间隔时间(单位毫秒) FlushCommitLogTimed bool `json:"FlushCommitLogTimed"` // 是否定时方式刷盘,默认是实时刷盘 FlushIntervalConsumeQueue int32 `json:"FlushIntervalConsumeQueue"` // ConsumeQueue刷盘间隔时间(单位毫秒) CleanResourceInterval int32 `json:"CleanResourceInterval"` // 清理资源间隔时间(单位毫秒) DeleteCommitLogFilesInterval int32 `json:"DeleteCommitLogFilesInterval"` // 删除多个CommitLog文件的间隔时间(单位毫秒) DeleteConsumeQueueFilesInterval int32 `json:"DeleteConsumeQueueFilesInterval"` // 删除多个ConsumeQueue文件的间隔时间(单位毫秒) DestroyMapedFileIntervalForcibly int32 `json:"DestroyMapedFileIntervalForcibly"` // 强制删除文件间隔时间(单位毫秒) RedeleteHangedFileInterval int32 `json:"RedeleteHangedFileInterval"` // 定期检查Hanged文件间隔时间(单位毫秒) DeleteWhen string `json:"DeleteWhen"` // 何时触发删除文件, 默认凌晨4点删除文件 
DiskMaxUsedSpaceRatio int32 `json:"DiskMaxUsedSpaceRatio"` // 磁盘空间最大使用率 FileReservedTime int64 `json:"FileReservedTime"` PutMsgIndexHightWater int32 `json:"PutMsgIndexHightWater"` // 写消息索引到ConsumeQueue,缓冲区高水位,超过则开始流控 MaxMessageSize int32 `json:"MaxMessageSize"` // 最大消息大小,默认512K CheckCRCOnRecover bool `json:"CheckCRCOnRecover"` // 重启时,是否校验CRC FlushCommitLogLeastPages int32 `json:"FlushCommitLogLeastPages"` // 刷CommitLog,至少刷几个PAGE FlushConsumeQueueLeastPages int32 `json:"FlushConsumeQueueLeastPages"` // 刷ConsumeQueue,至少刷几个PAGE FlushCommitLogThoroughInterval int32 `json:"FlushCommitLogThoroughInterval"` // 刷CommitLog,彻底刷盘间隔时间 FlushConsumeQueueThoroughInterval int32 `json:"FlushConsumeQueueThoroughInterval"` // 刷ConsumeQueue,彻底刷盘间隔时间 MaxTransferBytesOnMessageInMemory int32 `json:"MaxTransferBytesOnMessageInMemory"` // 最大被拉取的消息字节数,消息在内存 MaxTransferCountOnMessageInMemory int32 `json:"MaxTransferCountOnMessageInMemory"` // 最大被拉取的消息个数,消息在内存 MaxTransferBytesOnMessageInDisk int32 `json:"MaxTransferBytesOnMessageInDisk"` // 最大被拉取的消息字节数,消息在磁盘 MaxTransferCountOnMessageInDisk int32 `json:"MaxTransferCountOnMessageInDisk"` // 最大被拉取的消息个数,消息在磁盘 AccessMessageInMemoryMaxRatio int64 `json:"AccessMessageInMemoryMaxRatio"` // 命中消息在内存的最大比例 MessageIndexEnable bool `json:"MessageIndexEnable"` // 是否开启消息索引功能 MaxHashSlotNum int32 `json:"MaxHashSlotNum"` MaxIndexNum int32 `json:"MaxIndexNum"` MaxMsgsNumBatch int32 `json:"MaxMsgsNumBatch"` MessageIndexSafe bool `json:"MessageIndexSafe"` // 是否使用安全的消息索引功能,即可靠模式。可靠模式下,异常宕机恢复慢; 非可靠模式下,异常宕机恢复快 HaListenPort int32 `json:"HaListenPort"` // HA功能 HaSendHeartbeatInterval int32 `json:"HaSendHeartbeatInterval"` HaHousekeepingInterval int32 `json:"HaHousekeepingInterval"` HaTransferBatchSize int32 `json:"HaTransferBatchSize"` HaMasterAddress string `json:"HaMasterAddress"` // 如果不设置,则从NameServer获取Master HA服务地址 HaSlaveFallbehindMax int32 `json:"HaSlaveFallbehindMax"` // Slave落后Master超过此值,则认为存在异常 BrokerRole config.BrokerRole `json:"BrokerRole"` FlushDiskType 
config.FlushDiskType `json:"FlushDiskType"` SyncFlushTimeout int32 `json:"SyncFlushTimeout"` // 同步刷盘超时时间 MessageDelayLevel string `json:"MessageDelayLevel"` // 定时消息相关 FlushDelayOffsetInterval int64 `json:"FlushDelayOffsetInterval"` CleanFileForciblyEnable bool `json:"CleanFileForciblyEnable"` // 磁盘空间超过90%警戒水位,自动开始删除文件 SynchronizationType config.SynchronizationType `json:"SynchronizationType"` // 主从同步数据类型 }
MessageStoreConfig 存储层配置文件类 Author zhoufei Since 2017/9/6
func NewMessageStoreConfig ¶
func NewMessageStoreConfig() *MessageStoreConfig
type PutMessageResult ¶
type PutMessageResult struct { PutMessageStatus PutMessageStatus AppendMessageResult *AppendMessageResult }
PutMessageResult 写入消息返回结果。Author: gaoyanlei, Since: 2017/8/16
type PutMessageStatus ¶
type PutMessageStatus int
PutMessageStatus 写入消息过程的返回结果。Author: gaoyanlei, Since: 2017/8/16
const ( PUTMESSAGE_PUT_OK PutMessageStatus = iota FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE SERVICE_NOT_AVAILABLE CREATE_MAPEDFILE_FAILED MESSAGE_ILLEGAL PUTMESSAGE_UNKNOWN_ERROR )
func (PutMessageStatus) PutMessageString ¶
func (status PutMessageStatus) PutMessageString() string
type QueryMessageResult ¶
type QueryMessageResult struct { MessageMapedList []*SelectMapedBufferResult // 多个连续的消息集合 MessageBufferList []*MappedByteBuffer // 用来向Consumer传送消息 IndexLastUpdateTimestamp int64 IndexLastUpdatePhyoffset int64 BufferTotalSize int32 // ByteBuffer 总字节数 }
QueryMessageResult 通过Key查询消息,返回结果。Author: zhoufei, Since: 2017/9/6
func NewQueryMessageResult ¶
func NewQueryMessageResult() *QueryMessageResult
func (*QueryMessageResult) AddMessage ¶
func (qmr *QueryMessageResult) AddMessage(mapedBuffer *SelectMapedBufferResult)
type ReadSocketService ¶
type ReadSocketService struct {
// contains filtered or unexported fields
}
ReadSocketService. Author: zhoufei, Since: 2017/10/19
func NewReadSocketService ¶
func NewReadSocketService(connection *net.TCPConn, haConnection *HAConnection) *ReadSocketService
type ReferenceResource ¶
type ReferenceResource struct { CleanReferenceResource // contains filtered or unexported fields }
func NewReferenceResource ¶
func NewReferenceResource() *ReferenceResource
type ReputMessageService ¶
type ReputMessageService struct {
// contains filtered or unexported fields
}
func NewReputMessageService ¶
func NewReputMessageService(defaultMessageStore *DefaultMessageStore) *ReputMessageService
type RunningFlags ¶
type RunningFlags struct {
// contains filtered or unexported fields
}
type ScheduleMessageService ¶
type ScheduleMessageService struct {
// contains filtered or unexported fields
}
func NewScheduleMessageService ¶
func NewScheduleMessageService(defaultMessageStore *DefaultMessageStore) *ScheduleMessageService
func (*ScheduleMessageService) Encode ¶
func (self *ScheduleMessageService) Encode() string
func (*ScheduleMessageService) Load ¶
func (self *ScheduleMessageService) Load() bool
func (*ScheduleMessageService) Shutdown ¶
func (self *ScheduleMessageService) Shutdown()
func (*ScheduleMessageService) Start ¶
func (self *ScheduleMessageService) Start()
type SelectMapedBufferResult ¶
type SelectMapedBufferResult struct { StartOffset int64 MappedByteBuffer *MappedByteBuffer Size int32 MapedFile *MapedFile // contains filtered or unexported fields }
SelectMapedBufferResult 查询Pagecache返回结果。Author: zhoufei, Since: 2017/9/6
func NewSelectMapedBufferResult ¶
func NewSelectMapedBufferResult(startOffset int64, mappedByteBuffer *MappedByteBuffer, size int32, mapedFile *MapedFile) *SelectMapedBufferResult
func (*SelectMapedBufferResult) Release ¶
func (self *SelectMapedBufferResult) Release()
type StoreCheckpoint ¶
type StoreCheckpoint struct {
// contains filtered or unexported fields
}
func NewStoreCheckpoint ¶
func NewStoreCheckpoint(scpPath string) (*StoreCheckpoint, error)
type StoreStatsService ¶
type StoreStatsService struct {
// contains filtered or unexported fields
}
func NewStoreStatsService ¶
func NewStoreStatsService() *StoreStatsService
func (*StoreStatsService) GetGetMessageTransferedMsgCount ¶
func (self *StoreStatsService) GetGetMessageTransferedMsgCount() int64
func (*StoreStatsService) GetPutMessageTimesTotal ¶
func (self *StoreStatsService) GetPutMessageTimesTotal() int64
func (*StoreStatsService) GetRuntimeInfo ¶
func (self *StoreStatsService) GetRuntimeInfo() map[string]string
func (*StoreStatsService) Shutdown ¶
func (self *StoreStatsService) Shutdown()
func (*StoreStatsService) Start ¶
func (self *StoreStatsService) Start()
type TransactionCheckExecuter ¶
type TransactionCheckExecuter interface {
// contains filtered or unexported methods
}
type TransactionStateService ¶
type TransactionStateService struct { }
func NewTransactionStateService ¶
func NewTransactionStateService(defaultMessageStore *DefaultMessageStore) *TransactionStateService
func (*TransactionStateService) Start ¶
func (self *TransactionStateService) Start()
type WaitNotifyObject ¶
type WaitNotifyObject struct {
// contains filtered or unexported fields
}
WaitNotifyObject 用来做线程之间异步通知。Author: zhoufei, Since: 2017/10/23
func NewWaitNotifyObject ¶
func NewWaitNotifyObject() *WaitNotifyObject
type WriteSocketService ¶
type WriteSocketService struct {
// contains filtered or unexported fields
}
WriteSocketService. Author: zhoufei, Since: 2017/10/19
func NewWriteSocketService ¶
func NewWriteSocketService(connection *net.TCPConn, haConnection *HAConnection) *WriteSocketService
Source Files ¶
- accept_socket_service.go
- allocate_maped_file_service.go
- allocate_request.go
- append_message_callback.go
- append_message_result.go
- append_message_status.go
- clean_commit_log_service.go
- clean_consume_queue_service.go
- commit_log.go
- consume_queue.go
- default_append_message_callback.go
- default_message_store.go
- dispatch_message_service.go
- dispatch_request.go
- file_util.go
- flush_commit_log_service.go
- flush_consume_queue_service.go
- flush_real_time_service.go
- get_message_result.go
- get_message_status.go
- group_commit_request.go
- group_commit_service.go
- group_transfer_service.go
- ha_client.go
- ha_connection.go
- ha_service.go
- index_file.go
- index_header.go
- index_service.go
- maped_file.go
- mapedfile_queue.go
- mapped_bytebuffer.go
- message_ext_broker_inner.go
- message_filter.go
- message_store.go
- message_store_config.go
- put_message_result.go
- put_message_status.go
- query_message_result.go
- read_socket_service.go
- reference_resource.go
- reput_message_service.go
- running_flags.go
- schedule_message_service.go
- select_maped_buffer_result.go
- store_checkpoint.go
- store_stats_service.go
- transaction_check_executer.go
- transaction_state_service.go
- wait_notify_object.go
- write_socket_service.go