server

package
v0.0.2-0...-b7c631f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMessageID is the default message ID value, -1.
	DefaultMessageID UniqueID = -1

	// TopicIDTitle is the key prefix for topic ID records; keys have the form topic_id/topicName.
	// The record marks a topic as valid: it is created when the topic is created and cleaned up when the topic is destroyed.
	TopicIDTitle = "topic_id/"

	// MessageSizeTitle is the key prefix for message_size/topicName, which records the message size of the current page.
	// Once the current message size exceeds the RocksMq size, this value is reset and a new page is opened.
	// TODO: this value should be cached.
	MessageSizeTitle = "message_size/"

	// PageMsgSizeTitle is the key prefix for page_message_size/topicName/pageId, which records the endId of each page.
	// The record is purged either by retention or when the topic is destroyed.
	PageMsgSizeTitle = "page_message_size/"

	// PageTsTitle is the key prefix for page_ts/topicName/pageId, which records the last timestamp of the page; used for TTL functionality.
	PageTsTitle = "page_ts/"

	// AckedTsTitle is the key prefix for acked_ts/topicName/pageId, which records the latest ack timestamp of each page.
	// The record is purged by retention or when the topic is destroyed.
	AckedTsTitle = "acked_ts/"

	// RmqNotServingErrMsg is the error message text "Rocksmq is not serving".
	// NOTE(review): presumably reported when the instance is not in the healthy state — confirm against the implementation.
	RmqNotServingErrMsg = "Rocksmq is not serving"
)

Constants used throughout rocksmq.

View Source
const (
	// MB is 1024 * 1024 (1048576), the factor for converting between megabytes and bytes.
	MB = 1024 * 1024
)

Constant used for unit conversion.

Variables

View Source
var Rmq *rocksmq

Rmq is the global rocksmq instance; it is initialized only once.

View Source
// RocksDBLRUCacheMaxCapacity is the upper limit for the RocksDB LRU cache capacity:
// 4 << 30 = 4294967296 (4 GiB, assuming the unit is bytes — TODO confirm).
var RocksDBLRUCacheMaxCapacity = uint64(4 << 30)
View Source
// RocksDBLRUCacheMinCapacity is the lower limit for the RocksDB LRU cache capacity:
// 1 << 29 = 536870912 (512 MiB, assuming the unit is bytes — TODO confirm).
var RocksDBLRUCacheMinCapacity = uint64(1 << 29)

RocksDB cache size limits (TODO: make these configurable).

Functions

func CloseRocksMQ

func CloseRocksMQ()

CloseRocksMQ closes the global rocksmq instance.

func DeleteMessages

func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error

DeleteMessages deletes the messages of the given topic from rocksdb whose IDs fall in the range [startID, endID).

func DeserializeRmqID

func DeserializeRmqID(messageID []byte) int64

DeserializeRmqID is used to deserialize a message ID from byte array

func InitRocksMQ

func InitRocksMQ(path string) error

InitRocksMQ initializes the global rocksmq singleton instance.

func NewRocksMQ

func NewRocksMQ(name string) (*rocksmq, error)

NewRocksMQ creates a rocksmq instance in three steps: (1) create a new rocksmq instance based on rocksdb with the given name and a rocksdbkv with kvname; (2) initialize the retention info and load it into memory; (3) start the retention goroutine.

func SerializeRmqID

func SerializeRmqID(messageID int64) []byte

SerializeRmqID is used to serialize a message ID to byte array

Types

type Consumer

// Consumer identifies a rocksmq consumer by topic and consumer group.
type Consumer struct {
	// Topic is the topic name associated with this consumer.
	Topic string
	// GroupName is the consumer group name associated with this consumer.
	GroupName string
	// MsgMutex is a signal-only channel (struct{} elements carry no data).
	// NOTE(review): presumably used to notify the consumer that messages are available — confirm against the rocksmq implementation.
	MsgMutex chan struct{}
}

Consumer is a rocksmq consumer.

type ConsumerMessage

// ConsumerMessage is a message consumed from rocksdb.
type ConsumerMessage struct {
	// MsgID is the ID of the message.
	MsgID UniqueID
	// Payload holds the message content as raw bytes.
	Payload []byte
}

ConsumerMessage is a message that was consumed from rocksdb.

type MockRocksMQ

type MockRocksMQ struct {
	mock.Mock
}

MockRocksMQ is an autogenerated mock type for the RocksMQ type

func NewMockRocksMQ

func NewMockRocksMQ(t mockConstructorTestingTNewMockRocksMQ) *MockRocksMQ

NewMockRocksMQ creates a new instance of MockRocksMQ. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockRocksMQ) CheckTopicValid

func (_m *MockRocksMQ) CheckTopicValid(topicName string) error

CheckTopicValid provides a mock function with given fields: topicName

