Documentation ¶
Index ¶
- Variables
- func NopWCloser(w io.Writer) io.WriteCloser
- type BLTopicScanner
- type CompressionType
- type IntegrityChecker
- type IntegrityError
- type IntegrityErrorType
- type Message
- func (m *Message) Bytes() []byte
- func (m *Message) CRC32() uint32
- func (m *Message) ChecksumOK() bool
- func (m *Message) CompVer() uint8
- func (m *Message) Compression() CompressionType
- func (m *Message) PLength() uint32
- func (m *Message) Payload() []byte
- func (m *Message) Size() int
- func (m *Message) Version() uint8
- type NLError
- type NetLog
- type Option
- type PersistentTopicScanner
- type SegmentMonitor
- type StreamerAtomicMap
- func (am *StreamerAtomicMap) Delete(key string)
- func (am *StreamerAtomicMap) Get(key string) (value *biglog.Streamer, ok bool)
- func (am *StreamerAtomicMap) GetAll() map[string]*biglog.Streamer
- func (am *StreamerAtomicMap) Len() int
- func (am *StreamerAtomicMap) Set(key string, value *biglog.Streamer)
- type TScannerInfo
- type Topic
- func (t *Topic) CheckIntegrity(ctx context.Context, from int64) ([]*IntegrityError, error)
- func (t *Topic) CheckSegments() error
- func (t *Topic) DeleteScanner(ID string) (err error)
- func (t *Topic) DirPath() string
- func (t *Topic) FlushBuffered() error
- func (t *Topic) Info() (i *TopicInfo, err error)
- func (t *Topic) Name() string
- func (t *Topic) NewScanner(from int64, persist bool) (ts TopicScanner, err error)
- func (t *Topic) ParseOffset(str string) (int64, error)
- func (t *Topic) Payload(offset int64) ([]byte, error)
- func (t *Topic) ReadFrom(r io.Reader) (n int64, err error)
- func (t *Topic) Scanner(ID string) (ts TopicScanner, err error)
- func (t *Topic) Sync() error
- func (t *Topic) Write(p []byte) (n int, err error)
- func (t *Topic) WriteN(p []byte, n int) (written int, err error)
- type TopicAtomicMap
- type TopicInfo
- type TopicScanner
- type TopicScannerAtomicMap
- func (am *TopicScannerAtomicMap) Delete(key string)
- func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool)
- func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner
- func (am *TopicScannerAtomicMap) Len() int
- func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner)
- type TopicSettings
Constants ¶
This section is empty.
Variables ¶
var ( // ErrUnknown is returned when an underlying standard Go error reaches the user. ErrUnknown = newErr(http.StatusInternalServerError, "netlog: unkwown error") // ErrInvalidDir is returned when the data folder provided does not exist or is not writable. ErrInvalidDir = newErr(http.StatusInternalServerError, "netlog: invalid data directory") // ErrBadRequest is returned when invalid parameters are received. ErrBadRequest = newErr(http.StatusBadRequest, "netlog: bad request") // ErrInvalidOffset is returned when the requested offset can not be parsed into a number. ErrInvalidOffset = newErr(http.StatusBadRequest, "netlog: invalid offset") // ErrInvalidDuration is returned when a given big duration can not be parsed. ErrInvalidDuration = newErr(http.StatusBadRequest, "netlog: invalid duration") // ErrInvalidCompression is returned when the compression type defined is unknown. ErrInvalidCompression = newErr(http.StatusBadRequest, "netlog: invalid compression type") // ErrTopicExists is returned when trying to create an already existing topic. ErrTopicExists = newErr(http.StatusBadRequest, "netlog: topic exists") // ErrEndOfTopic is returned when the reader has read all the way until the end of the topic. ErrEndOfTopic = newErr(http.StatusNotFound, "netlog: end of topic") // ErrTopicNotFound is returned when addressing a non-existing topic. ErrTopicNotFound = newErr(http.StatusNotFound, "netlog: topic not found") // ErrScannerNotFound is returned when using a non-existing scanner ID. ErrScannerNotFound = newErr(http.StatusNotFound, "netlog: scanner not found") // ErrOffsetNotFound is returned when the offset is no longer or not yet present in the topic. ErrOffsetNotFound = newErr(http.StatusNotFound, "netlog: offset not found") // ErrCRC is returned when a message's payload does not match the CRC header. 
ErrCRC = newErr(http.StatusInternalServerError, "netlog: checksum error") // ErrBusy is returned when trying to close or delete a topic with readers attached to it. ErrBusy = newErr(http.StatusConflict, "netlog: resource busy") )
Functions ¶
func NopWCloser ¶
func NopWCloser(w io.Writer) io.WriteCloser
NopWCloser returns a WriteCloser with a no-op Close method wrapping the provided Writer w.
Types ¶
type BLTopicScanner ¶
type BLTopicScanner struct {
// contains filtered or unexported fields
}
BLTopicScanner implements TopicScanner reading from BigLog.
func (*BLTopicScanner) Close ¶
func (ts *BLTopicScanner) Close() error
Close implements io.Closer and releases the TopicScanner resources.
func (*BLTopicScanner) Info ¶
func (ts *BLTopicScanner) Info() TScannerInfo
Info returns a TScannerInfo struct with the scanner's next offset and the initial offset.
type CompressionType ¶
type CompressionType uint8
CompressionType indicates a type of compression for message sets
const ( // CompressionDefault is used when falling back to the default compression of the system. CompressionDefault CompressionType = 0 // CompressionNone is used by message sets with uncompressed payloads CompressionNone CompressionType = 1 // CompressionGzip is used by message sets with gzipped payloads CompressionGzip CompressionType = 2 // CompressionSnappy is used by message sets with snappy payloads CompressionSnappy CompressionType = 3 )
type IntegrityChecker ¶
type IntegrityChecker struct {
// contains filtered or unexported fields
}
IntegrityChecker is used to check the integrity of an entire topic.
func NewIntegrityChecker ¶
func NewIntegrityChecker(t *Topic, from int64) (*IntegrityChecker, error)
NewIntegrityChecker creates a new integrity checker for a given topic.
func (*IntegrityChecker) Check ¶
func (ic *IntegrityChecker) Check(ctx context.Context) (errors []*IntegrityError)
Check reads all data, collecting any errors found, which it then returns. It is recommended to pass a cancellable context since this operation can be slow.
func (*IntegrityChecker) Close ¶
func (ic *IntegrityChecker) Close() error
Close releases the underlying resources.
type IntegrityError ¶
type IntegrityError struct { Offset int64 `json:"offset"` ODelta int `json:"odelta"` Type IntegrityErrorType `json:"type"` Expected string `json:"expected"` Actual string `json:"actual"` }
IntegrityError is the struct with metadata about any integrity error found.
func CheckMessageIntegrity ¶
func CheckMessageIntegrity(m Message, delta int) *IntegrityError
CheckMessageIntegrity checks the integrity of a single message
type IntegrityErrorType ¶
type IntegrityErrorType string
IntegrityErrorType is the category of possible errors in the data.
const ( // IntegrityChecksumErr is returned when the checksum in the message // header doesn't match the checksum recalculated from the payload. IntegrityChecksumErr IntegrityErrorType = "checksum" // IntegrityLengthErr is returned when the length in the message // header doesn't match the length of the payload. IntegrityLengthErr IntegrityErrorType = "length" // IntegrityUnknownErr is returned when data can not be read because // of an underlying error reading the data. IntegrityUnknownErr IntegrityErrorType = "unknown" )
type Message ¶
type Message []byte
Message is the unit of data storage.
func MessageFromPayload ¶
MessageFromPayload returns a message with the appropriate calculated headers from a given data payload.
func MessageSet ¶
func MessageSet(msgs []Message, comp CompressionType) Message
MessageSet returns a new message with a batch of compressed messages as payload. Compression will compress the payload and set the compression header; please be aware that compression at this level is only meant for batching several messages into a single message-set to increase throughput. MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming clients that further messages are embedded in the payload.
func ReadMessage ¶
ReadMessage reads a message from r and returns it. If the message is compressed, it does not attempt to unpack the contents.
func (*Message) ChecksumOK ¶
ChecksumOK recalculates the CRC from the payload and compares it with the one stored in the header
func (*Message) CompVer ¶
CompVer returns the first byte which reflects both compression and format version.
func (*Message) Compression ¶
func (m *Message) Compression() CompressionType
Compression returns the compression encoded in bits 4 to 8 of the header.
type NetLog ¶
type NetLog struct {
// contains filtered or unexported fields
}
NetLog is the main struct that serves a set of topics, usually it must be wrapped with an HTTP transport.
func NewNetLog ¶
NewNetLog creates a new NetLog in a given data folder that must exist and be writable.
func (*NetLog) CreateTopic ¶
func (nl *NetLog) CreateTopic(name string, settings TopicSettings) (t *Topic, err error)
CreateTopic creates a new topic with a given name and default settings.
func (*NetLog) DeleteTopic ¶
DeleteTopic deletes an existing topic by name.
type Option ¶
type Option func(*NetLog)
Option is the type of function used to set internal parameters.
func DefaultTopicSettings ¶
func DefaultTopicSettings(settings TopicSettings) Option
DefaultTopicSettings sets the default topic settings used if no other is defined at creation time.
func MonitorInterval ¶
func MonitorInterval(interval bigduration.BigDuration) Option
MonitorInterval defines the interval at which the segment monitor in charge of splitting and discarding segments runs.
type PersistentTopicScanner ¶
type PersistentTopicScanner struct {
// contains filtered or unexported fields
}
PersistentTopicScanner synchronizes the underlying scanner state to a given writer
func (*PersistentTopicScanner) Close ¶
func (p *PersistentTopicScanner) Close() error
Close deletes the offset tracking file, closes the offset channel and closes the underlying scanner
func (*PersistentTopicScanner) ID ¶
func (p *PersistentTopicScanner) ID() string
ID returns the ID of the scanner.
func (*PersistentTopicScanner) Info ¶
func (p *PersistentTopicScanner) Info() TScannerInfo
Info returns a TScannerInfo struct with the scanner's next offset and the last scanned one
type SegmentMonitor ¶
type SegmentMonitor struct {
// contains filtered or unexported fields
}
SegmentMonitor periodically checks for segments to split or discard at a given interval.
type StreamerAtomicMap ¶
type StreamerAtomicMap struct {
// contains filtered or unexported fields
}
StreamerAtomicMap is a copy-on-write thread-safe map of pointers to Streamer
func NewStreamerAtomicMap ¶
func NewStreamerAtomicMap() *StreamerAtomicMap
NewStreamerAtomicMap returns a new initialized StreamerAtomicMap
func (*StreamerAtomicMap) Delete ¶
func (am *StreamerAtomicMap) Delete(key string)
Delete removes the pointer to Streamer under key from the map
func (*StreamerAtomicMap) Get ¶
func (am *StreamerAtomicMap) Get(key string) (value *biglog.Streamer, ok bool)
Get returns a pointer to Streamer for a given key
func (*StreamerAtomicMap) GetAll ¶
func (am *StreamerAtomicMap) GetAll() map[string]*biglog.Streamer
GetAll returns the underlying map of pointers to Streamer. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.
func (*StreamerAtomicMap) Len ¶
func (am *StreamerAtomicMap) Len() int
Len returns the number of elements in the map
type TScannerInfo ¶
type TScannerInfo struct { ID string `json:"id"` Next int64 `json:"next"` From int64 `json:"from"` Persist bool `json:"persistent"` }
TScannerInfo holds the scanner's offset information
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic is a log of linear messages.
func (*Topic) CheckIntegrity ¶
CheckIntegrity scans the topic and checks for inconsistencies in the data
func (*Topic) CheckSegments ¶
CheckSegments is called by the runner and discards or splits segments when conditions are met.
func (*Topic) DeleteScanner ¶
DeleteScanner removes the scanner from the topic
func (*Topic) FlushBuffered ¶
FlushBuffered flushes all buffered messages into the BigLog. Notice that the BigLog might have a buffer on its own that this function does not flush, so calling this does not mean the data has been stored on disk.
func (*Topic) NewScanner ¶
func (t *Topic) NewScanner(from int64, persist bool) (ts TopicScanner, err error)
NewScanner creates a new scanner starting at offset `from`. If `persist` is true, the scanner and its state will survive server restarts.
func (*Topic) ParseOffset ¶
ParseOffset converts an offset string into a precise numeric offset: 'beginning', 'first' or 'oldest' return the lowest available offset in the topic; 'last' or 'latest' return the highest available offset in the topic; 'end' or 'now' return the next offset to be written in the topic; numeric string values are directly converted to integers; duration notation like "1day" returns the first offset available since 1 day ago.
func (*Topic) ReadFrom ¶
ReadFrom reads an entry or stream of entries from r until EOF is reached and writes the entry/stream into the topic if the entry is valid. The return value n is the number of bytes read. It implements the io.ReaderFrom interface.
func (*Topic) Scanner ¶
func (t *Topic) Scanner(ID string) (ts TopicScanner, err error)
Scanner returns an existing scanner for the topic given an ID, or ErrScannerNotFound if it doesn't exist.
type TopicAtomicMap ¶
type TopicAtomicMap struct {
// contains filtered or unexported fields
}
TopicAtomicMap is a copy-on-write thread-safe map of pointers to Topic
func NewTopicAtomicMap ¶
func NewTopicAtomicMap() *TopicAtomicMap
NewTopicAtomicMap returns a new initialized TopicAtomicMap
func (*TopicAtomicMap) Delete ¶
func (am *TopicAtomicMap) Delete(key string)
Delete removes the pointer to Topic under key from the map
func (*TopicAtomicMap) Get ¶
func (am *TopicAtomicMap) Get(key string) (value *Topic, ok bool)
Get returns a pointer to Topic for a given key
func (*TopicAtomicMap) GetAll ¶
func (am *TopicAtomicMap) GetAll() map[string]*Topic
GetAll returns the underlying map of pointers to Topic. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.
func (*TopicAtomicMap) Len ¶
func (am *TopicAtomicMap) Len() int
Len returns the number of elements in the map
func (*TopicAtomicMap) Set ¶
func (am *TopicAtomicMap) Set(key string, value *Topic)
Set inserts in the map a pointer to Topic under a given key
type TopicInfo ¶
type TopicInfo struct { *biglog.Info Scanners map[string]TScannerInfo `json:"scanners"` }
TopicInfo holds the topic information, including information about size, segments, scanners and streamers.
type TopicScanner ¶
type TopicScanner interface { ID() string Scan(ctx context.Context) (m Message, offset int64, err error) Info() TScannerInfo Close() error }
TopicScanner reads one by one over the messages in a topic blocking until new data is available for a period of time. TopicScanners are thread-safe.
func NewTopicScanner ¶
NewTopicScanner returns a new topic scanner ready to scan starting at offset `from`, if persist is true, the scanner and its last position will survive across server restarts
type TopicScannerAtomicMap ¶
type TopicScannerAtomicMap struct {
// contains filtered or unexported fields
}
TopicScannerAtomicMap is a copy-on-write thread-safe map of TopicScanner
func NewTopicScannerAtomicMap ¶
func NewTopicScannerAtomicMap() *TopicScannerAtomicMap
NewTopicScannerAtomicMap returns a new initialized TopicScannerAtomicMap
func (*TopicScannerAtomicMap) Delete ¶
func (am *TopicScannerAtomicMap) Delete(key string)
Delete removes the TopicScanner under key from the map
func (*TopicScannerAtomicMap) Get ¶
func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool)
Get returns a TopicScanner for a given key
func (*TopicScannerAtomicMap) GetAll ¶
func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner
GetAll returns the underlying map of TopicScanner. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.
func (*TopicScannerAtomicMap) Len ¶
func (am *TopicScannerAtomicMap) Len() int
Len returns the number of elements in the map
func (*TopicScannerAtomicMap) Set ¶
func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner)
Set inserts in the map a TopicScanner under a given key
type TopicSettings ¶
type TopicSettings struct { // SegAge is the age after which old segments are discarded. SegAge bigduration.BigDuration `json:"segment_age,ommitempty"` // SegSize is the size at which a new segment should be created. SegSize int64 `json:"segment_size,ommitempty"` // BatchNumMessages is the maximum number of messages to be batched. BatchNumMessages int `json:"batch_num_messages,ommitempty"` // BatchInterval is the interval at which batched messages are flushed to disk. BatchInterval bigduration.BigDuration `json:"batch_interval,ommitempty"` // CompressionType allows to specify how batches are compressed. CompressionType CompressionType `json:"compression_type,ommitempty"` }
TopicSettings holds the tunable settings of a topic.