Documentation ¶
Index ¶
- Constants
- Variables
- func ArbitrateConfigs(c *Configure)
- func DisableDebug()
- func EnableDebug()
- func GetLocalIP() string
- func LoadConfig()
- func NewBroker()
- func NewMsgChan() msgChan
- func PanicIfErr(err error)
- func PortToLocalAddr(port int) string
- func ServeAPI(w http.ResponseWriter, r *http.Request)
- func ServeHTTP(l net.Listener)
- func ServeHome(w http.ResponseWriter, r *http.Request)
- type Broker
- type Client
- type Configure
- type Exporter
- type Logger
- func (l *Logger) Flags() int
- func (l *Logger) Level() int32
- func (l *Logger) Panic(v ...interface{})
- func (l *Logger) Panicf(format string, v ...interface{})
- func (l *Logger) Panicln(v ...interface{})
- func (l *Logger) Prefix() string
- func (l *Logger) Print(v ...interface{})
- func (l *Logger) Printf(format string, v ...interface{})
- func (l *Logger) Println(v ...interface{})
- func (l *Logger) SetFlags(flag int)
- func (l *Logger) SetLevel(level int32)
- func (l *Logger) SetPrefix(prefix string)
- type StableStorage
- type Stat
- type Storage
- type StorageAOF
- func (s *StorageAOF) Close() error
- func (s *StorageAOF) Delete(m ...*msg.Message) error
- func (s *StorageAOF) DeleteOps() uint64
- func (s *StorageAOF) Get() (*msg.Message, error)
- func (s *StorageAOF) Rewrite(curMaxID uint64, startup bool) bool
- func (s *StorageAOF) Save(m ...*msg.Message) error
- func (s *StorageAOF) Stat() os.FileInfo
- func (s *StorageAOF) Truncate()
- type TopicQueue
Constants ¶
const ( //RUNNING status RUNNING = iota //STOP status STOP )
const ( //LEVELON status LEVELON = 0 //LEVELOFF status LEVELOFF = 1 )
const ( //SAVE action SAVE = 1 //DELETE action DELETE = 0 //NEVERSYNC strategy NEVERSYNC = 0 //EVERYSECOND strategy EVERYSECOND = 1 //ALWAYSSYNC strategy ALWAYSSYNC = 2 )
Variables ¶
var (
	//ErrEmptyMsgList occurs when all the msgs on disk have been scanned over.
	ErrEmptyMsgList = errors.New("empty Msg list")
)
var OneBroker sync.Once
OneBroker ensures that only one broker is created.
Functions ¶
func ArbitrateConfigs ¶
func ArbitrateConfigs(c *Configure)
ArbitrateConfigs will check the config file
func PortToLocalAddr ¶
PortToLocalAddr converts the port to "host:port"
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker instance
var DefaultBroker *Broker
DefaultBroker used by singleton
func (*Broker) Replay ¶
func (b *Broker) Replay()
Replay recovers the msgs in the db when the broker starts up
func (*Broker) TotalTopic ¶
TotalTopic returns the current number of topics.
type Configure ¶
type Configure struct { HttpPort int MsgPort int Auth bool UserName string Token string Retry int Aof string SyncType int Threshold int }
Configure schema
type Exporter ¶
Exporter instance
func NewExporter ¶
NewExporter creates a new Exporter
func (*Exporter) Collect ¶
func (e *Exporter) Collect(ch chan<- prometheus.Metric)
Collect fetches and updates the appropriate metrics.
func (*Exporter) Describe ¶
func (e *Exporter) Describe(ch chan<- *prometheus.Desc)
Describe outputs the metric descriptions.
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger instance
type StableStorage ¶
type StableStorage struct {
// contains filtered or unexported fields
}
StableStorage instance
func (*StableStorage) Delete ¶
func (s *StableStorage) Delete(msgs ...*msg.Message) error
Delete removes the msg from disk
func (*StableStorage) Get ¶
func (s *StableStorage) Get() (*msg.Message, error)
Get reads the msg from disk
type Stat ¶
Stat instance
type Storage ¶
type Storage interface {
	//Save stores the msg to disk
	Save(m ...*msg.Message) error
	//Get reads the msg from disk
	Get() (*msg.Message, error)
	//Delete removes the msg from disk
	Delete(m ...*msg.Message) error
	//Close shuts down the storage
	Close() error
	//Truncate removes the db file; it is only used in dev.
	Truncate()
}
Storage interface
type StorageAOF ¶
type StorageAOF struct { MaxID uint64 // contains filtered or unexported fields }
StorageAOF instance
func NewStorageAOF ¶
func NewStorageAOF(filename string, syncType int32, threshold int) *StorageAOF
NewStorageAOF creates an AOF storage
func (*StorageAOF) DeleteOps ¶
func (s *StorageAOF) DeleteOps() uint64
DeleteOps returns the deleteops of the AOF, to determine when to rewrite.
func (*StorageAOF) Rewrite ¶
func (s *StorageAOF) Rewrite(curMaxID uint64, startup bool) bool
Rewrite takes the parameter curMaxID so that it does not rewrite messages that were written into the AOF file after the rewrite started. This is achieved by comparing against MaxID: when a background rewrite is invoked, s.MaxID is passed as curMaxID, and when a message with ID > curMaxID is discovered while reading the old file, the rewrite breaks out of reading. Messages should never be duplicated; this de-duplication takes effect on the SAVE cmd. The DELETE cmd does not need de-duplication, because a msgID is never duplicated and each message can only be deleted once.
type TopicQueue ¶
type TopicQueue struct {
// contains filtered or unexported fields
}
TopicQueue instance; it represents a topic.
func NewTopicQueue ¶
func NewTopicQueue(broker *Broker, topic string) *TopicQueue
NewTopicQueue creates a TopicQueue
func (*TopicQueue) NumberOfSubscribers ¶
func (t *TopicQueue) NumberOfSubscribers() int64
NumberOfSubscribers returns the current number of subscribers
func (*TopicQueue) Status ¶
func (t *TopicQueue) Status() int
Status returns the runtime status of the topic