func (*MockRocksMQ) Close

func (_m *MockRocksMQ) Close()

Close provides a mock function with given fields:

func (*MockRocksMQ) Consume

func (_m *MockRocksMQ) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)

Consume provides a mock function with given fields: topicName, groupName, n

func (*MockRocksMQ) CreateConsumerGroup

func (_m *MockRocksMQ) CreateConsumerGroup(topicName string, groupName string) error

CreateConsumerGroup provides a mock function with given fields: topicName, groupName

func (*MockRocksMQ) CreateTopic

func (_m *MockRocksMQ) CreateTopic(topicName string) error

CreateTopic provides a mock function with given fields: topicName

func (*MockRocksMQ) DestroyConsumerGroup

func (_m *MockRocksMQ) DestroyConsumerGroup(topicName string, groupName string) error

DestroyConsumerGroup provides a mock function with given fields: topicName, groupName

func (*MockRocksMQ) DestroyTopic

func (_m *MockRocksMQ) DestroyTopic(topicName string) error

DestroyTopic provides a mock function with given fields: topicName

func (*MockRocksMQ) EXPECT

func (_m *MockRocksMQ) EXPECT() *MockRocksMQ_Expecter

func (*MockRocksMQ) ExistConsumerGroup

func (_m *MockRocksMQ) ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error)

ExistConsumerGroup provides a mock function with given fields: topicName, groupName

func (*MockRocksMQ) GetLatestMsg

func (_m *MockRocksMQ) GetLatestMsg(topicName string) (int64, error)

GetLatestMsg provides a mock function with given fields: topicName

func (*MockRocksMQ) Notify

func (_m *MockRocksMQ) Notify(topicName string, groupName string)

Notify provides a mock function with given fields: topicName, groupName

func (*MockRocksMQ) Produce

func (_m *MockRocksMQ) Produce(topicName string, messages []ProducerMessage) ([]int64, error)

Produce provides a mock function with given fields: topicName, messages

func (*MockRocksMQ) RegisterConsumer

func (_m *MockRocksMQ) RegisterConsumer(consumer *Consumer) error

RegisterConsumer provides a mock function with given fields: consumer

func (*MockRocksMQ) Seek

func (_m *MockRocksMQ) Seek(topicName string, groupName string, msgID int64) error

Seek provides a mock function with given fields: topicName, groupName, msgID

func (*MockRocksMQ) SeekToLatest

func (_m *MockRocksMQ) SeekToLatest(topicName string, groupName string) error

SeekToLatest provides a mock function with given fields: topicName, groupName

type MockRocksMQ_CheckTopicValid_Call

type MockRocksMQ_CheckTopicValid_Call struct {
	*mock.Call
}

MockRocksMQ_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid'

func (*MockRocksMQ_CheckTopicValid_Call) Return

func (*MockRocksMQ_CheckTopicValid_Call) Run

type MockRocksMQ_Close_Call

type MockRocksMQ_Close_Call struct {
	*mock.Call
}

MockRocksMQ_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockRocksMQ_Close_Call) Return

func (*MockRocksMQ_Close_Call) Run

func (_c *MockRocksMQ_Close_Call) Run(run func()) *MockRocksMQ_Close_Call

type MockRocksMQ_Consume_Call

type MockRocksMQ_Consume_Call struct {
	*mock.Call
}

MockRocksMQ_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume'

func (*MockRocksMQ_Consume_Call) Return

func (*MockRocksMQ_Consume_Call) Run

func (_c *MockRocksMQ_Consume_Call) Run(run func(topicName string, groupName string, n int)) *MockRocksMQ_Consume_Call

type MockRocksMQ_CreateConsumerGroup_Call

type MockRocksMQ_CreateConsumerGroup_Call struct {
	*mock.Call
}

MockRocksMQ_CreateConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateConsumerGroup'

func (*MockRocksMQ_CreateConsumerGroup_Call) Return

func (*MockRocksMQ_CreateConsumerGroup_Call) Run

type MockRocksMQ_CreateTopic_Call

type MockRocksMQ_CreateTopic_Call struct {
	*mock.Call
}

MockRocksMQ_CreateTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateTopic'

func (*MockRocksMQ_CreateTopic_Call) Return

func (*MockRocksMQ_CreateTopic_Call) Run

type MockRocksMQ_DestroyConsumerGroup_Call

type MockRocksMQ_DestroyConsumerGroup_Call struct {
	*mock.Call
}

MockRocksMQ_DestroyConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyConsumerGroup'

func (*MockRocksMQ_DestroyConsumerGroup_Call) Return

func (*MockRocksMQ_DestroyConsumerGroup_Call) Run

type MockRocksMQ_DestroyTopic_Call

