Documentation ¶
Index ¶
- Constants
- func BytesToInt64(blob []byte) int64
- func Contains(slice []int, value int) bool
- func CopyBytes(src []byte) []byte
- func Int32ToBytes(value int32) []byte
- func Int64ToBytes(value int64) []byte
- func Key(table byte, parts ...[]byte) []byte
- func Millis(nanos int64) int64
- func StringToBytes(s string) []byte
- func UInt32ToBytes(value uint32) []byte
- func UInt64ToBytes(value uint64) []byte
- type BinaryData
- type DB
- func (db *DB) Close()
- func (db *DB) FindMessage(topic string, partition int32, offset int64) (*Message, error)
- func (db *DB) FindOffset(topic string, partition int32) (int64, error)
- func (db *DB) SaveMessage(msg *Message) error
- func (db *DB) SaveOffset(topic string, partition int32, offset int64) error
- func (db *DB) SearchMessagesByTime(search *SearchRequest) (*SearchResponse, error)
- func (db *DB) Update(ops ...Operation) error
- type Datastore
- type Lookup
- type Message
- type Operation
- type Paging
- type SearchRequest
- type SearchResponse
- type Tag
Constants ¶
View Source
const (
	MessagesByTopicPartitionOffset byte = iota
	Offsets
	TimestampIndex
	TimestampTopicIndex
	TimestampTopicPartitionIndex
)
Variables ¶
This section is empty.
Functions ¶
func BytesToInt64 ¶
func Int32ToBytes ¶
func Int64ToBytes ¶
func StringToBytes ¶
func UInt32ToBytes ¶
func UInt64ToBytes ¶
Types ¶
type BinaryData ¶
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
func (*DB) FindMessage ¶
func (*DB) SaveMessage ¶
func (*DB) SearchMessagesByTime ¶
func (db *DB) SearchMessagesByTime(search *SearchRequest) (*SearchResponse, error)
type Datastore ¶
type Datastore interface {
	Close()
	SaveMessage(msg *Message) error
	FindMessage(topic string, partition int32, offset int64) (*Message, error)
	SaveOffset(topic string, partition int32, offset int64) error
	FindOffset(topic string, partition int32) (int64, error)
	SearchMessagesByTime(search *SearchRequest) (*SearchResponse, error)
}
type Message ¶
type Message struct {
	Key            BinaryData     `json:"key"`
	Value          BinaryData     `json:"value"`
	Topic          string         `json:"topic"`
	Partition      int32          `json:"partition"`
	Offset         int64          `json:"offset"`
	Timestamp      int64          `json:"timestamp"`
	BlockTimestamp int64          `json:"blockTimestamp"`
	Tags           map[string]Tag `json:"tags"`
}
func MessageFromBytes ¶
func NewMessage ¶
func NewMessage(msg *sarama.ConsumerMessage) *Message
type SearchRequest ¶
type SearchRequest struct {
	Search   string  `json:"search"`
	Earliest int64   `json:"earliest"`
	Latest   int64   `json:"latest"`
	Page     int     `json:"page"`
	Offset   []byte  `json:"offset"`
	Paging   *Paging `json:"paging"`
}
func (*SearchRequest) SkipRows ¶
func (r *SearchRequest) SkipRows() int
type SearchResponse ¶
type SearchResponse struct {
	Total      int               `json:"total"`
	Earliest   int64             `json:"earliest"`
	Latest     int64             `json:"latest"`
	Rows       []*Message        `json:"rows"`
	Page       int               `json:"page"`
	OffsetUsed bool              `json:"offsetUsed"`
	Offsets    map[string][]byte `json:"offsets"`
	Took       int64             `json:"took"`
	TimedOut   bool              `json:"timedOut"`
}
func (*SearchResponse) AddOffset ¶
func (r *SearchResponse) AddOffset(page int, offset []byte)
func (*SearchResponse) CleanOffsets ¶
func (r *SearchResponse) CleanOffsets()
func (*SearchResponse) TotalPages ¶
func (r *SearchResponse) TotalPages() int
Click to show internal directories.
Click to hide internal directories.