Documentation ¶
Index ¶
- Constants
- Variables
- func CRC(input string) string
- func ConvertHistoryObjectToString(obj *HistoryObject) string
- func IsPow2(n int) bool
- func LOCKfunc(achan chan struct{}, src string) bool
- func LeftPad(input *string, length int)
- func NullPad(input *string, length int)
- func PrintMemoryStats()
- func PrintMemoryStatsEvery(interval time.Duration)
- func RemoveNullPad(input string) string
- func UNLOCKfunc(achan chan struct{}, src string)
- func UnixTimeMicroSec() int64
- func UnixTimeMilliSec() int64
- func UnixTimeNanoSec() int64
- func UnixTimeSec() int64
- type AccessControlList
- type BATCHLOCKS
- type BLCH
- type BQ
- type BatchLOG
- type BatchLOGGER
- type BatchOffset
- type CCC
- type ClearCache
- type ClearCacheChan
- type HISTORY
- func (his *HISTORY) AddHistory(hobj *HistoryObject, useL1Cache bool) int
- func (his *HISTORY) BatchTicker(char string, ticker chan struct{})
- func (his *HISTORY) BootHistory(history_dir string, hashdb_dir string, useHashDB bool, keyalgo int, keylen int)
- func (his *HISTORY) BootHistoryClient(historyServer string)
- func (his *HISTORY) CLOSE_HISTORY()
- func (his *HISTORY) DoCacheEvict(char string, hash string, offset int64, key string)
- func (his *HISTORY) FseekHistoryHeader(output *[]byte) (int, error)
- func (his *HISTORY) FseekHistoryLine(offset int64) (string, error)
- func (his *HISTORY) FseekHistoryMessageHash(file *os.File, offset int64, char string, rethash *string) error
- func (his *HISTORY) GetCounter(k string) uint64
- func (his *HISTORY) IndexQuery(hash string, indexRetChan chan int, offset int64) (int, error)
- func (his *HISTORY) NewRConn(historyServer string) *RemoteConn
- func (his *HISTORY) PrintCacheStats()
- func (his *HISTORY) SET_DEBUG(debug int)
- func (his *HISTORY) Sync_upcounter(k string)
- func (his *HISTORY) Sync_upcounterN(k string, v uint64)
- func (his *HISTORY) WatchBolt()
- type HistoryIndex
- type HistoryObject
- type HistorySettings
- type L1CACHE
- type L1CACHEMAP
- type L1ECH
- type L1ITEM
- type L1MUXER
- type L1PQ
- type L1PQItem
- type L1pqQ
- type L2CACHE
- func (l2 *L2CACHE) BootL2Cache(his *HISTORY)
- func (l2 *L2CACHE) GetHashFromOffset(offset int64, rethash *string)
- func (l2 *L2CACHE) L2Stats(statskey string) (retval uint64, retmap map[string]uint64)
- func (l2 *L2CACHE) OffsetToChar(offset int64) (retval string)
- func (l2 *L2CACHE) SetOffsetHash(offset int64, hash string, flagexpires bool)
- type L2CACHEMAP
- type L2ECH
- type L2ITEM
- type L2MUXER
- type L2PQ
- type L2PQItem
- type L2pqQ
- type L3CACHE
- func (l3 *L3CACHE) BootL3Cache(his *HISTORY)
- func (l3 *L3CACHE) GetOffsets(key string, char string, offsets *[]int64, his *HISTORY) int
- func (l3 *L3CACHE) L3Stats(statskey string) (retval uint64, retmap map[string]uint64)
- func (l3 *L3CACHE) SetOffsets(key string, char string, offsets []int64, flagexpires bool, src string, ...)
- type L3CACHEMAP
- type L3ECH
- type L3ITEM
- type L3MUXER
- type L3PQ
- type L3PQItem
- type L3pqQ
- type Offsets
- type RemoteConn
Constants ¶
const ( // never change this FlagExpires bool = true FlagNeverExpires bool = false )
const ( CR = "\r" LF = "\n" CRLF = CR + LF DefaultSocketPath = "./history.socket" // default launches a tcp port with a telnet interface @ localhost:49119 DefaultServerTCPAddr = "[::]:49119" )
const ( HashShort = 0x0B // 11 //KeyIndex = 0 MinKeyLen = 6 NumHashDBs = 16 //CharBucketBatchSize int = 16 //RootBUCKETSperDB = 16 ALWAYS = true // DefExpiresStr use 10 digits as spare so we can update it later without breaking offsets DefExpiresStr string = "----------" // never expires CaseLock = 0xFF // internal cache state. reply with CaseRetry while CaseLock CasePass = 0xF1 // is a reply to L1Lock and IndexQuery CaseDupes = 0x1C // is a reply and cache state CaseRetry = 0x2C // is a reply to if CaseLock or CaseWrite or if history.dat returns EOF CaseAdded = 0x3C // is a reply to WriterChan:responseChan CaseWrite = 0x4C // internal cache state. is not a reply. reply with CaseRetry while CaseWrite is happening CaseError = 0xE1 // some things drop this error ZEROPADLEN = 0xFFF // zeropads the header )
const (
FlagSearch = -1
)
const (
MinRetryWaiter = 100
)
Variables ¶
var ( DBG_CGS bool // DEBUG_CACHE_GROW_SHRINK DefaultCacheExpires int64 = 5 // gets x2 BatchFlushEvery x2 DefaultCacheExtend int64 = 5 // extends cached items after writes DefaultCachePurge int64 = 1 // checks ttl every N seconds. affects CacheExpires/Extend max to + Purge DefaultEvictsCapacity = 16 // his.cEvCap (size of Extend chan) is normally fine as is. ClearEveryN = 16 )
var ( BootHisCli bool DefaultHistoryServer = "[::1]:49119" // localhost:49119 // set only once before boot TCPchanQ = 128 DefaultDialTimeout = 5 // seconds DefaultRetryWaiter = 500 // milliseconds DefaultDialRetries = -1 // try N times and fail or <= 0 enables infinite retry )
var ( DEBUGL1 bool = false L1 bool = true // better not disable L1 cache... L1CacheExpires int64 = DefaultCacheExpires L1ExtendExpires int64 = DefaultCacheExtend L1Purge int64 = DefaultCachePurge L1InitSize int = 64 * 1024 // L1LockDelay: delays L1 locking by N milliseconds // L1 locking is most likely done per client-connection // settings this greater 0 limits the amount of articles a client can lock&send // 1ms is a max of 1000 messages/sec per conn // 100ms is a max of 10 messages/sec per conn // 250ms is a max of 4 messages/sec per conn // 1000ms is a max of 1 message /sec per conn // text peers mostly dont need more than 4 msg per sec L1LockDelay int = 0 )
var ( DEBUGL2 bool = false L2 bool = true L2CacheExpires int64 = DefaultCacheExpires L2ExtendExpires int64 = DefaultCacheExtend L2Purge int64 = DefaultCachePurge L2InitSize int = 64 * 1024 )
L2Cache: offset => hash less requests to hisDat
var ( DEBUGL3 bool = false L3 bool = true // do not disable! L3CacheExpires int64 = DefaultCacheExpires L3ExtendExpires int64 = DefaultCacheExtend L3Purge int64 = DefaultCachePurge L3InitSize int = 64 * 1024 )
* * L3Cache: key => offsets * less requests to hashDB * * disabling L3 is not a good idea!! * queues hold offsets which DB does not know about! * cache keeps track of duplicate writes * duplicate keys will get an empty_offsets * and the latest will overwrite the past write maybe still in queue *
var ( IndexParallel int = NumHashDBs NumQueueWriteChan int = NumHashDBs HisDatWriteBuffer int = 4 * 1024 )
var ( ForcedReplay bool NoReplayHisDat bool BatchFlushEvery int64 = 5120 // milliseconds HISTORY_INDEX_LOCK = make(chan struct{}, 1) // main lock HISTORY_INDEX_LOCK16 = make(chan struct{}, NumHashDBs) // sub locks BootVerbose = true //TESTHASH0 = "0f05e27ca579892a63a256dacd657f5615fab04bf81e85f53ee52103e3a4fae8" //TESTHASH1 = "f0d784ae13ce7cf1f3ab076027a6265861eb003ad80069cdfb1549dd1b8032e8" //TESTHASH2 = "f0d784ae1747092974d02bd3359f044a91ed4fd0a39dc9a1feffe646e6c7ce09" TESTHASH = "" //TESTCACKEY = "f0d784ae1" //TESTKEY = "784ae1" //TESTBUK = "0d" //TESTDB = "f" //TESTOFFSET = 123456 ROOTDBS []string //ROOTBUCKETS []string //SUBBUCKETS []string BUFLINES = 10 BUFIOBUFFER = 102 * BUFLINES // a history line with sha256 is 102 bytes long including LF or 38 bytes of payload + hashLen History HISTORY DEBUG bool = true DEBUG0 bool = false DEBUG1 bool = false DEBUG2 bool = false DEBUG9 bool = false LOCKHISTORY = make(chan struct{}, 1) HEXCHARS = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} )
var (
CPUProfile bool // set before boot
)
var ( // set HEX true: converts offset into hex strings to store in hashdb // dont change later once db is initialized! HEX bool = true )
Functions ¶
func ConvertHistoryObjectToString ¶
func ConvertHistoryObjectToString(obj *HistoryObject) string
func PrintMemoryStats ¶
func PrintMemoryStats()
func PrintMemoryStatsEvery ¶
func RemoveNullPad ¶
func UNLOCKfunc ¶
func UNLOCKfunc(achan chan struct{}, src string)
func UnixTimeMicroSec ¶
func UnixTimeMicroSec() int64
func UnixTimeMilliSec ¶
func UnixTimeMilliSec() int64
func UnixTimeNanoSec ¶
func UnixTimeNanoSec() int64
func UnixTimeSec ¶
func UnixTimeSec() int64
Types ¶
type AccessControlList ¶
type AccessControlList struct {
// contains filtered or unexported fields
}
var ( ACL AccessControlList DefaultACL map[string]bool // can be set before booting )
func (*AccessControlList) IsAllowed ¶
func (a *AccessControlList) IsAllowed(ip string) bool
func (*AccessControlList) SetACL ¶
func (a *AccessControlList) SetACL(ip string, val bool)
func (*AccessControlList) SetupACL ¶
func (a *AccessControlList) SetupACL()
type BATCHLOCKS ¶
type BATCHLOCKS struct {
// contains filtered or unexported fields
}
type BQ ¶
type BQ struct { Maps map[string]map[string]chan *BatchOffset BootCh chan struct{} // contains filtered or unexported fields }
BatchQueue
type BatchLOGGER ¶
type BatchLOGGER struct {
// contains filtered or unexported fields
}
type BatchOffset ¶
type BatchOffset struct {
// contains filtered or unexported fields
}
used to batch write items to hashDB
type ClearCache ¶
type ClearCache struct {
// contains filtered or unexported fields
}
type ClearCacheChan ¶
type ClearCacheChan struct {
// contains filtered or unexported fields
}
type HISTORY ¶
type HISTORY struct { L1Cache L1CACHE L2Cache L2CACHE L3Cache L3CACHE Offset int64 // the actual offset for history.dat WriterChan chan *HistoryObject // history.dat writer channel IndexChan chan *HistoryIndex // main index query channel BatchLogs BatchLOGGER BatchLocks map[string]*BATCHLOCKS // used to lock char:bucket in BoltSync and boltBucketPutBatch Counter map[string]uint64 WBR bool // WatchBoltRunning CutCharRO int // read-only value of cutChar CPUfile *os.File // ptr to file for cpu profiling MEMfile *os.File // ptr to file for mem profiling // TCPchan: used to send hobj via handleRConn to a remote historyServer TCPchan chan *HistoryObject // contains filtered or unexported fields }
func (*HISTORY) AddHistory ¶
func (his *HISTORY) AddHistory(hobj *HistoryObject, useL1Cache bool) int
func (*HISTORY) BatchTicker ¶
func (*HISTORY) BootHistory ¶
func (his *HISTORY) BootHistory(history_dir string, hashdb_dir string, useHashDB bool, keyalgo int, keylen int)
BootHistory initializes the history component, configuring its settings and preparing it for operation. It sets up the necessary directories for history and hash databases, and opens the history data file. The function also manages the communication channels for reading and writing historical data. If the `useHashDB` parameter is set to true, it initializes the history database (HashDB) and starts worker routines. Parameters:
- history_dir: The directory where history data will be stored.
- hashdb_dir: The directory where the history database (HashDB) will be stored.
- useHashDB: If true, enables the use of the history database (HashDB).
- boltOpts: Bolt database options for configuring the HashDB.
- keyalgo: The hash algorithm used for indexing historical data.
- keylen: The length of the hash values used for indexing.
func (*HISTORY) BootHistoryClient ¶
func (*HISTORY) CLOSE_HISTORY ¶
func (his *HISTORY) CLOSE_HISTORY()
func (*HISTORY) DoCacheEvict ¶
gets called in BBATCH.go:boltBucketPutBatch() after boltTX
func (*HISTORY) FseekHistoryHeader ¶
func (*HISTORY) FseekHistoryLine ¶
func (*HISTORY) FseekHistoryMessageHash ¶
func (his *HISTORY) FseekHistoryMessageHash(file *os.File, offset int64, char string, rethash *string) error
FseekHistoryMessageHash seeks to a specified offset in the history file and extracts a message-ID hash. It reads characters from the file until a tab character ('\t') is encountered, extracting the hash enclosed in curly braces. If a valid hash is found, it returns the hash as a string without curly braces. If the end of the file (EOF) is reached, it returns a special EOF marker.
func (*HISTORY) GetCounter ¶
func (*HISTORY) IndexQuery ¶
func (*HISTORY) NewRConn ¶
func (his *HISTORY) NewRConn(historyServer string) *RemoteConn
func (*HISTORY) PrintCacheStats ¶
func (his *HISTORY) PrintCacheStats()
func (*HISTORY) Sync_upcounter ¶
func (*HISTORY) Sync_upcounterN ¶
type HistoryIndex ¶
type HistoryIndex struct { Hash string Char string // first N chars of hash Offset int64 // used to search: -1 or add: > 0 a hash IndexRetChan chan int // receives a 0,1,2 :: pass|duplicate|retrylater }
used to query the index
type HistoryObject ¶
type HistoryObject struct { MessageIDHash string StorageToken string // "F" = flatstorage | "M" = mongodb | "X" = deleted Char string Arrival int64 Expires int64 Date int64 ResponseChan chan int // receives a 0,1,2 :: pass|duplicate|retrylater }
func ConvertStringToHistoryObject ¶
func ConvertStringToHistoryObject(parts []string) (*HistoryObject, error)
type HistorySettings ¶
type HistorySettings struct { // constant values once DBs are initalized Ka int // keyalgo Kl int // keylen }
builds the history.dat header
type L1CACHE ¶
type L1CACHE struct { Caches map[string]*L1CACHEMAP Extend map[string]*L1ECH Muxers map[string]*L1MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L1CACHE) BootL1Cache ¶
The BootL1Cache method initializes the cache system. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically purge expired entries.
func (*L1CACHE) LockL1Cache ¶
The LockL1Cache method is used to LOCK a `MessageIDHash` for processing. If the value is not in the cache or has expired, it locks the cache, updates the cache with a new value, and returns the value. Possible return values:
CaseLock == already in processing CaseWrite == already in processing CaseDupes == is a duplicate CasePass == not a duplicate == locked article for processing
type L1CACHEMAP ¶
type L1CACHEMAP struct {
// contains filtered or unexported fields
}
type L2CACHE ¶
type L2CACHE struct { Caches map[string]*L2CACHEMAP Extend map[string]*L2ECH Muxers map[string]*L2MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L2CACHE) BootL2Cache ¶
The BootL2Cache method initializes the L2 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.
func (*L2CACHE) GetHashFromOffset ¶
The GetHashFromOffset method retrieves a hash from the L2 cache using an offset as the key.
func (*L2CACHE) OffsetToChar ¶
type L2CACHEMAP ¶
type L2CACHEMAP struct {
// contains filtered or unexported fields
}
type L3CACHE ¶
type L3CACHE struct { Caches map[string]*L3CACHEMAP Extend map[string]*L3ECH Muxers map[string]*L3MUXER Counter map[string]*CCC // contains filtered or unexported fields }
func (*L3CACHE) BootL3Cache ¶
The BootL3Cache method initializes the L3 cache. It creates cache maps, initializes them with initial sizes, and starts goroutines to periodically clean up expired entries.
func (*L3CACHE) GetOffsets ¶
The GetOffsets method retrieves a slice of offsets from the L3 cache using a key and a char.
func (*L3CACHE) SetOffsets ¶
func (l3 *L3CACHE) SetOffsets(key string, char string, offsets []int64, flagexpires bool, src string, his *HISTORY)
The SetOffsets method sets a cache item in the L3 cache using a key, char and a slice of offsets as the value. It also dynamically grows the cache when necessary.
type L3CACHEMAP ¶
type L3CACHEMAP struct {
// contains filtered or unexported fields
}
type RemoteConn ¶
type RemoteConn struct {
// contains filtered or unexported fields
}
holds connection to historyServer