type MockRocksMQ_DestroyTopic_Call struct {
	*mock.Call
}

MockRocksMQ_DestroyTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyTopic'

func (*MockRocksMQ_DestroyTopic_Call) Return

func (*MockRocksMQ_DestroyTopic_Call) Run

type MockRocksMQ_ExistConsumerGroup_Call

type MockRocksMQ_ExistConsumerGroup_Call struct {
	*mock.Call
}

MockRocksMQ_ExistConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExistConsumerGroup'

func (*MockRocksMQ_ExistConsumerGroup_Call) Return

func (*MockRocksMQ_ExistConsumerGroup_Call) Run

type MockRocksMQ_Expecter

type MockRocksMQ_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockRocksMQ_Expecter) CheckTopicValid

func (_e *MockRocksMQ_Expecter) CheckTopicValid(topicName interface{}) *MockRocksMQ_CheckTopicValid_Call

CheckTopicValid is a helper method to define mock.On call

  • topicName string

func (*MockRocksMQ_Expecter) Close

Close is a helper method to define mock.On call

func (*MockRocksMQ_Expecter) Consume

func (_e *MockRocksMQ_Expecter) Consume(topicName interface{}, groupName interface{}, n interface{}) *MockRocksMQ_Consume_Call

Consume is a helper method to define mock.On call

  • topicName string
  • groupName string
  • n int

func (*MockRocksMQ_Expecter) CreateConsumerGroup

func (_e *MockRocksMQ_Expecter) CreateConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_CreateConsumerGroup_Call

CreateConsumerGroup is a helper method to define mock.On call

  • topicName string
  • groupName string

func (*MockRocksMQ_Expecter) CreateTopic

func (_e *MockRocksMQ_Expecter) CreateTopic(topicName interface{}) *MockRocksMQ_CreateTopic_Call

CreateTopic is a helper method to define mock.On call

  • topicName string

func (*MockRocksMQ_Expecter) DestroyConsumerGroup

func (_e *MockRocksMQ_Expecter) DestroyConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_DestroyConsumerGroup_Call

DestroyConsumerGroup is a helper method to define mock.On call

  • topicName string
  • groupName string

func (*MockRocksMQ_Expecter) DestroyTopic

func (_e *MockRocksMQ_Expecter) DestroyTopic(topicName interface{}) *MockRocksMQ_DestroyTopic_Call

DestroyTopic is a helper method to define mock.On call

  • topicName string

func (*MockRocksMQ_Expecter) ExistConsumerGroup

func (_e *MockRocksMQ_Expecter) ExistConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_ExistConsumerGroup_Call

ExistConsumerGroup is a helper method to define mock.On call

  • topicName string
  • groupName string

func (*MockRocksMQ_Expecter) GetLatestMsg

func (_e *MockRocksMQ_Expecter) GetLatestMsg(topicName interface{}) *MockRocksMQ_GetLatestMsg_Call

GetLatestMsg is a helper method to define mock.On call

  • topicName string

func (*MockRocksMQ_Expecter) Notify

func (_e *MockRocksMQ_Expecter) Notify(topicName interface{}, groupName interface{}) *MockRocksMQ_Notify_Call

Notify is a helper method to define mock.On call

  • topicName string
  • groupName string

func (*MockRocksMQ_Expecter) Produce

func (_e *MockRocksMQ_Expecter) Produce(topicName interface{}, messages interface{}) *MockRocksMQ_Produce_Call

Produce is a helper method to define mock.On call

  • topicName string
  • messages []ProducerMessage

func (*MockRocksMQ_Expecter) RegisterConsumer

func (_e *MockRocksMQ_Expecter) RegisterConsumer(consumer interface{}) *MockRocksMQ_RegisterConsumer_Call

RegisterConsumer is a helper method to define mock.On call

  • consumer *Consumer

func (*MockRocksMQ_Expecter) Seek

func (_e *MockRocksMQ_Expecter) Seek(topicName interface{}, groupName interface{}, msgID interface{}) *MockRocksMQ_Seek_Call

Seek is a helper method to define mock.On call

  • topicName string
  • groupName string
  • msgID int64

func (*MockRocksMQ_Expecter) SeekToLatest

func (_e *MockRocksMQ_Expecter) SeekToLatest(topicName interface{}, groupName interface{}) *MockRocksMQ_SeekToLatest_Call

SeekToLatest is a helper method to define mock.On call

  • topicName string
  • groupName string

type MockRocksMQ_GetLatestMsg_Call

type MockRocksMQ_GetLatestMsg_Call struct {
	*mock.Call
}

MockRocksMQ_GetLatestMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsg'

func (*MockRocksMQ_GetLatestMsg_Call) Return

func (*MockRocksMQ_GetLatestMsg_Call) Run

type MockRocksMQ_Notify_Call

