Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeMessage(msg []byte) (uint32, []byte, error)
- func EncodeMessage(messageSeq uint32, data []byte) []byte
- func EncodeStreamItem(s *StreamItem) []byte
- type ChannelInfo
- type Conversation
- type ConversationSet
- type Entry
- type FileStore
- func (f *FileStore) AddAllowlist(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) AddDenylist(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) AddIPBlacklist(ips []string) error
- func (f *FileStore) AddOrUpdateChannel(channelInfo *ChannelInfo) error
- func (f *FileStore) AddOrUpdateConversations(uid string, conversations []*Conversation) error
- func (f *FileStore) AddSubscribers(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) AddSystemUIDs(uids []string) error
- func (f *FileStore) AppendMessageOfNotifyQueue(messages []Message) error
- func (f *FileStore) Close() error
- func (f *FileStore) DeleteChannel(channelID string, channelType uint8) error
- func (f *FileStore) DeleteConversation(uid string, channelID string, channelType uint8) error
- func (f *FileStore) ExistChannel(channelID string, channelType uint8) (bool, error)
- func (f *FileStore) GetAllowlist(channelID string, channelType uint8) ([]string, error)
- func (f *FileStore) GetChannel(channelID string, channelType uint8) (*ChannelInfo, error)
- func (f *FileStore) GetConversation(uid string, channelID string, channelType uint8) (*Conversation, error)
- func (f *FileStore) GetConversations(uid string) ([]*Conversation, error)
- func (f *FileStore) GetDenylist(channelID string, channelType uint8) ([]string, error)
- func (f *FileStore) GetIPBlacklist() ([]string, error)
- func (f *FileStore) GetMessageOfUserCursor(uid string) (uint32, error)
- func (f *FileStore) GetMessagesOfNotifyQueue(count int) ([]Message, error)
- func (f *FileStore) GetSubscribers(channelID string, channelType uint8) ([]string, error)
- func (f *FileStore) GetSystemUIDs() ([]string, error)
- func (f *FileStore) GetUserToken(uid string, deviceFlag uint8) (string, uint8, error)
- func (f *FileStore) Open() error
- func (f *FileStore) RemoveAllAllowlist(channelID string, channelType uint8) error
- func (f *FileStore) RemoveAllDenylist(channelID string, channelType uint8) error
- func (f *FileStore) RemoveAllSubscriber(channelID string, channelType uint8) error
- func (f *FileStore) RemoveAllowlist(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) RemoveDenylist(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) RemoveIPBlacklist(ips []string) error
- func (f *FileStore) RemoveMessagesOfNotifyQueue(messageIDs []int64) error
- func (f *FileStore) RemoveSubscribers(channelID string, channelType uint8, uids []string) error
- func (f *FileStore) RemoveSystemUIDs(uids []string) error
- func (f *FileStore) SaveStreamMeta(meta *StreamMeta) error
- func (f *FileStore) SyncMessageOfUser(uid string, startMessageSeq uint32, limit int) ([]Message, error)
- func (f *FileStore) UpdateMessageOfUserCursorIfNeed(uid string, messageSeq uint32) error
- func (f *FileStore) UpdateUserToken(uid string, deviceFlag uint8, deviceLevel uint8, token string) error
- type FileStoreForMsg
- func (f *FileStoreForMsg) AppendMessages(channelID string, channelType uint8, msgs []Message) (seqs []uint32, err error)
- func (f *FileStoreForMsg) AppendMessagesOfUser(uid string, msgs []Message) (seqs []uint32, err error)
- func (f *FileStoreForMsg) AppendStreamItem(channelID string, channelType uint8, streamNo string, item *StreamItem) (uint32, error)
- func (f *FileStoreForMsg) Close() error
- func (f *FileStoreForMsg) DeleteChannelAndClearMessages(channelID string, channelType uint8) error
- func (f *FileStoreForMsg) GetLastMsgSeq(channelID string, channelType uint8) (uint32, error)
- func (f *FileStoreForMsg) GetStreamItems(channelID string, channelType uint8, streamNo string) ([]*StreamItem, error)
- func (f *FileStoreForMsg) GetStreamMeta(channelID string, channelType uint8, streamNo string) (*StreamMeta, error)
- func (f *FileStoreForMsg) LoadLastMsgs(channelID string, channelType uint8, limit int) ([]Message, error)
- func (f *FileStoreForMsg) LoadLastMsgsWithEnd(channelID string, channelType uint8, endMessageSeq uint32, limit int) ([]Message, error)
- func (f *FileStoreForMsg) LoadMsg(channelID string, channelType uint8, messageSeq uint32) (Message, error)
- func (f *FileStoreForMsg) LoadNextRangeMsgs(channelID string, channelType uint8, startMessageSeq, endMessageSeq uint32, ...) ([]Message, error)
- func (f *FileStoreForMsg) LoadPrevRangeMsgs(channelID string, channelType uint8, startMessageSeq, endMessageSeq uint32, ...) ([]Message, error)
- func (f *FileStoreForMsg) SaveStreamMeta(meta *StreamMeta) error
- func (f *FileStoreForMsg) StreamEnd(channelID string, channelType uint8, streamNo string) error
- type Index
- func (idx *Index) Append(offset uint32, position uint32) error
- func (idx *Index) Close() error
- func (idx *Index) IsFull() bool
- func (idx *Index) LastPosition() MessageSeqPosition
- func (idx *Index) Lookup(targetOffset uint32) (MessageSeqPosition, error)
- func (idx *Index) Sync() error
- func (idx *Index) TruncateEntries(number int) error
- type Message
- type MessageSeqPosition
- type SegmentMode
- type Store
- type StoreConfig
- type Stream
- type StreamItem
- type StreamItemSlice
- type StreamMeta
- type SubscribeInfo
Constants ¶
View Source
const FileDefaultMode os.FileMode = 0755
FileDefaultMode FileDefaultMode
View Source
const UserQueuePrefix = "userqueue_"
Variables ¶
View Source
var ( // Encoding Encoding Encoding = binary.BigEndian // ErrorNotData ErrorNotData ErrorNotData = errors.New("no data") // MagicNumber MagicNumber MagicNumber = [2]byte{0x15, 0x16} // lm // EndMagicNumber EndMagicNumber EndMagicNumber = [1]byte{0x3} // MessageVersion log version MessageVersion = [1]byte{0x01} // SnapshotMagicNumber SnapshotMagicNumber SnapshotMagicNumber = [2]byte{0xb, 0xa} // ba // EndSnapshotMagicNumber EndSnapshotMagicNumber EndSnapshotMagicNumber = [1]byte{0xf} // BackupSlotMagicNumber BackupSlotMagicNumber BackupSlotMagicNumber = [2]byte{0xc, 0xd} // BackupMagicNumber BackupMagicNumber BackupMagicNumber = []byte("---backup start ---") // MessageSeqSize MessageSeqSize MessageSeqSize = 8 // LogDataLenSize LogDataLenSize MessageDataLenSize = 4 // AppliIndexSize AppliIndexSize AppliIndexSize = 8 IndexMaxSizeOfByte int64 = 2 * 1024 * 1024 // 索引文件的最大大小 2M )
View Source
var ( StreamVersion = [1]byte{0x01} // StreamMagicNumber StreamMagicNumber StreamMagicNumber = [2]byte{0x15, 0x16} // StreamEndMagicNumber StreamEndMagicNumber StreamEndMagicNumber = [1]byte{0x3} )
View Source
var ( // ErrIndexCorrupt ErrIndexCorrupt ErrIndexCorrupt = errors.New("corrupt index file") )
Functions ¶
func EncodeMessage ¶
func EncodeStreamItem ¶
func EncodeStreamItem(s *StreamItem) []byte
Types ¶
type ChannelInfo ¶
type ChannelInfo struct { ChannelID string `json:"-"` ChannelType uint8 `json:"-"` Ban bool `json:"ban"` // 是否被封 Large bool `json:"large"` // 是否是超大群 }
func NewChannelInfo ¶
func NewChannelInfo(channelID string, channelType uint8) *ChannelInfo
NewChannelInfo NewChannelInfo
type Conversation ¶
type Conversation struct { UID string // User UID (user who belongs to the most recent session) ChannelID string // Conversation channel ChannelType uint8 UnreadCount int // Number of unread messages Timestamp int64 // Last session timestamp (10 digits) LastMsgSeq uint32 // Sequence number of the last message LastClientMsgNo string // Last message client number LastMsgID int64 // Last message ID Version int64 // Data version }
Conversation Conversation
func (*Conversation) String ¶
func (c *Conversation) String() string
type ConversationSet ¶
type ConversationSet []*Conversation
func NewConversationSet ¶
func NewConversationSet(data []byte) ConversationSet
func (ConversationSet) Encode ¶
func (c ConversationSet) Encode() []byte
type FileStore ¶
type FileStore struct { *FileStoreForMsg // contains filtered or unexported fields }
func NewFileStore ¶
func NewFileStore(cfg *StoreConfig) *FileStore
func (*FileStore) AddAllowlist ¶
func (*FileStore) AddDenylist ¶
func (*FileStore) AddIPBlacklist ¶
func (*FileStore) AddOrUpdateChannel ¶
func (f *FileStore) AddOrUpdateChannel(channelInfo *ChannelInfo) error
func (*FileStore) AddOrUpdateConversations ¶
func (f *FileStore) AddOrUpdateConversations(uid string, conversations []*Conversation) error
func (*FileStore) AddSubscribers ¶
func (*FileStore) AddSystemUIDs ¶
func (*FileStore) AppendMessageOfNotifyQueue ¶
func (*FileStore) DeleteChannel ¶
func (*FileStore) DeleteConversation ¶
func (*FileStore) ExistChannel ¶
func (*FileStore) GetAllowlist ¶
func (*FileStore) GetChannel ¶
func (f *FileStore) GetChannel(channelID string, channelType uint8) (*ChannelInfo, error)
func (*FileStore) GetConversation ¶
func (*FileStore) GetConversations ¶
func (f *FileStore) GetConversations(uid string) ([]*Conversation, error)
func (*FileStore) GetDenylist ¶
func (*FileStore) GetIPBlacklist ¶
func (*FileStore) GetMessageOfUserCursor ¶
GetMessageOfUserCursor GetMessageOfUserCursor
func (*FileStore) GetMessagesOfNotifyQueue ¶
func (*FileStore) GetSubscribers ¶
func (*FileStore) GetSystemUIDs ¶
func (*FileStore) GetUserToken ¶
func (*FileStore) RemoveAllAllowlist ¶
func (*FileStore) RemoveAllDenylist ¶
func (*FileStore) RemoveAllSubscriber ¶
func (*FileStore) RemoveAllowlist ¶
func (*FileStore) RemoveDenylist ¶
func (*FileStore) RemoveIPBlacklist ¶
func (*FileStore) RemoveMessagesOfNotifyQueue ¶
func (*FileStore) RemoveSubscribers ¶
func (*FileStore) RemoveSystemUIDs ¶
func (*FileStore) SaveStreamMeta ¶
func (f *FileStore) SaveStreamMeta(meta *StreamMeta) error
func (*FileStore) SyncMessageOfUser ¶
func (*FileStore) UpdateMessageOfUserCursorIfNeed ¶
type FileStoreForMsg ¶
func NewFileStoreForMsg ¶
func NewFileStoreForMsg(cfg *StoreConfig) *FileStoreForMsg
func (*FileStoreForMsg) AppendMessages ¶
func (*FileStoreForMsg) AppendMessagesOfUser ¶
func (f *FileStoreForMsg) AppendMessagesOfUser(uid string, msgs []Message) (seqs []uint32, err error)
func (*FileStoreForMsg) AppendStreamItem ¶
func (f *FileStoreForMsg) AppendStreamItem(channelID string, channelType uint8, streamNo string, item *StreamItem) (uint32, error)
func (*FileStoreForMsg) Close ¶
func (f *FileStoreForMsg) Close() error
func (*FileStoreForMsg) DeleteChannelAndClearMessages ¶
func (f *FileStoreForMsg) DeleteChannelAndClearMessages(channelID string, channelType uint8) error
func (*FileStoreForMsg) GetLastMsgSeq ¶
func (f *FileStoreForMsg) GetLastMsgSeq(channelID string, channelType uint8) (uint32, error)
func (*FileStoreForMsg) GetStreamItems ¶
func (f *FileStoreForMsg) GetStreamItems(channelID string, channelType uint8, streamNo string) ([]*StreamItem, error)
func (*FileStoreForMsg) GetStreamMeta ¶
func (f *FileStoreForMsg) GetStreamMeta(channelID string, channelType uint8, streamNo string) (*StreamMeta, error)
func (*FileStoreForMsg) LoadLastMsgs ¶
func (*FileStoreForMsg) LoadLastMsgsWithEnd ¶
func (*FileStoreForMsg) LoadNextRangeMsgs ¶
func (*FileStoreForMsg) LoadPrevRangeMsgs ¶
func (*FileStoreForMsg) SaveStreamMeta ¶
func (f *FileStoreForMsg) SaveStreamMeta(meta *StreamMeta) error
type Index ¶
Index Index
func (*Index) LastPosition ¶
func (idx *Index) LastPosition() MessageSeqPosition
func (*Index) Lookup ¶
func (idx *Index) Lookup(targetOffset uint32) (MessageSeqPosition, error)
Lookup Find the largest offset less than or equal to the given targetOffset and return a pair holding this offset and its corresponding physical file position
func (*Index) TruncateEntries ¶
TruncateEntries TruncateEntries
type MessageSeqPosition ¶
MessageSeqPosition MessageSeqPosition
type Store ¶
type Store interface { Open() error Close() error // #################### user #################### // GetUserToken return token,device level and error GetUserToken(uid string, deviceFlag uint8) (string, uint8, error) UpdateUserToken(uid string, deviceFlag uint8, deviceLevel uint8, token string) error // UpdateMessageOfUserCursorIfNeed 更新用户消息队列的游标,用户读到的位置 UpdateMessageOfUserCursorIfNeed(uid string, messageSeq uint32) error // #################### channel #################### GetChannel(channelID string, channelType uint8) (*ChannelInfo, error) // AddOrUpdateChannel add or update channel AddOrUpdateChannel(channelInfo *ChannelInfo) error // ExistChannel return true if channel exist ExistChannel(channelID string, channelType uint8) (bool, error) // AddSubscribers 添加订阅者 AddSubscribers(channelID string, channelType uint8, uids []string) error // RemoveSubscribers 移除指定频道内指定uid的订阅者 RemoveSubscribers(channelID string, channelType uint8, uids []string) error // GetSubscribers 获取订阅者列表 GetSubscribers(channelID string, channelType uint8) ([]string, error) RemoveAllSubscriber(channelID string, channelType uint8) error GetAllowlist(channelID string, channelType uint8) ([]string, error) GetDenylist(channelID string, channelType uint8) ([]string, error) // DeleteChannel 删除频道 DeleteChannel(channelID string, channelType uint8) error // AddDenylist 添加频道黑名单 AddDenylist(channelID string, channelType uint8, uids []string) error // RemoveAllDenylist 移除指定频道的所有黑名单 RemoveAllDenylist(channelID string, channelType uint8) error // RemoveDenylist 移除频道内指定用户的黑名单 RemoveDenylist(channelID string, channelType uint8, uids []string) error // AddAllowlist 添加白名单 AddAllowlist(channelID string, channelType uint8, uids []string) error // RemoveAllAllowlist 移除指定频道的所有白名单 RemoveAllAllowlist(channelID string, channelType uint8) error // RemoveAllowlist 移除白名单 RemoveAllowlist(channelID string, channelType uint8, uids []string) error // #################### messages #################### // StoreMsg return seqs and error, seqs len is msgs len AppendMessages(channelID string, channelType uint8, msgs []Message) (seqs []uint32, err error) // 追加消息到用户的消息队列 AppendMessagesOfUser(uid string, msgs []Message) (seqs []uint32, err error) LoadMsg(channelID string, channelType uint8, seq uint32) (Message, error) LoadLastMsgs(channelID string, channelType uint8, limit int) ([]Message, error) // LoadLastMsgsWithEnd 加载最新的消息 end表示加载到end的位置结束加载 end=0表示不做限制 结果不包含end LoadLastMsgsWithEnd(channelID string, channelType uint8, end uint32, limit int) ([]Message, error) // LoadPrevRangeMsgs 向上加载指定范围的消息 end=0表示不做限制 比如 start=100 end=0 limit=10 则返回的消息seq为99-90的消息 // 结果包含start,不包含end LoadPrevRangeMsgs(channelID string, channelType uint8, start, end uint32, limit int) ([]Message, error) // LoadNextRangeMsgs 向下加载指定范围的消息 end=0表示不做限制 比如 start=100 end=200 limit=10 则返回的消息seq为101-111的消息, // 比如start=100 end=105 limit=10 则返回的消息seq为101-104的消息 // 结果包含start,不包含end LoadNextRangeMsgs(channelID string, channelType uint8, start, end uint32, limit int) ([]Message, error) // GetLastMsgSeq 获取最新的消息seq GetLastMsgSeq(channelID string, channelType uint8) (uint32, error) // GetMessageOfUserCursor 获取用户消息队列的游标,用户读到的位置 GetMessageOfUserCursor(uid string) (uint32, error) // SyncMessageOfUser 同步用户队列里的消息(写扩散) SyncMessageOfUser(uid string, startMessageSeq uint32, limit int) ([]Message, error) AppendMessageOfNotifyQueue(m []Message) error GetMessagesOfNotifyQueue(count int) ([]Message, error) // RemoveMessagesOfNotifyQueue 从通知队列里移除消息 RemoveMessagesOfNotifyQueue(messageIDs []int64) error DeleteChannelAndClearMessages(channelID string, channelType uint8) error // #################### conversations #################### AddOrUpdateConversations(uid string, conversations []*Conversation) error GetConversations(uid string) ([]*Conversation, error) GetConversation(uid string, channelID string, channelType uint8) (*Conversation, error) DeleteConversation(uid string, channelID string, channelType uint8) error // 删除最近会话 // #################### system uids #################### AddSystemUIDs(uids []string) error // 添加系统uid RemoveSystemUIDs(uids []string) error // 移除系统uid GetSystemUIDs() ([]string, error) // #################### message stream #################### // SaveStreamMeta 保存消息流元数据 SaveStreamMeta(meta *StreamMeta) error // StreamEnd 结束流 StreamEnd(channelID string, channelType uint8, streamNo string) error // GetStreamMeta 获取消息流元数据 GetStreamMeta(channelID string, channelType uint8, streamNo string) (*StreamMeta, error) // AppendStreamItem 追加消息流 AppendStreamItem(channelID string, channelType uint8, streamNo string, item *StreamItem) (uint32, error) // GetStreamItems 获取消息流 GetStreamItems(channelID string, channelType uint8, streamNo string) ([]*StreamItem, error) // AddIPBlacklist 添加ip黑名单 AddIPBlacklist(ips []string) error // RemoveIPBlacklist 移除ip黑名单 RemoveIPBlacklist(ips []string) error // GetIPBlacklist 获取ip黑名单 GetIPBlacklist() ([]string, error) }
type StoreConfig ¶
type StoreConfig struct { SlotNum int // DataDir string MaxSegmentCacheNum int EachMessagegMaxSizeOfBytes int SegmentMaxBytes int64 // each segment max size of bytes default 2G DecodeMessageFnc func(msg []byte) (Message, error) StreamCacheSize int // stream cache size }
func NewStoreConfig ¶
func NewStoreConfig() *StoreConfig
type StreamItem ¶
func DecodeStreamItem ¶
func DecodeStreamItem(data []byte) (*StreamItem, error)
type StreamItemSlice ¶
type StreamItemSlice []*StreamItem
func (StreamItemSlice) Len ¶
func (s StreamItemSlice) Len() int
func (StreamItemSlice) Less ¶
func (l StreamItemSlice) Less(i, j int) bool
func (StreamItemSlice) Swap ¶
func (l StreamItemSlice) Swap(i, j int)
type StreamMeta ¶
type StreamMeta struct { StreamNo string `json:"stream_no"` MessageID int64 `json:"message_id"` ChannelID string `json:"channel_id"` ChannelType uint8 `json:"channel_type"` MessageSeq uint32 `json:"message_seq"` StreamFlag okproto.StreamFlag `json:"stream_flag"` }
func (*StreamMeta) Decode ¶
func (s *StreamMeta) Decode(data []byte) error
func (*StreamMeta) Encode ¶
func (s *StreamMeta) Encode() []byte
Click to show internal directories.
Click to hide internal directories.