Documentation ¶
Overview ¶
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func NewMessageStore(config *Config, brokerStats stats.BrokerStats) store.MessageStore
- type BrokerRoleType
- type Clock
- type Config
- type FlushDiskType
- type MessageFilter
- type PersistentMessageStore
- func (ms *PersistentMessageStore) AppendToCommitLog(startOffset int64, data []byte) bool
- func (ms *PersistentMessageStore) BrokerStats() stats.BrokerStats
- func (ms *PersistentMessageStore) CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool
- func (ms *PersistentMessageStore) CleanExpiredConsumerQueue()
- func (ms *PersistentMessageStore) CleanUnusedTopic(topics []string) int32
- func (ms *PersistentMessageStore) CommitLogOffsetInQueue(topic string, queueId int32, cqOffset int64) int64
- func (ms *PersistentMessageStore) Destroy()
- func (ms *PersistentMessageStore) EarliestMessageTime(topic string, queueId int32) int64
- func (ms *PersistentMessageStore) EncodeScheduleMsg() string
- func (ms *PersistentMessageStore) ExcuteDeleteFilesManualy()
- func (ms *PersistentMessageStore) GetCommitLogData(offset int64) store.BufferResult
- func (ms *PersistentMessageStore) GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, ...) *store.GetMessageResult
- func (ms *PersistentMessageStore) Load() bool
- func (ms *PersistentMessageStore) LookMessageByOffset(commitLogOffset int64) *message.MessageExt
- func (ms *PersistentMessageStore) MaxOffsetInQueue(topic string, queueId int32) int64
- func (ms *PersistentMessageStore) MaxPhyOffset() int64
- func (ms *PersistentMessageStore) MessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64
- func (ms *PersistentMessageStore) MessageStoreTimeStamp(topic string, queueId int32, offset int64) int64
- func (ms *PersistentMessageStore) MessageTotalInQueue(topic string, queueId int32) int64
- func (ms *PersistentMessageStore) MinOffsetInQueue(topic string, queueId int32) int64
- func (ms *PersistentMessageStore) MinPhyOffset() int64
- func (ms *PersistentMessageStore) OffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64
- func (ms *PersistentMessageStore) PutMessage(msg *store.MessageExtInner) *store.PutMessageResult
- func (ms *PersistentMessageStore) QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *store.QueryMessageResult
- func (ms *PersistentMessageStore) RunningDataInfo() string
- func (ms *PersistentMessageStore) RuntimeInfo() map[string]string
- func (ms *PersistentMessageStore) SelectOneMessageByOffset(commitLogOffset int64) store.BufferResult
- func (ms *PersistentMessageStore) SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) store.BufferResult
- func (ms *PersistentMessageStore) Shutdown()
- func (ms *PersistentMessageStore) SlaveFallBehindMuch() int64
- func (ms *PersistentMessageStore) Start() error
- func (ms *PersistentMessageStore) StoreStats() stats.StoreStats
- func (ms *PersistentMessageStore) UpdateHaMasterAddress(newAddr string)
- type SynchronizationMethod
Constants ¶
const ( WaitTimeOut = 1000 * 5 DEFAULT_INITIAL_CAPACITY = 11 )
const ( MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8 BlankMagicCode = 0xBBCCDDEE ^ 1880681586 + 8 )
const ( END_FILE_MIN_BLANK_LENGTH = 4 + 4 TOTALSIZE = 4 // 1 TOTALSIZE MAGICCODE = 4 // 2 MAGICCODE BODYCRC = 4 // 3 BODYCRC QUEUE_ID = 4 // 4 QUEUEID FLAG = 4 // 5 FLAG QUEUE_OFFSET = 8 // 6 QUEUEOFFSET PHYSICAL_OFFSET = 8 // 7 PHYSICALOFFSET SYSFLAG = 4 // 8 SYSFLAG BORN_TIMESTAMP = 8 // 9 BORNTIMESTAMP BORN_HOST = 8 // 10 BORNHOST STORE_TIMESTAMP = 8 // 11 STORETIMESTAMP STORE_HOST_ADDRESS = 8 // 12 STOREHOSTADDRESS RE_CONSUME_TIMES = 4 // 13 RECONSUMETIMES PREPARED_TRANSACTION_OFFSET = 8 // 14 Prepared Transaction Offset BODY_LENGTH = 4 TOPIC_LENGTH = 1 PROPERTIES_LENGTH = 2 )
const ( OS_PAGE_SIZE = 1024 * 4 MMAPED_ENTIRE_FILE = -1 )
const ( TotalPhysicalMemorySize = 1024 * 1024 * 1024 * 24 LongMinValue = -9223372036854775808 )
const ( NotReadableBit = 1 // 禁止读权限 NotWriteableBit = 1 << 1 // 禁止写权限 WriteLogicsQueueErrorBit = 1 << 2 // 逻辑队列是否发生错误 WriteIndexFileErrorBit = 1 << 3 // 索引文件是否发生错误 DiskFullBit = 1 << 4 // 磁盘空间不足 )
const ( SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXX" FIRST_DELAY_TIME = int64(1000) DELAY_FOR_A_WHILE = int64(100) DELAY_FOR_A_PERIOD = int64(10000) )
const ( COMMIT_LOG_MAX_OFFSET runningStats = iota COMMIT_LOG_MIN_OFFSET COMMIT_LOG_DISK_RATIO CONSUME_QUEUE_DISK_RATIO SCHEDULE_MESSAGE_OFFSET )
const (
CQStoreUnitSize = 20 // 存储单元大小
)
const (
FlushRetryTimesOver = 3
)
const (
MaxManualDeleteFileTimes = 20 // 手工触发一次最多删除次数
)
const (
ReadSocketMaxBufferSize = 1024 * 1024
)
const (
RetryTimesOver = 3
)
const (
SUBSCRIPTION_ALL = "*"
)
Variables ¶
var ( HASH_SLOT_SIZE int32 = 4 INDEX_SIZE int32 = 20 INVALID_INDEX int32 = 0 )
Functions ¶
func NewMessageStore ¶
func NewMessageStore(config *Config, brokerStats stats.BrokerStats) store.MessageStore
Types ¶
type BrokerRoleType ¶
type BrokerRoleType int
const ( ASYNC_MASTER BrokerRoleType = iota // 异步复制Master SYNC_MASTER // 同步双写Master SLAVE // Slave )
func ParseBrokerRoleType ¶
func ParseBrokerRoleType(desc string) (BrokerRoleType, error)
func (BrokerRoleType) String ¶
func (brt BrokerRoleType) String() string
type Config ¶
type Config struct { StorePathRootDir string `json:"StorePathRootDir"` // 存储根目录 StorePathCommitLog string `json:"StorePathCommitLog"` // CommitLog存储目录 StorePathConsumeQueue string `json:"StorePathConsumeQueue"` // ConsumeQueue存储目录 StorePathIndex string `json:"StorePathIndex"` // 索引文件存储目录 StoreCheckpoint string `json:"StoreCheckpoint"` // 异常退出产生的文件 AbortFile string `json:"AbortFile"` // 异常退出产生的文件 TranStateTableStorePath string `json:"TranStateTableStorePath"` // 分布式事务配置 TranStateTableMappedFileSize int32 `json:"TranStateTableMappedFileSize"` // TranRedoLogStorePath string `json:"TranRedoLogStorePath"` // TranRedoLogMappedFileSize int32 `json:"TranRedoLogMappedFileSize"` // CheckTransactionMessageAtleastInterval int64 `json:"CheckTransactionMessageAtleastInterval"` // 事务回查至少间隔时间 CheckTransactionMessageTimerInterval int64 `json:"CheckTransactionMessageTimerInterval"` // 事务回查定时间隔时间 CheckTransactionMessageEnable bool `json:"CheckTransactionMessageEnable"` // 是否开启事务Check过程,双十一时,可以关闭 MappedFileSizeCommitLog int32 `json:"MappedFileSizeCommitLog"` // CommitLog每个文件大小 1G MappedFileSizeConsumeQueue int32 `json:"MappedFileSizeConsumeQueue"` // ConsumeQueue每个文件大小 默认存储30W条消息 FlushIntervalCommitLog int32 `json:"FlushIntervalCommitLog"` // CommitLog刷盘间隔时间(单位毫秒) FlushCommitLogTimed bool `json:"FlushCommitLogTimed"` // 是否定时方式刷盘,默认是实时刷盘 FlushIntervalConsumeQueue int32 `json:"FlushIntervalConsumeQueue"` // ConsumeQueue刷盘间隔时间(单位毫秒) CleanResourceInterval int32 `json:"CleanResourceInterval"` // 清理资源间隔时间(单位毫秒) DeleteCommitLogFilesInterval int32 `json:"DeleteCommitLogFilesInterval"` // 删除多个CommitLog文件的间隔时间(单位毫秒) DeleteConsumeQueueFilesInterval int32 `json:"DeleteConsumeQueueFilesInterval"` // 删除多个ConsumeQueue文件的间隔时间(单位毫秒) DestroyMappedFileIntervalForcibly int32 `json:"DestroyMappedFileIntervalForcibly"` // 强制删除文件间隔时间(单位毫秒) RedeleteHangedFileInterval int32 `json:"RedeleteHangedFileInterval"` // 定期检查Hanged文件间隔时间(单位毫秒) DeleteWhen string `json:"DeleteWhen"` // 何时触发删除文件, 默认凌晨4点删除文件 
DiskMaxUsedSpaceRatio int32 `json:"DiskMaxUsedSpaceRatio"` // 磁盘空间最大使用率 FileReservedTime int64 `json:"FileReservedTime"` PutMsgIndexHightWater int32 `json:"PutMsgIndexHightWater"` // 写消息索引到ConsumeQueue,缓冲区高水位,超过则开始流控 MaxMessageSize int32 `json:"MaxMessageSize"` // 最大消息大小,默认512K CheckCRCOnRecover bool `json:"CheckCRCOnRecover"` // 重启时,是否校验CRC FlushCommitLogLeastPages int32 `json:"FlushCommitLogLeastPages"` // 刷CommitLog,至少刷几个PAGE FlushConsumeQueueLeastPages int32 `json:"FlushConsumeQueueLeastPages"` // 刷ConsumeQueue,至少刷几个PAGE FlushCommitLogThoroughInterval int32 `json:"FlushCommitLogThoroughInterval"` // 刷CommitLog,彻底刷盘间隔时间 FlushConsumeQueueThoroughInterval int32 `json:"FlushConsumeQueueThoroughInterval"` // 刷ConsumeQueue,彻底刷盘间隔时间 MaxTransferBytesOnMessageInMemory int32 `json:"MaxTransferBytesOnMessageInMemory"` // 最大被拉取的消息字节数,消息在内存 MaxTransferCountOnMessageInMemory int32 `json:"MaxTransferCountOnMessageInMemory"` // 最大被拉取的消息个数,消息在内存 MaxTransferBytesOnMessageInDisk int32 `json:"MaxTransferBytesOnMessageInDisk"` // 最大被拉取的消息字节数,消息在磁盘 MaxTransferCountOnMessageInDisk int32 `json:"MaxTransferCountOnMessageInDisk"` // 最大被拉取的消息个数,消息在磁盘 AccessMessageInMemoryMaxRatio int64 `json:"AccessMessageInMemoryMaxRatio"` // 命中消息在内存的最大比例 MessageIndexEnable bool `json:"MessageIndexEnable"` // 是否开启消息索引功能 MaxHashSlotNum int32 `json:"MaxHashSlotNum"` MaxIndexNum int32 `json:"MaxIndexNum"` MaxMsgsNumBatch int32 `json:"MaxMsgsNumBatch"` MessageIndexSafe bool `json:"MessageIndexSafe"` // 是否使用安全的消息索引功能,即可靠模式。可靠模式下,异常宕机恢复慢; 非可靠模式下,异常宕机恢复快 HaListenPort int32 `json:"HaListenPort"` // HA功能 HaSendHeartbeatInterval int32 `json:"HaSendHeartbeatInterval"` // HaHousekeepingInterval int32 `json:"HaHousekeepingInterval"` // HaTransferBatchSize int32 `json:"HaTransferBatchSize"` // HaMasterAddress string `json:"HaMasterAddress"` // 如果不设置,则从NameServer获取Master HA服务地址 HaSlaveFallbehindMax int32 `json:"HaSlaveFallbehindMax"` // Slave落后Master超过此值,则认为存在异常 BrokerRole BrokerRoleType `json:"BrokerRole"` // 
FlushDisk FlushDiskType `json:"FlushDisk"` // SyncFlushTimeout int32 `json:"SyncFlushTimeout"` // 同步刷盘超时时间 MessageDelayLevel string `json:"MessageDelayLevel"` // 定时消息相关 FlushDelayOffsetInterval int64 `json:"FlushDelayOffsetInterval"` // CleanFileForciblyEnable bool `json:"CleanFileForciblyEnable"` // 磁盘空间超过90%警戒水位,自动开始删除文件 SyncMethod SynchronizationMethod `json:"SyncMethod"` // 主从同步数据类型 }
Config 存储层配置文件类 Author zhoufei Since 2017/9/6
type FlushDiskType ¶
type FlushDiskType int
const ( // 同步刷盘 SYNC_FLUSH FlushDiskType = iota // 异步刷盘 ASYNC_FLUSH )
func ParseFlushDiskType ¶
func ParseFlushDiskType(desc string) (FlushDiskType, error)
func (FlushDiskType) String ¶
func (fdt FlushDiskType) String() string
type MessageFilter ¶
type MessageFilter interface {
IsMessageMatched(subscriptionData *heartbeat.SubscriptionData, tagsCode int64) bool
}
MessageFilter 消息过滤接口 Author zhoufei Since 2017/9/6
type PersistentMessageStore ¶
type PersistentMessageStore struct {
// contains filtered or unexported fields
}
PersistentMessageStore 存储层对外提供的接口 Author zhoufei Since 2017/9/6
func (*PersistentMessageStore) AppendToCommitLog ¶
func (ms *PersistentMessageStore) AppendToCommitLog(startOffset int64, data []byte) bool
AppendToCommitLog 向CommitLog追加数据,并分发至各个Consume Queue Author: zhoufei Since: 2017/10/24
func (*PersistentMessageStore) BrokerStats ¶
func (ms *PersistentMessageStore) BrokerStats() stats.BrokerStats
func (*PersistentMessageStore) CheckInDiskByConsumeOffset ¶
func (ms *PersistentMessageStore) CheckInDiskByConsumeOffset(topic string, queueId int32, consumeOffset int64) bool
CheckInDiskByConsumeOffset 判断消息是否在磁盘 Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) CleanExpiredConsumerQueue ¶
func (ms *PersistentMessageStore) CleanExpiredConsumerQueue()
CleanExpiredConsumerQueue 清除失效的消费队列 Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) CleanUnusedTopic ¶
func (ms *PersistentMessageStore) CleanUnusedTopic(topics []string) int32
CleanUnusedTopic 清除未使用Topic
func (*PersistentMessageStore) CommitLogOffsetInQueue ¶
func (ms *PersistentMessageStore) CommitLogOffsetInQueue(topic string, queueId int32, cqOffset int64) int64
CommitLogOffsetInQueue
Author: luoji, <gunsluo@gmail.com> Since: 2018-01-02
func (*PersistentMessageStore) Destroy ¶
func (ms *PersistentMessageStore) Destroy()
func (*PersistentMessageStore) EarliestMessageTime ¶
func (ms *PersistentMessageStore) EarliestMessageTime(topic string, queueId int32) int64
EarliestMessageTime 获取队列中最早的消息时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) EncodeScheduleMsg ¶
func (ms *PersistentMessageStore) EncodeScheduleMsg() string
func (*PersistentMessageStore) ExcuteDeleteFilesManualy ¶
func (ms *PersistentMessageStore) ExcuteDeleteFilesManualy()
func (*PersistentMessageStore) GetCommitLogData ¶
func (ms *PersistentMessageStore) GetCommitLogData(offset int64) store.BufferResult
GetCommitLogData 数据复制使用:获取CommitLog数据 Author: zhoufei Since: 2017/10/23
func (*PersistentMessageStore) GetMessage ¶
func (ms *PersistentMessageStore) GetMessage(group string, topic string, queueId int32, offset int64, maxMsgNums int32, subscriptionData *heartbeat.SubscriptionData) *store.GetMessageResult
func (*PersistentMessageStore) LookMessageByOffset ¶
func (ms *PersistentMessageStore) LookMessageByOffset(commitLogOffset int64) *message.MessageExt
LookMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回nil Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) MaxOffsetInQueue ¶
func (ms *PersistentMessageStore) MaxOffsetInQueue(topic string, queueId int32) int64
MaxOffsetInQueue 获取指定队列最大Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) MaxPhyOffset ¶
func (ms *PersistentMessageStore) MaxPhyOffset() int64
MaxPhyOffset 获取物理队列最大offset Author: zhoufei Since: 2017/10/24
func (*PersistentMessageStore) MessageIds ¶
func (ms *PersistentMessageStore) MessageIds(topic string, queueId int32, minOffset, maxOffset int64, storeHost string) map[string]int64
MessageIds 批量获取MessageId Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) MessageStoreTimeStamp ¶
func (ms *PersistentMessageStore) MessageStoreTimeStamp(topic string, queueId int32, offset int64) int64
MessageStoreTimeStamp 获取队列中存储时间,如果找不到对应时间,则返回-1 Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) MessageTotalInQueue ¶
func (ms *PersistentMessageStore) MessageTotalInQueue(topic string, queueId int32) int64
func (*PersistentMessageStore) MinOffsetInQueue ¶
func (ms *PersistentMessageStore) MinOffsetInQueue(topic string, queueId int32) int64
MinOffsetInQueue 获取指定队列最小Offset 如果队列不存在,返回-1 Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) MinPhyOffset ¶
func (ms *PersistentMessageStore) MinPhyOffset() int64
func (*PersistentMessageStore) OffsetInQueueByTime ¶
func (ms *PersistentMessageStore) OffsetInQueueByTime(topic string, queueId int32, timestamp int64) int64
OffsetInQueueByTime 根据消息时间获取某个队列中对应的offset 1、如果指定时间(包含之前之后)有对应的消息,则获取距离此时间最近的offset(优先选择之前) 2、如果指定时间无对应消息,则返回0 Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) PutMessage ¶
func (ms *PersistentMessageStore) PutMessage(msg *store.MessageExtInner) *store.PutMessageResult
func (*PersistentMessageStore) QueryMessage ¶
func (ms *PersistentMessageStore) QueryMessage(topic string, key string, maxNum int32, begin int64, end int64) *store.QueryMessageResult
func (*PersistentMessageStore) RunningDataInfo ¶
func (ms *PersistentMessageStore) RunningDataInfo() string
func (*PersistentMessageStore) RuntimeInfo ¶
func (ms *PersistentMessageStore) RuntimeInfo() map[string]string
RuntimeInfo 获取运行时统计数据 Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) SelectOneMessageByOffset ¶
func (ms *PersistentMessageStore) SelectOneMessageByOffset(commitLogOffset int64) store.BufferResult
SelectOneMessageByOffset 通过物理队列Offset,查询消息。 如果发生错误,则返回nil Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) SelectOneMessageByOffsetAndSize ¶
func (ms *PersistentMessageStore) SelectOneMessageByOffsetAndSize(commitLogOffset int64, msgSize int32) store.BufferResult
SelectOneMessageByOffsetAndSize 通过物理队列Offset、size,查询消息。 如果发生错误,则返回nil Author: zhoufei Since: 2017/9/20
func (*PersistentMessageStore) Shutdown ¶
func (ms *PersistentMessageStore) Shutdown()
func (*PersistentMessageStore) SlaveFallBehindMuch ¶
func (ms *PersistentMessageStore) SlaveFallBehindMuch() int64
SlaveFallBehindMuch Slave落后Master多少byte Author: zhoufei Since: 2017/9/21
func (*PersistentMessageStore) Start ¶
func (ms *PersistentMessageStore) Start() error
func (*PersistentMessageStore) StoreStats ¶
func (ms *PersistentMessageStore) StoreStats() stats.StoreStats
func (*PersistentMessageStore) UpdateHaMasterAddress ¶
func (ms *PersistentMessageStore) UpdateHaMasterAddress(newAddr string)
UpdateHaMasterAddress 更新HaMaster地址 Author: zhoufei Since: 2017/9/21
type SynchronizationMethod ¶
type SynchronizationMethod int
const ( SYNCHRONIZATION_FULL SynchronizationMethod = iota // 同步所有文件的数据 SYNCHRONIZATION_LAST // 同步最后一个文件的数据 )
Source Files ¶
- allocate.go
- clock.go
- commit_log.go
- config.go
- consume.go
- consume_queue.go
- dispatch.go
- ha.go
- index.go
- mapped_buffer_result.go
- mapped_byte_buffer.go
- mapped_file.go
- mapped_file_queue.go
- message_filter.go
- message_store.go
- reput_message.go
- running_flags.go
- schedule.go
- store_checkpoint.go
- transaction.go
- utils.go