type MockRocksMQ_Notify_Call struct {
	*mock.Call
}

MockRocksMQ_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify'

func (*MockRocksMQ_Notify_Call) Return

func (*MockRocksMQ_Notify_Call) Run

func (_c *MockRocksMQ_Notify_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_Notify_Call

type MockRocksMQ_Produce_Call

type MockRocksMQ_Produce_Call struct {
	*mock.Call
}

MockRocksMQ_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'

func (*MockRocksMQ_Produce_Call) Return

func (*MockRocksMQ_Produce_Call) Run

func (_c *MockRocksMQ_Produce_Call) Run(run func(topicName string, messages []ProducerMessage)) *MockRocksMQ_Produce_Call

type MockRocksMQ_RegisterConsumer_Call

type MockRocksMQ_RegisterConsumer_Call struct {
	*mock.Call
}

MockRocksMQ_RegisterConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterConsumer'

func (*MockRocksMQ_RegisterConsumer_Call) Return

func (*MockRocksMQ_RegisterConsumer_Call) Run

type MockRocksMQ_SeekToLatest_Call

type MockRocksMQ_SeekToLatest_Call struct {
	*mock.Call
}

MockRocksMQ_SeekToLatest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SeekToLatest'

func (*MockRocksMQ_SeekToLatest_Call) Return

func (*MockRocksMQ_SeekToLatest_Call) Run

func (_c *MockRocksMQ_SeekToLatest_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_SeekToLatest_Call

type MockRocksMQ_Seek_Call

type MockRocksMQ_Seek_Call struct {
	*mock.Call
}

MockRocksMQ_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek'

func (*MockRocksMQ_Seek_Call) Return

func (*MockRocksMQ_Seek_Call) Run

func (_c *MockRocksMQ_Seek_Call) Run(run func(topicName string, groupName string, msgID int64)) *MockRocksMQ_Seek_Call

type ProducerMessage

// ProducerMessage is a message that will be written to rocksdb.
type ProducerMessage struct {
	// Payload holds the message content as raw bytes.
	Payload []byte
}

ProducerMessage is a message that will be written to rocksdb.

type RmqID

// RmqID wraps a message ID for rocksmq.
type RmqID struct {
	// MessageID is the wrapped message ID.
	MessageID UniqueID
}

RmqID wraps a message ID for rocksmq.

func (*RmqID) AtEarliestPosition

func (rid *RmqID) AtEarliestPosition() bool

func (*RmqID) Equal

func (rid *RmqID) Equal(msgID []byte) (bool, error)

func (*RmqID) LessOrEqualThan

func (rid *RmqID) LessOrEqualThan(msgID []byte) (bool, error)

func (*RmqID) Serialize

func (rid *RmqID) Serialize() []byte

Serialize converts the rmq message ID to a []byte.

type RmqState

type RmqState = int64

RmqState is the state of a Rocksmq instance.

const (
	// RmqStateStopped is the state of a `Rocksmq` instance that was just created or has been stopped.
	RmqStateStopped RmqState = 0
	// RmqStateHealthy is the state of a healthy `Rocksmq` instance.
	RmqStateHealthy RmqState = 1
)

type RocksMQ

// RocksMQ declares the message-queue operations of this package, which are based on rocksdb.
// Topics are addressed by topicName and consumer groups by the (topicName, groupName) pair.
type RocksMQ interface {
	// Topic and consumer-group lifecycle, plus shutdown of the queue itself.
	CreateTopic(topicName string) error
	DestroyTopic(topicName string) error
	CreateConsumerGroup(topicName string, groupName string) error
	DestroyConsumerGroup(topicName string, groupName string) error
	Close()

	// Consumer registration and topic inspection.
	// GetLatestMsg returns an int64 for the topic; NOTE(review): presumably the ID of the latest message — confirm against the implementation.
	RegisterConsumer(consumer *Consumer) error
	GetLatestMsg(topicName string) (int64, error)
	CheckTopicValid(topicName string) error

	// Message production and consumption, and consumer-group positioning.
	// Produce returns one UniqueID slice for the submitted messages; Consume takes a count n and returns a slice of ConsumerMessage.
	// NOTE(review): whether n is an upper bound or an exact count is not visible here — confirm against the implementation.
	Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error)
	Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
	Seek(topicName string, groupName string, msgID UniqueID) error
	SeekToLatest(topicName, groupName string) error
	ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error)

	// Notify takes a topic and group name and returns nothing.
	// NOTE(review): presumably signals the group's consumer that messages are available — confirm against the implementation.
	Notify(topicName, groupName string)
}

RocksMQ is an interface that may be implemented by the application to perform message queue operations based on rocksdb.

type UniqueID

type UniqueID = typeutil.UniqueID

UniqueID is the type of message ID

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL