okstore

package
v0.0.0-...-cdc279e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
// 0755: read/write/execute for the owner, read/execute for group and others.
const FileDefaultMode os.FileMode = 0755

FileDefaultMode is the default file mode, 0755.

View Source
// UserQueuePrefix is the string prefix "userqueue_".
// NOTE(review): presumably prepended to the names of per-user message queues — confirm against usage.
const UserQueuePrefix = "userqueue_"

Variables

View Source
var (
	// Encoding is the byte order used by this package: big-endian.
	Encoding = binary.BigEndian
	// ErrorNotData is a sentinel error carrying the message "no data".
	ErrorNotData = errors.New("no data")

	// MagicNumber is the two-byte marker 0x15 0x16.
	// NOTE(review): presumably marks the start of an encoded message — confirm.
	MagicNumber = [2]byte{0x15, 0x16} // lm
	// EndMagicNumber is the one-byte marker 0x3.
	// NOTE(review): presumably marks the end of an encoded message — confirm.
	EndMagicNumber = [1]byte{0x3}
	// MessageVersion is the log version byte, currently 0x01.
	MessageVersion = [1]byte{0x01}
	// SnapshotMagicNumber is the two-byte marker 0xb 0xa.
	SnapshotMagicNumber = [2]byte{0xb, 0xa} // ba
	// EndSnapshotMagicNumber is the one-byte marker 0xf.
	EndSnapshotMagicNumber = [1]byte{0xf}
	// BackupSlotMagicNumber is the two-byte marker 0xc 0xd.
	BackupSlotMagicNumber = [2]byte{0xc, 0xd}
	// BackupMagicNumber is the byte string "---backup start ---".
	BackupMagicNumber = []byte("---backup start ---")

	// MessageSeqSize is 8 (presumably a width in bytes — confirm).
	// NOTE(review): message sequence numbers are uint32 elsewhere in this API; verify that 8 is intended.
	MessageSeqSize = 8
	// MessageDataLenSize is 4 (presumably the width in bytes of the message data length field — confirm).
	MessageDataLenSize = 4
	// AppliIndexSize is 8 (presumably the width in bytes of an applied-index field — confirm).
	AppliIndexSize           = 8
	IndexMaxSizeOfByte int64 = 2 * 1024 * 1024 // maximum size of an index file: 2 MiB

)
View Source
var (
	// StreamVersion is the one-byte version 0x01.
	StreamVersion = [1]byte{0x01}

	// StreamMagicNumber is the two-byte marker 0x15 0x16 (the same bytes as MagicNumber).
	// NOTE(review): presumably marks the start of an encoded stream item — confirm.
	StreamMagicNumber = [2]byte{0x15, 0x16}

	// StreamEndMagicNumber is the one-byte marker 0x3 (the same byte as EndMagicNumber).
	// NOTE(review): presumably marks the end of an encoded stream item — confirm.
	StreamEndMagicNumber = [1]byte{0x3}
)
View Source
var (
	// ErrIndexCorrupt is a sentinel error carrying the message "corrupt index file".
	ErrIndexCorrupt = errors.New("corrupt index file")
)

Functions

func DecodeMessage

func DecodeMessage(msg []byte) (uint32, []byte, error)

func EncodeMessage

func EncodeMessage(messageSeq uint32, data []byte) []byte

func EncodeStreamItem

func EncodeStreamItem(s *StreamItem) []byte

Types

type ChannelInfo

// ChannelInfo holds the identity and flags of a channel.
// ChannelID and ChannelType carry the json:"-" tag, so they are omitted from JSON encoding.
type ChannelInfo struct {
	ChannelID   string `json:"-"`
	ChannelType uint8  `json:"-"`
	Ban         bool   `json:"ban"`   // whether the channel is banned
	Large       bool   `json:"large"` // whether the channel is a super-large group
}

func NewChannelInfo

func NewChannelInfo(channelID string, channelType uint8) *ChannelInfo

NewChannelInfo returns a *ChannelInfo for the given channelID and channelType.

func (*ChannelInfo) ToMap

func (c *ChannelInfo) ToMap() map[string]interface{}

ToMap returns the ChannelInfo as a map[string]interface{}.

type Conversation

type Conversation struct {
	UID             string // User UID (user who belongs to the most recent session)
	ChannelID       string // Conversation channel
	ChannelType     uint8  // Type of the conversation channel
	UnreadCount     int    // Number of unread messages
	Timestamp       int64  // Last session timestamp (10 digits)
	LastMsgSeq      uint32 // Sequence number of the last message
	LastClientMsgNo string // Last message client number
	LastMsgID       int64  // Last message ID
	Version         int64  // Data version
}

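Conversation describes a user's most recent session in a channel: its unread count, last-message details, timestamp, and data version.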

func (*Conversation) String

func (c *Conversation) String() string

type ConversationSet

// ConversationSet is a slice of *Conversation. It is built from bytes by
// NewConversationSet and turned back into bytes by its Encode method.
type ConversationSet []*Conversation

func NewConversationSet

func NewConversationSet(data []byte) ConversationSet

func (ConversationSet) Encode

func (c ConversationSet) Encode() []byte

type Entry

type Entry struct {
	// NOTE(review): presumably the offset relative to the index's base message seq — confirm.
	RelativeOffset uint32
	// NOTE(review): presumably the physical position in the corresponding file — confirm.
	Position       uint32
}

Entry pairs a relative offset with a position.

type FileStore

type FileStore struct {
	// Embedded, so the exported methods of *FileStoreForMsg are promoted to FileStore.
	// FileStore also declares its own Close and SaveStreamMeta, which take precedence
	// over the promoted methods of the same name.
	*FileStoreForMsg
	// contains filtered or unexported fields
}

func NewFileStore

func NewFileStore(cfg *StoreConfig) *FileStore

func (*FileStore) AddAllowlist

func (f *FileStore) AddAllowlist(channelID string, channelType uint8, uids []string) error

func (*FileStore) AddDenylist

func (f *FileStore) AddDenylist(channelID string, channelType uint8, uids []string) error

func (*FileStore) AddIPBlacklist

func (f *FileStore) AddIPBlacklist(ips []string) error

func (*FileStore) AddOrUpdateChannel

func (f *FileStore) AddOrUpdateChannel(channelInfo *ChannelInfo) error

func (*FileStore) AddOrUpdateConversations

func (f *FileStore) AddOrUpdateConversations(uid string, conversations []*Conversation) error

func (*FileStore) AddSubscribers

func (f *FileStore) AddSubscribers(channelID string, channelType uint8, uids []string) error

func (*FileStore) AddSystemUIDs

func (f *FileStore) AddSystemUIDs(uids []string) error

func (*FileStore) AppendMessageOfNotifyQueue

func (f *FileStore) AppendMessageOfNotifyQueue(messages []Message) error

func (*FileStore) Close

func (f *FileStore) Close() error

func (*FileStore) DeleteChannel

func (f *FileStore) DeleteChannel(channelID string, channelType uint8) error

func (*FileStore) DeleteConversation

func (f *FileStore) DeleteConversation(uid string, channelID string, channelType uint8) error

func (*FileStore) ExistChannel

func (f *FileStore) ExistChannel(channelID string, channelType uint8) (bool, error)

func (*FileStore) GetAllowlist

func (f *FileStore) GetAllowlist(channelID string, channelType uint8) ([]string, error)

func (*FileStore) GetChannel

func (f *FileStore) GetChannel(channelID string, channelType uint8) (*ChannelInfo, error)

func (*FileStore) GetConversation

func (f *FileStore) GetConversation(uid string, channelID string, channelType uint8) (*Conversation, error)

func (*FileStore) GetConversations

func (f *FileStore) GetConversations(uid string) ([]*Conversation, error)

func (*FileStore) GetDenylist

func (f *FileStore) GetDenylist(channelID string, channelType uint8) ([]string, error)

func (*FileStore) GetIPBlacklist

func (f *FileStore) GetIPBlacklist() ([]string, error)

func (*FileStore) GetMessageOfUserCursor

func (f *FileStore) GetMessageOfUserCursor(uid string) (uint32, error)

GetMessageOfUserCursor returns the cursor of the user's message queue, i.e. the position the user has read up to.

func (*FileStore) GetMessagesOfNotifyQueue

func (f *FileStore) GetMessagesOfNotifyQueue(count int) ([]Message, error)

func (*FileStore) GetSubscribers

func (f *FileStore) GetSubscribers(channelID string, channelType uint8) ([]string, error)

func (*FileStore) GetSystemUIDs

func (f *FileStore) GetSystemUIDs() ([]string, error)

func (*FileStore) GetUserToken

func (f *FileStore) GetUserToken(uid string, deviceFlag uint8) (string, uint8, error)

func (*FileStore) Open

func (f *FileStore) Open() error

func (*FileStore) RemoveAllAllowlist

func (f *FileStore) RemoveAllAllowlist(channelID string, channelType uint8) error

func (*FileStore) RemoveAllDenylist

func (f *FileStore) RemoveAllDenylist(channelID string, channelType uint8) error

func (*FileStore) RemoveAllSubscriber

func (f *FileStore) RemoveAllSubscriber(channelID string, channelType uint8) error

func (*FileStore) RemoveAllowlist

func (f *FileStore) RemoveAllowlist(channelID string, channelType uint8, uids []string) error

func (*FileStore) RemoveDenylist

func (f *FileStore) RemoveDenylist(channelID string, channelType uint8, uids []string) error

func (*FileStore) RemoveIPBlacklist

func (f *FileStore) RemoveIPBlacklist(ips []string) error

func (*FileStore) RemoveMessagesOfNotifyQueue

func (f *FileStore) RemoveMessagesOfNotifyQueue(messageIDs []int64) error

func (*FileStore) RemoveSubscribers

func (f *FileStore) RemoveSubscribers(channelID string, channelType uint8, uids []string) error

func (*FileStore) RemoveSystemUIDs

func (f *FileStore) RemoveSystemUIDs(uids []string) error

func (*FileStore) SaveStreamMeta

func (f *FileStore) SaveStreamMeta(meta *StreamMeta) error

func (*FileStore) SyncMessageOfUser

func (f *FileStore) SyncMessageOfUser(uid string, startMessageSeq uint32, limit int) ([]Message, error)

func (*FileStore) UpdateMessageOfUserCursorIfNeed

func (f *FileStore) UpdateMessageOfUserCursorIfNeed(uid string, messageSeq uint32) error

func (*FileStore) UpdateUserToken

func (f *FileStore) UpdateUserToken(uid string, deviceFlag uint8, deviceLevel uint8, token string) error

UpdateUserToken UpdateUserToken

type FileStoreForMsg

type FileStoreForMsg struct {
	// Embedded, so the exported methods of oklog.Log are promoted to FileStoreForMsg.
	oklog.Log
	// contains filtered or unexported fields
}

func NewFileStoreForMsg

func NewFileStoreForMsg(cfg *StoreConfig) *FileStoreForMsg

func (*FileStoreForMsg) AppendMessages

func (f *FileStoreForMsg) AppendMessages(channelID string, channelType uint8, msgs []Message) (seqs []uint32, err error)

func (*FileStoreForMsg) AppendMessagesOfUser

func (f *FileStoreForMsg) AppendMessagesOfUser(uid string, msgs []Message) (seqs []uint32, err error)

func (*FileStoreForMsg) AppendStreamItem

func (f *FileStoreForMsg) AppendStreamItem(channelID string, channelType uint8, streamNo string, item *StreamItem) (uint32, error)

func (*FileStoreForMsg) Close

func (f *FileStoreForMsg) Close() error

func (*FileStoreForMsg) DeleteChannelAndClearMessages

func (f *FileStoreForMsg) DeleteChannelAndClearMessages(channelID string, channelType uint8) error

func (*FileStoreForMsg) GetLastMsgSeq

func (f *FileStoreForMsg) GetLastMsgSeq(channelID string, channelType uint8) (uint32, error)

func (*FileStoreForMsg) GetStreamItems

func (f *FileStoreForMsg) GetStreamItems(channelID string, channelType uint8, streamNo string) ([]*StreamItem, error)

func (*FileStoreForMsg) GetStreamMeta

func (f *FileStoreForMsg) GetStreamMeta(channelID string, channelType uint8, streamNo string) (*StreamMeta, error)

func (*FileStoreForMsg) LoadLastMsgs

func (f *FileStoreForMsg) LoadLastMsgs(channelID string, channelType uint8, limit int) ([]Message, error)

func (*FileStoreForMsg) LoadLastMsgsWithEnd

func (f *FileStoreForMsg) LoadLastMsgsWithEnd(channelID string, channelType uint8, endMessageSeq uint32, limit int) ([]Message, error)

func (*FileStoreForMsg) LoadMsg

func (f *FileStoreForMsg) LoadMsg(channelID string, channelType uint8, messageSeq uint32) (Message, error)

func (*FileStoreForMsg) LoadNextRangeMsgs

func (f *FileStoreForMsg) LoadNextRangeMsgs(channelID string, channelType uint8, startMessageSeq, endMessageSeq uint32, limit int) ([]Message, error)

func (*FileStoreForMsg) LoadPrevRangeMsgs

func (f *FileStoreForMsg) LoadPrevRangeMsgs(channelID string, channelType uint8, startMessageSeq, endMessageSeq uint32, limit int) ([]Message, error)

func (*FileStoreForMsg) SaveStreamMeta

func (f *FileStoreForMsg) SaveStreamMeta(meta *StreamMeta) error

func (*FileStoreForMsg) StreamEnd

func (f *FileStoreForMsg) StreamEnd(channelID string, channelType uint8, streamNo string) error

type Index

type Index struct {
	// Embedded, so the exported methods of oklog.Log are promoted to Index.
	oklog.Log
	// contains filtered or unexported fields
}

Index is an index that maps offsets to physical file positions (see Lookup).

func NewIndex

func NewIndex(path string, baseMessageSeq uint32) *Index

NewIndex NewIndex

func (*Index) Append

func (idx *Index) Append(offset uint32, position uint32) error

Append Append

func (*Index) Close

func (idx *Index) Close() error

Close Close

func (*Index) IsFull

func (idx *Index) IsFull() bool

IsFull reports whether the index is full.

func (*Index) LastPosition

func (idx *Index) LastPosition() MessageSeqPosition

func (*Index) Lookup

func (idx *Index) Lookup(targetOffset uint32) (MessageSeqPosition, error)

Lookup finds the largest offset less than or equal to the given targetOffset and returns a pair holding that offset and its corresponding physical file position.

func (*Index) Sync

func (idx *Index) Sync() error

Sync Sync

func (*Index) TruncateEntries

func (idx *Index) TruncateEntries(number int) error

TruncateEntries TruncateEntries

type Message

// Message is the interface for messages handled by the store; the Store
// methods accept and return values of this type.
type Message interface {
	// GetMessageID returns the message ID.
	GetMessageID() int64
	// SetSeq sets the message sequence number.
	SetSeq(seq uint32)
	// GetSeq returns the message sequence number.
	GetSeq() uint32
	// Encode returns the message as bytes.
	Encode() []byte
	// Decode populates the message from msg and reports any error.
	Decode(msg []byte) error
}

type MessageSeqPosition

type MessageSeqPosition struct {
	MessageSeq uint32 // message sequence number
	Position   int64  // position paired with MessageSeq (presumably the physical file position — confirm)
}

MessageSeqPosition pairs a message sequence number with a position; it is the result type of Index.Lookup and Index.LastPosition.

type SegmentMode

// SegmentMode is an integer-valued mode for a segment.
// NOTE(review): the meaning of each mode is not visible here — confirm against the segment code.
type SegmentMode int
const (
	// SegmentModeAll is the zero value (0).
	SegmentModeAll SegmentMode = iota
	// SegmentModeRead has the value 1 (presumably a read-only mode — confirm).
	SegmentModeRead
)

type Store

// Store is the storage interface. Its methods are grouped into users,
// channels, messages, conversations, system UIDs, message streams and the
// IP blacklist.
type Store interface {
	Open() error
	Close() error
	// #################### user ####################
	// GetUserToken returns the token, the device level and an error.
	GetUserToken(uid string, deviceFlag uint8) (string, uint8, error)
	UpdateUserToken(uid string, deviceFlag uint8, deviceLevel uint8, token string) error
	// UpdateMessageOfUserCursorIfNeed updates the cursor of the user's message queue
	// (the position the user has read up to).
	UpdateMessageOfUserCursorIfNeed(uid string, messageSeq uint32) error

	// #################### channel ####################
	GetChannel(channelID string, channelType uint8) (*ChannelInfo, error)
	// AddOrUpdateChannel adds or updates a channel.
	AddOrUpdateChannel(channelInfo *ChannelInfo) error
	// ExistChannel returns true if the channel exists.
	ExistChannel(channelID string, channelType uint8) (bool, error)
	// AddSubscribers adds subscribers.
	AddSubscribers(channelID string, channelType uint8, uids []string) error
	// RemoveSubscribers removes the subscribers with the given uids from the specified channel.
	RemoveSubscribers(channelID string, channelType uint8, uids []string) error
	// GetSubscribers returns the list of subscribers.
	GetSubscribers(channelID string, channelType uint8) ([]string, error)
	RemoveAllSubscriber(channelID string, channelType uint8) error
	GetAllowlist(channelID string, channelType uint8) ([]string, error)
	GetDenylist(channelID string, channelType uint8) ([]string, error)
	// DeleteChannel deletes the channel.
	DeleteChannel(channelID string, channelType uint8) error
	// AddDenylist adds uids to the channel's denylist.
	AddDenylist(channelID string, channelType uint8, uids []string) error
	// RemoveAllDenylist removes every denylist entry of the specified channel.
	RemoveAllDenylist(channelID string, channelType uint8) error
	// RemoveDenylist removes the given users from the channel's denylist.
	RemoveDenylist(channelID string, channelType uint8, uids []string) error
	// AddAllowlist adds uids to the allowlist.
	AddAllowlist(channelID string, channelType uint8, uids []string) error
	// RemoveAllAllowlist removes every allowlist entry of the specified channel.
	RemoveAllAllowlist(channelID string, channelType uint8) error
	// RemoveAllowlist removes uids from the allowlist.
	RemoveAllowlist(channelID string, channelType uint8, uids []string) error

	// #################### messages ####################
	// AppendMessages returns seqs and an error; len(seqs) equals len(msgs).
	AppendMessages(channelID string, channelType uint8, msgs []Message) (seqs []uint32, err error)
	// AppendMessagesOfUser appends messages to the user's message queue.
	AppendMessagesOfUser(uid string, msgs []Message) (seqs []uint32, err error)
	LoadMsg(channelID string, channelType uint8, seq uint32) (Message, error)
	LoadLastMsgs(channelID string, channelType uint8, limit int) ([]Message, error)
	// LoadLastMsgsWithEnd loads the latest messages. Loading stops once the end position is
	// reached; end=0 means no limit. The result does not include end.
	LoadLastMsgsWithEnd(channelID string, channelType uint8, end uint32, limit int) ([]Message, error)
	// LoadPrevRangeMsgs loads messages upwards (towards older seqs) within the given range;
	// end=0 means no limit. Example from the original comment: start=100 end=0 limit=10
	// returns the messages with seq 99-90.
	// The original comment also states: the result includes start and excludes end.
	// NOTE(review): the example (99-90) excludes start, which contradicts "includes start" — confirm against the implementation.
	LoadPrevRangeMsgs(channelID string, channelType uint8, start, end uint32, limit int) ([]Message, error)
	// LoadNextRangeMsgs loads messages downwards (towards newer seqs) within the given range;
	// end=0 means no limit. Examples from the original comment: start=100 end=200 limit=10
	// returns the messages with seq 101-111; start=100 end=105 limit=10 returns the messages
	// with seq 101-104.
	// The original comment also states: the result includes start and excludes end.
	// NOTE(review): both examples exclude start, and 101-111 is 11 messages for limit=10 — confirm against the implementation.
	LoadNextRangeMsgs(channelID string, channelType uint8, start, end uint32, limit int) ([]Message, error)
	// GetLastMsgSeq returns the latest message seq.
	GetLastMsgSeq(channelID string, channelType uint8) (uint32, error)

	// GetMessageOfUserCursor returns the cursor of the user's message queue
	// (the position the user has read up to).
	GetMessageOfUserCursor(uid string) (uint32, error)
	// SyncMessageOfUser syncs the messages in the user's queue (write fan-out).
	SyncMessageOfUser(uid string, startMessageSeq uint32, limit int) ([]Message, error)

	AppendMessageOfNotifyQueue(m []Message) error
	GetMessagesOfNotifyQueue(count int) ([]Message, error)
	// RemoveMessagesOfNotifyQueue removes messages from the notify queue.
	RemoveMessagesOfNotifyQueue(messageIDs []int64) error

	DeleteChannelAndClearMessages(channelID string, channelType uint8) error

	// #################### conversations ####################
	AddOrUpdateConversations(uid string, conversations []*Conversation) error
	GetConversations(uid string) ([]*Conversation, error)
	GetConversation(uid string, channelID string, channelType uint8) (*Conversation, error)
	DeleteConversation(uid string, channelID string, channelType uint8) error // deletes a recent conversation

	// #################### system uids ####################
	AddSystemUIDs(uids []string) error    // adds system uids
	RemoveSystemUIDs(uids []string) error // removes system uids
	GetSystemUIDs() ([]string, error)

	// #################### message stream ####################
	// SaveStreamMeta saves the metadata of a message stream.
	SaveStreamMeta(meta *StreamMeta) error
	// StreamEnd ends the stream.
	StreamEnd(channelID string, channelType uint8, streamNo string) error
	// GetStreamMeta returns the metadata of a message stream.
	GetStreamMeta(channelID string, channelType uint8, streamNo string) (*StreamMeta, error)
	// AppendStreamItem appends an item to a message stream.
	AppendStreamItem(channelID string, channelType uint8, streamNo string, item *StreamItem) (uint32, error)
	// GetStreamItems returns the items of a message stream.
	GetStreamItems(channelID string, channelType uint8, streamNo string) ([]*StreamItem, error)

	// AddIPBlacklist adds IPs to the IP blacklist.
	AddIPBlacklist(ips []string) error
	// RemoveIPBlacklist removes IPs from the IP blacklist.
	RemoveIPBlacklist(ips []string) error
	// GetIPBlacklist returns the IP blacklist.
	GetIPBlacklist() ([]string, error)
}

type StoreConfig

// StoreConfig holds the configuration passed to NewFileStore, NewFileStoreForMsg and NewStream.
type StoreConfig struct {
	SlotNum                    int // number of slots (NOTE(review): what a slot partitions is not visible here — confirm)
	DataDir                    string
	MaxSegmentCacheNum         int
	EachMessagegMaxSizeOfBytes int
	SegmentMaxBytes            int64 // each segment max size of bytes default 2G
	// DecodeMessageFnc turns raw message bytes into a Message, or reports an error.
	DecodeMessageFnc           func(msg []byte) (Message, error)
	StreamCacheSize            int // stream cache size
}

func NewStoreConfig

func NewStoreConfig() *StoreConfig

type Stream

type Stream struct {
	// Embedded, so Lock/Unlock/RLock/RUnlock are promoted to Stream.
	// A Stream must not be copied after first use because it contains a lock.
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStream

func NewStream(streamNo string, topicDir string, cfg *StoreConfig) *Stream

type StreamItem

// StreamItem is a single item of a message stream. It is encoded by
// EncodeStreamItem and decoded by DecodeStreamItem.
type StreamItem struct {
	ClientMsgNo string // client message number
	StreamSeq   uint32 // sequence number of the item (presumably within its stream — confirm)
	Blob        []byte // raw bytes of the item (presumably its payload — confirm)
}

func DecodeStreamItem

func DecodeStreamItem(data []byte) (*StreamItem, error)

type StreamItemSlice

// StreamItemSlice is a slice of *StreamItem. Its Len, Less and Swap methods
// match the method set of sort.Interface.
// NOTE(review): the ordering implemented by Less is not visible here — confirm.
type StreamItemSlice []*StreamItem

func (StreamItemSlice) Len

func (s StreamItemSlice) Len() int

func (StreamItemSlice) Less

func (l StreamItemSlice) Less(i, j int) bool

func (StreamItemSlice) Swap

func (l StreamItemSlice) Swap(i, j int)

type StreamMeta

// StreamMeta is the metadata of a message stream. The struct tags give the
// JSON field names; Encode and Decode convert it to and from bytes.
type StreamMeta struct {
	StreamNo    string             `json:"stream_no"`
	MessageID   int64              `json:"message_id"`
	ChannelID   string             `json:"channel_id"`
	ChannelType uint8              `json:"channel_type"`
	MessageSeq  uint32             `json:"message_seq"`
	StreamFlag  okproto.StreamFlag `json:"stream_flag"`
}

func (*StreamMeta) Decode

func (s *StreamMeta) Decode(data []byte) error

func (*StreamMeta) Encode

func (s *StreamMeta) Encode() []byte

type SubscribeInfo

type SubscribeInfo struct {
	UID   string                 // UID of the subscriber
	Param map[string]interface{} // free-form parameters (NOTE(review): expected keys are not visible here — confirm)
}

SubscribeInfo holds subscription information: a UID and a Param map.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL