Documentation ¶
Overview ¶
Package skiplist implements skip list based maps and sets.
Skip lists are a data structure that can be used in place of balanced trees. Skip lists use probabilistic balancing rather than strictly enforced balancing and as a result the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees.
Skip lists were first described in Pugh, William (June 1990). "Skip lists: a probabilistic alternative to balanced trees". Communications of the ACM 33 (6): 668–676
Index ¶
- Constants
- Variables
- func GetQueueFileName(dataRoot string, base string, fileNum int64) string
- func GetTopicFullName(topic string, part int) string
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func NewGUIDFactory(nodeID int64) *guidFactory
- func ReadResponse(r io.Reader) ([]byte, error)
- func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type BackendQueueEnd
- type BackendQueueReader
- type BackendQueueWriter
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer) error
- func (c *Channel) CheckBack(m *Message)
- func (c *Channel) Close() error
- func (c *Channel) CommitDtPreMsg(msgId MessageID) error
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) Empty() error
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, id MessageID) error
- func (c *Channel) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)
- func (c *Channel) HandleSyncChannelFromSlave() ([]byte, error)
- func (c *Channel) IsPaused() bool
- func (c *Channel) ListMost10Item() []*Message
- func (c *Channel) Pause() error
- func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)
- func (c *Channel) RemoveClient(clientID int64)
- func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error
- func (c *Channel) RestartPreDtMsgTimeout(msg *Message, timeout time.Duration)
- func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
- func (c *Channel) StartPreDtMsgTimeout(msg *Message, timeout time.Duration) error
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) UnPause() error
- func (c *Channel) UpdateBackendQueueEnd(backendQueueEnd BackendQueueEnd)
- type ChannelStats
- type Channels
- type ChannelsByName
- type Client
- type ClientStats
- type Consumer
- type IntervalTree
- type Iterator
- type Logger
- type Message
- type MessageDtType
- type MessageID
- type NSQD
- func (n *NSQD) AddClient(clientID int64, client Client)
- func (n *NSQD) ChannelList(topicName string) []string
- func (n *NSQD) DeleteExistingTopic(topicName string) error
- func (n *NSQD) Exit()
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetProducerStats() []ClientStats
- func (n *NSQD) GetStartTime() time.Time
- func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []TopicStats
- func (n *NSQD) GetTopic(topicName string) *Topic
- func (n *NSQD) GetTopicMapCopy() []*Topic
- func (n *NSQD) GetTopicsAndChannelsBytes() ([]byte, error)
- func (n *NSQD) HandleSyncTopicFromSlave(topicName string, ...) ([]byte, error)
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata() error
- func (n *NSQD) Main() error
- func (n *NSQD) Notify(v interface{})
- func (n *NSQD) PersistMetadata() error
- func (n *NSQD) RealHTTPAddr() *net.TCPAddr
- func (n *NSQD) RealHTTPSAddr() *net.TCPAddr
- func (n *NSQD) RealTCPAddr() *net.TCPAddr
- func (n *NSQD) RemoveClient(clientID int64)
- func (n *NSQD) SetHealth(err error)
- func (n *NSQD) SlaveSyncChannel(topicName, channelName string) ([]byte, error)
- func (n *NSQD) SlaveSyncLoop()
- func (n *NSQD) TopicList() []string
- type Options
- type Ordered
- type PubCount
- type QueueInterval
- func (QI *QueueInterval) End() int64
- func (QI *QueueInterval) EndCnt() int64
- func (QI *QueueInterval) HighAtDimension(dim uint64) int64
- func (QI *QueueInterval) ID() uint64
- func (QI *QueueInterval) LowAtDimension(dim uint64) int64
- func (QI *QueueInterval) OverlapsAtDimension(inter augmentedtree.Interval, dim uint64) bool
- func (QI *QueueInterval) Start() int64
- type ReadResult
- type Response
- type Set
- func (s *Set) Add(key interface{})
- func (s *Set) Contains(key interface{}) bool
- func (s *Set) GetMaxLevel() int
- func (s *Set) Iterator() Iterator
- func (s *Set) Len() int
- func (s *Set) Range(from, to interface{}) Iterator
- func (s *Set) Remove(key interface{}) (ok bool)
- func (s *Set) SetMaxLevel(newMaxLevel int)
- type SkipList
- func (s *SkipList) Delete(key interface{}) (value interface{}, ok bool)
- func (s *SkipList) Get(key interface{}) (value interface{}, ok bool)
- func (s *SkipList) GetGreaterOrEqual(min interface{}) (actualKey, value interface{}, ok bool)
- func (s *SkipList) Iterator() Iterator
- func (s *SkipList) Len() int
- func (s *SkipList) Range(from, to interface{}) Iterator
- func (s *SkipList) Seek(key interface{}) Iterator
- func (s *SkipList) SeekToFirst() Iterator
- func (s *SkipList) SeekToLast() Iterator
- func (s *SkipList) Set(key, value interface{})
- type Slave
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile
- func (t *Topic) Close() error
- func (t *Topic) CommitDtPreMsg(msgId MessageID)
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) Empty() error
- func (t *Topic) Exiting() bool
- func (t *Topic) FlushTopicAndChannels() error
- func (t *Topic) GenerateID() MessageID
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetChannelMapCopy() map[string]*Channel
- func (t *Topic) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) HandleSyncTopicFromSlave(totalMsgCnt, filenum, fileoffset, virtutaloffset, maxnum int64) ([]byte, error)
- func (t *Topic) IsPaused() bool
- func (t *Topic) Pause() error
- func (t *Topic) PutMessage(m *Message) error
- func (t *Topic) PutMessages(msgs []*Message) error
- func (t *Topic) Start()
- func (t *Topic) UnPause() error
- func (t *Topic) UpdatedBackendQueueEndCallback()
- type TopicStats
- type Topics
- type TopicsByName
- type Uint64Slice
Constants ¶
const ( LOG_DEBUG = lg.DEBUG LOG_INFO = lg.INFO LOG_WARN = lg.WARN LOG_ERROR = lg.ERROR LOG_FATAL = lg.FATAL )
const ( UNKONW_STATUS = MessageDtType(0) PRE_STATUS = MessageDtType(1) CANCEL_STATUS = MessageDtType(2) COMMIT_STATUS = MessageDtType(3) )
const ( TLSNotRequired = iota TLSRequiredExceptHTTP TLSRequired )
const ( FrameTypeResponse int32 = 0 FrameTypeError int32 = 1 FrameTypeMessage int32 = 2 FrameTypeUnknown int32 = -1 )
frame types
const DefaultMaxLevel = 32
const (
MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
const (
MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)
const (
MsgIDLength = 16
)
Variables ¶
var ( ErrInvalidOffset = errors.New("invalid offset") ErrNeedFixQueueStart = errors.New("init queue start should be fixed") )
var DataInconsistentError = errors.New("DataInconsistentError")
var EndOfMsgVirOffsetError = errors.New("EndOfMsgVirOffsetError")
var ErrIDBackwards = errors.New("ID went backward")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")
var FlagInconsistentError = errors.New("FlagInconsistentError")
var MagicDT = []byte(" DT")
MagicDT is the initial identifier sent when connecting for DT clients
var MagicV1 = []byte(" V1")
MagicV1 is the initial identifier sent when connecting for V1 clients
var MagicV2 = []byte(" V2")
MagicV2 is the initial identifier sent when connecting for V2 clients
var StepInconsistentError = errors.New("StepInconsistentError")
Functions ¶
func GetTopicFullName ¶
func IsValidChannelName ¶
IsValidChannelName checks a channel name for correctness
func IsValidTopicName ¶
IsValidTopicName checks a topic name for correctness
func NewGUIDFactory ¶
func NewGUIDFactory(nodeID int64) *guidFactory
func ReadResponse ¶
ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data
func ReadUnpackedResponse ¶
ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error
func UnpackResponse ¶
UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data
Returns a triple of: frame type, data ([]byte), error
Types ¶
type BackendQueueEnd ¶
type BackendQueueEnd interface { Offset() int64 TotalMsgCnt() int64 IsSame(BackendQueueEnd) bool }
type BackendQueueReader ¶
type BackendQueueReader interface { Put([]byte) (BackendQueueEnd, error) Close() error Delete() error Empty() error Depth() int64 ReaderFlush() error GetQueueReadEnd() BackendQueueEnd GetQueueCurMemRead() BackendQueueEnd UpdateBackendQueueEnd(BackendQueueEnd) TryReadOne() (*ReadResult, bool) Confirm(start int64, end int64, endCnt int64) bool }
BackendQueueReader represents the reader for the current topic's consumer
type BackendQueueWriter ¶
type BackendQueueWriter interface { Put(data []byte) (BackendQueueEnd, error) Close() error Delete() error Empty() error WriterFlush() (bool, bool, error) EndInfo() GetQueueReadEnd() BackendQueueEnd GetQueueCurWriterEnd() BackendQueueEnd }
BackendQueueWriter represents the behavior for the secondary message storage system
func NewDiskQueueWriter ¶
type Channel ¶
Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with their own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, channelName string, chEnd BackendQueueEnd, ctx *context, maxWin int64, deleteCallback func(*Channel)) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer
func (*Channel) CommitDtPreMsg ¶
func (*Channel) FinishMessage ¶
FinishMessage successfully discards an in-flight message
func (*Channel) GetDtPreMsgByCmtMsg ¶
func (*Channel) HandleSyncChannelFromSlave ¶
func (*Channel) ListMost10Item ¶
func (*Channel) PutMessageDeferred ¶
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list
func (*Channel) RequeueMessage ¶
RequeueMessage requeues a message based on `time.Duration`, ie:
`timeoutMs` == 0 - requeue a message immediately
`timeoutMs` > 0 - asynchronously wait for the specified timeout and requeue a message (aka "deferred requeue")
func (*Channel) RestartPreDtMsgTimeout ¶
func (*Channel) StartDeferredTimeout ¶
func (*Channel) StartInFlightTimeout ¶
func (*Channel) StartPreDtMsgTimeout ¶
func (*Channel) TouchMessage ¶
TouchMessage resets the timeout for an in-flight message
func (*Channel) UpdateBackendQueueEnd ¶
func (c *Channel) UpdateBackendQueueEnd(backendQueueEnd BackendQueueEnd)
type ChannelStats ¶
type ChannelStats struct { ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` ClientCount int `json:"client_count"` Clients []ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewChannelStats ¶
func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type Client ¶
type Client interface { Stats() ClientStats IsProducer() bool }
type ClientStats ¶
type ClientStats struct { ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` PubCounts []PubCount `json:"pub_counts,omitempty"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
type Consumer ¶
type Consumer interface { UnPause() Pause() Close() error TimedOutMessage() Stats() ClientStats Empty() }
type IntervalTree ¶
type IntervalTree struct {
// contains filtered or unexported fields
}
func NewIntervalTree ¶
func NewIntervalTree() *IntervalTree
func (*IntervalTree) AddOrMerge ¶
func (IT *IntervalTree) AddOrMerge(inter *QueueInterval) *QueueInterval
func (*IntervalTree) DeleteInterval ¶
func (self *IntervalTree) DeleteInterval(inter *QueueInterval)
func (*IntervalTree) Len ¶
func (IT *IntervalTree) Len() int64
type Iterator ¶
type Iterator interface { // Next returns true if the iterator contains subsequent elements // and advances its state to the next element if that is possible. Next() (ok bool) // Previous returns true if the iterator contains previous elements // and rewinds its state to the previous element if that is possible. Previous() (ok bool) // Key returns the current key. Key() interface{} // Value returns the current value. Value() interface{} // Seek reduces iterative seek costs for searching forward into the Skip List // by remarking the range of keys over which it has scanned before. If the // requested key occurs prior to the point, the Skip List will start searching // as a safeguard. It returns true if the key is within the known range of // the list. Seek(key interface{}) (ok bool) // Close this iterator to reap resources associated with it. While not // strictly required, it will provide extra hints for the garbage collector. Close() }
Iterator is an interface that you can use to iterate through the skip list (in its entirety or in fragments). For a usage example, see the documentation of SkipList.
Key and Value return the key and the value of the current node.
type Message ¶
type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 MovedSize int64 //for backend queue BackendQueueEnd // contains filtered or unexported fields }
func NewMessage ¶
type MessageDtType ¶
type MessageDtType int
type MessageID ¶
type MessageID [MsgIDLength]byte
type NSQD ¶
func (*NSQD) ChannelList ¶
func (*NSQD) DeleteExistingTopic ¶
DeleteExistingTopic removes a topic only if it exists
func (*NSQD) GetExistingTopic ¶
GetExistingTopic gets a topic only if it exists
func (*NSQD) GetProducerStats ¶
func (n *NSQD) GetProducerStats() []ClientStats
func (*NSQD) GetStartTime ¶
func (*NSQD) GetStats ¶
func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []TopicStats
func (*NSQD) GetTopic ¶
GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)
func (*NSQD) GetTopicMapCopy ¶
func (*NSQD) GetTopicsAndChannelsBytes ¶
func (*NSQD) HandleSyncTopicFromSlave ¶
func (*NSQD) IsAuthEnabled ¶
func (*NSQD) LoadMetadata ¶
func (*NSQD) PersistMetadata ¶
func (*NSQD) RealHTTPAddr ¶
func (*NSQD) RealHTTPSAddr ¶
func (*NSQD) RealTCPAddr ¶
func (*NSQD) RemoveClient ¶
func (*NSQD) SlaveSyncChannel ¶
func (*NSQD) SlaveSyncLoop ¶
func (n *NSQD) SlaveSyncLoop()
type Options ¶
type Options struct { // basic options ID int64 `flag:"node-id" cfg:"id"` LogLevel lg.LogLevel `flag:"log-level"` LogPrefix string `flag:"log-prefix"` Logger Logger TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` HTTPSAddress string `flag:"https-address"` BroadcastAddress string `flag:"broadcast-address"` NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"` HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"` NsqdMasterAddr string `flag:"nsqd-master-addr"` // diskqueue options DataPath string `flag:"data-path"` MemQueueSize int64 `flag:"mem-queue-size"` MaxBytesPerFile int64 `flag:"max-bytes-per-file"` SyncEvery int64 `flag:"sync-every"` DtCheckBackTimeout time.Duration `flag:"dt-checkback-timeout"` SyncTimeout time.Duration `flag:"sync-timeout"` LoopReadTimeout time.Duration `flag:"loop-read-timeout"` QueueScanInterval time.Duration QueueScanRefreshInterval time.Duration QueueScanSelectionCount int QueueScanWorkerPoolMax int QueueScanDirtyPercent float64 // msg and command options MsgTimeout time.Duration `flag:"msg-timeout"` MaxMsgTimeout time.Duration `flag:"max-msg-timeout"` MaxMsgSize int64 `flag:"max-msg-size"` MaxBodySize int64 `flag:"max-body-size"` MaxReqTimeout time.Duration `flag:"max-req-timeout"` ClientTimeout time.Duration // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` MaxRdyCount int64 `flag:"max-rdy-count"` MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` MaxConfirmWin int64 `flag:"max-confirm-win"` OutputBufferTimeout 
time.Duration `flag:"output-buffer-timeout"` MaxChannelConsumers int `flag:"max-channel-consumers"` // statsd integration StatsdAddress string `flag:"statsd-address"` StatsdPrefix string `flag:"statsd-prefix"` StatsdInterval time.Duration `flag:"statsd-interval"` StatsdMemStats bool `flag:"statsd-mem-stats"` StatsdUDPPacketSize int `flag:"statsd-udp-packet-size"` // e2e message latency E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"` E2EProcessingLatencyPercentiles []float64 `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // TLS config TLSCert string `flag:"tls-cert"` TLSKey string `flag:"tls-key"` TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` TLSRootCAFile string `flag:"tls-root-ca-file"` TLSRequired int `flag:"tls-required"` TLSMinVersion uint16 `flag:"tls-min-version"` // compression DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` }
func NewOptions ¶
func NewOptions() *Options
type Ordered ¶
Ordered is an interface which can be linearly ordered by the LessThan method, whereby this instance is deemed to be less than other. Additionally, Ordered instances should behave properly when compared using == and !=.
type QueueInterval ¶
type QueueInterval struct {
// contains filtered or unexported fields
}
func (*QueueInterval) End ¶
func (QI *QueueInterval) End() int64
func (*QueueInterval) EndCnt ¶
func (QI *QueueInterval) EndCnt() int64
func (*QueueInterval) HighAtDimension ¶
func (QI *QueueInterval) HighAtDimension(dim uint64) int64
func (*QueueInterval) ID ¶
func (QI *QueueInterval) ID() uint64
The augmentedtree uses the low value and the ID to determine whether an interval is a duplicate, so here we use the end as the ID of the segment.
func (*QueueInterval) LowAtDimension ¶
func (QI *QueueInterval) LowAtDimension(dim uint64) int64
func (*QueueInterval) OverlapsAtDimension ¶
func (QI *QueueInterval) OverlapsAtDimension(inter augmentedtree.Interval, dim uint64) bool
func (*QueueInterval) Start ¶
func (QI *QueueInterval) Start() int64
type ReadResult ¶
ReadResult represents the result for TryReadOne()
type Set ¶
type Set struct {
// contains filtered or unexported fields
}
Set is an ordered set data structure.
Its elements must implement the Ordered interface. It uses a SkipList for storage, and it gives you similar performance guarantees.
To iterate over a set (where s is a *Set):
for i := s.Iterator(); i.Next(); {
	// do something with i.Key().
	// i.Value() will be nil.
}
func NewCustomSet ¶
NewCustomSet returns a new Set that will use lessThan as the comparison function. lessThan should define a linear order on elements you intend to use with the Set.
func NewStringSet ¶
func NewStringSet() *Set
NewStringSet returns a new Set that accepts string elements.
func (*Set) GetMaxLevel ¶
GetMaxLevel returns MaxLevel of the underlying skip list.
func (*Set) Range ¶
Range returns an iterator that will go through all the elements of the set that are greater than or equal to from, but less than to.
func (*Set) SetMaxLevel ¶
SetMaxLevel sets MaxLevel in the underlying skip list.
type SkipList ¶
type SkipList struct { // MaxLevel determines how many items the SkipList can store // efficiently (2^MaxLevel). // // It is safe to increase MaxLevel to accommodate more // elements. If you decrease MaxLevel and the skip list // already contains nodes on higher levels, the effective // MaxLevel will be the greater of the new MaxLevel and the // level of the highest node. // // A SkipList with MaxLevel equal to 0 is equivalent to a // standard linked list and will not have any of the nice // properties of skip lists (probably not what you want). MaxLevel int // contains filtered or unexported fields }
A SkipList is a map-like data structure that maintains an ordered collection of key-value pairs. Insertion, lookup, and deletion are all O(log n) operations. A SkipList can efficiently store up to 2^MaxLevel items.
To iterate over a skip list (where s is a *SkipList):
for i := s.Iterator(); i.Next(); {
	// do something with i.Key() and i.Value()
}
func NewCustomMap ¶
NewCustomMap returns a new SkipList that will use lessThan as the comparison function. lessThan should define a linear order on keys you intend to use with the SkipList.
func NewSkipList ¶
func NewSkipList() *SkipList
NewSkipList returns a new SkipList.
Its keys must implement the Ordered interface.
func NewStringMap ¶
func NewStringMap() *SkipList
NewStringMap returns a SkipList that accepts string keys.
func (*SkipList) Delete ¶
Delete removes the node with the given key.
It returns the old value and whether the node was present.
func (*SkipList) Get ¶
Get returns the value associated with key from s (nil if the key is not present in s). The second return value is true when the key is present.
func (*SkipList) GetGreaterOrEqual ¶
GetGreaterOrEqual finds the node whose key is greater than or equal to min. It returns its actual key, its value, and whether such a node is present in the skip list.
func (*SkipList) Range ¶
Range returns an iterator that will go through all the elements of the skip list that are greater than or equal to from, but less than to.
func (*SkipList) Seek ¶
Seek returns a bidirectional iterator starting with the first element whose key is greater than or equal to key; otherwise, a nil iterator is returned.
func (*SkipList) SeekToFirst ¶
SeekToFirst returns a bidirectional iterator starting from the first element in the list if the list is populated; otherwise, a nil iterator is returned.
func (*SkipList) SeekToLast ¶
SeekToLast returns a bidirectional iterator starting from the last element in the list if the list is populated; otherwise, a nil iterator is returned.
type Slave ¶
func (*Slave) ConnToMaster ¶
func (*Slave) FilterResponse ¶
func (s *Slave) FilterResponse()
func (*Slave) HandleResponse ¶
func (s *Slave) HandleResponse()
type Topic ¶
func (*Topic) AggregateChannelE2eProcessingLatency ¶
func (*Topic) CommitDtPreMsg ¶
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists
func (*Topic) FlushTopicAndChannels ¶
func (*Topic) GenerateID ¶
func (*Topic) GetChannel ¶
GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic
func (*Topic) GetChannelMapCopy ¶
func (*Topic) GetDtPreMsgByCmtMsg ¶
func (*Topic) GetExistingChannel ¶
func (*Topic) HandleSyncTopicFromSlave ¶
func (*Topic) PutMessage ¶
func (*Topic) PutMessages ¶
PutMessages writes multiple Messages to the queue
func (*Topic) UpdatedBackendQueueEndCallback ¶
func (t *Topic) UpdatedBackendQueueEndCallback()
type TopicStats ¶
type TopicStats struct { TopicName string `json:"topic_name"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` MessageCount uint64 `json:"message_count"` MessageBytes uint64 `json:"message_bytes"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewTopicStats ¶
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats
type TopicsByName ¶
type TopicsByName struct {
Topics
}
func (TopicsByName) Less ¶
func (t TopicsByName) Less(i, j int) bool
type Uint64Slice ¶
type Uint64Slice []uint64
func (Uint64Slice) Len ¶
func (s Uint64Slice) Len() int
func (Uint64Slice) Less ¶
func (s Uint64Slice) Less(i, j int) bool
func (Uint64Slice) Swap ¶
func (s Uint64Slice) Swap(i, j int)
Source Files ¶
- IntervalTree.go
- backend_queue.go
- buffer_pool.go
- channel.go
- client_v2.go
- context.go
- diskqueue_info.go
- diskqueue_reader.go
- diskqueue_writer.go
- dqname.go
- dummy_backend_queue.go
- guid.go
- http.go
- in_flight_pqueue.go
- logger.go
- lookup.go
- lookup_peer.go
- message.go
- nsqd.go
- options.go
- pre_dt_pqueue.go
- protocol.go
- protocol_dt.go
- protocol_v2.go
- slave.go
- stats.go
- statsd.go
- tcp.go
- topic.go
- tree.go