storage


README

nntp-storage

nntp-storage provides an interface via channels to read and write Usenet articles from/to storage.

Flat file storage stores articles in flat text files: one for the head and one for the body.

The CacheDir structure creates 3 levels of directories: [0-9a-f]/[0-9a-f]/[0-9a-f]/

Files are stored in the format:

  "cachedir/[head|body]/[0-9a-f]/[0-9a-f]/[0-9a-f]/"+messageidhash[3:]+".[head|body]"

The first 3 characters are cut from the hash and used as the directory levels.

The remaining string (the hash without its first 3 characters) is used as the filename, plus a .head or .body extension.

The hash of the Message-ID should be md5, sha1, sha256 or better.

Choose wisely: changing the hash algorithm later will break existing storage paths!
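
As an illustration (not the package's internal code; Get_cache_dir in the API below performs this mapping), deriving a head path from a Message-ID with an assumed sha256 hex hash:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func main() {
	messageid := "<example@host>"
	sum := sha256.Sum256([]byte(messageid))
	msgidhash := hex.EncodeToString(sum[:]) // 64 lowercase hex chars
	// first 3 hex chars become the 3 directory levels, the rest is the filename
	path := fmt.Sprintf("/mnt/cache/head/%c/%c/%c/%s.head",
		msgidhash[0], msgidhash[1], msgidhash[2], msgidhash[3:])
	fmt.Println(path)
}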

Xref_Linker

is needed to access "article|head|body|stat" via MsgNumber, by creating softlinks from the constructed Xref info.

It creates soft (sym)links to message-ids in

"cachedir/[head|body]/groups/"+grouphash+"/"+"Msgnum"

Xref_linker takes a *redis.Pool as an optional argument. If you don't want redis: set redis_pool to nil and AskRedis = false.

If you pass a *redis.Pool to Xref_linker it will create links in redis as constant (no expiry) key/value.
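
If you do use redis, here is a minimal sketch of constructing the pool; this assumes *redis.Pool is redigo's (github.com/gomodule/redigo/redis), since this page does not show the actual import path:

import (
	"time"

	"github.com/gomodule/redigo/redis" // assumption: redigo; adjust to the package's real import
)

var redis_pool = &redis.Pool{
	MaxIdle:     4,
	IdleTimeout: 240 * time.Second,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "127.0.0.1:6379")
	},
}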

When crafting a read request: only set .AskRedis = true when requesting an "article|head|body|stat" via Msgnum, not via message-ID!

If you do xref-linking in redis: better have loads of ram and fast storage for redis!

Redis killed my test import machine in one night...

redis-db > 30GB started swapping, and bg-save would never complete with default settings

Redis can be enabled later at any time, but it will only create entries for old articles on read, when they are not found in redis.
One can choose to disable 'do_softlinks' and run only with redis,
but disabling redis later is not possible (yet), because the grouphash/msgnum links do not exist without redis
and there is no code yet to export data from redis and create the symlinks on the filesystem.

flat_storage_reader:

provides 2 channels of type 'storage.ReadReq':

-> 'storage.ReadCache.RC_head_chan'

-> 'storage.ReadCache.RC_body_chan'

type ReadReq struct {
	Sessionid       string
	File_path       string
	Cli_chan        chan *ReadItem
	Cmdstring       string
	AskRedis        bool
} // end ReadReq struct

Crafting read requests

readreq := storage.ReadReq{}
readreq.Sessionid = "anything"
// the client constructs file_path from message-id hash
readreq.File_path = "/mnt/cache/0/0/0/ffffff.[head|body]"
// create the channel on which the client will receive the reply to readreq
readreq.Cli_chan = make(chan *storage.ReadItem, 1)
// Cmdstring must be lowercase: one of article|head|body|stat
readreq.Cmdstring = "[article|head|body|stat]"
readreq.AskRedis = false

// choose correct channel to put the read request for head or body

// send read request for head:
storage.ReadCache.RC_head_chan <- readreq

// send read request for body:
storage.ReadCache.RC_body_chan <- readreq

reply := <-readreq.Cli_chan

// when requesting an article:
// craft 2 requests with File_path .[head|body]
// wait for replies on Cli_chan, create one for each request
// and construct the article yourself.

// reply.Fileobj is a *[]byte.
// if requesting "stat":
//  (*reply.Fileobj)[0] is: 'S' ok OR '0' not found

// do not send reply.Fileobj to the client as raw bytes!!
//   lines end with LF only, not the CRLF pair NNTP needs to mark end of line.
// get a sendable []string and send the lines with CRLF to the client
lines := utils.Bytes2Lines(reply.Fileobj)
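
Putting it together, a hypothetical helper (not part of this package; it only uses the ReadReq/ReadItem types documented below) that performs the two-request article pattern described above; error handling is omitted:

// hypothetical helper: fetch head and body separately so the caller
// can construct the article itself.
func fetchArticle(sessionid, basePath string) (head, body *storage.ReadItem) {
	headReq := storage.ReadReq{
		Sessionid: sessionid,
		File_path: basePath + ".head",
		Cli_chan:  make(chan *storage.ReadItem, 1),
		Cmdstring: "head",
	}
	bodyReq := storage.ReadReq{
		Sessionid: sessionid,
		File_path: basePath + ".body",
		Cli_chan:  make(chan *storage.ReadItem, 1),
		Cmdstring: "body",
	}
	storage.ReadCache.RC_head_chan <- headReq
	storage.ReadCache.RC_body_chan <- bodyReq
	head = <-headReq.Cli_chan
	body = <-bodyReq.Cli_chan
	return head, body
}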

flat_storage_writer:

provides 2 channels of type 'storage.CacheItem':

-> 'storage.WriteCache.WC_head_chan'

-> 'storage.WriteCache.WC_body_chan'

Pass a CacheItem object with Lines (*[]string) or Bytes (*[]byte) into the respective channel and nntp-storage will write the head/body to storage.

type CacheItem struct {
	Msgidhash       string
	Head            bool
	Lines           *[]string	// use either Lines
	Bytes           *[]byte		// or Bytes, but don't supply both!
	Size            int
	Retchan         chan int	// optional return channel (see Types below)
} // end CacheItem struct

type ReadItem struct {
	Fileobj         *[]byte
	Err             error
} // end ReadItem struct

Crafting write requests

// storage backend will not count bytes again,
// Size has to be supplied from layer above.

head_part := storage.CacheItem{}
head_part.Msgidhash = msgidhash // e.g. hex-encoded sha256 of the Message-ID
// if the object is a head, set Head to true; else leave it out or set it to false.
head_part.Head = true
// head lines should be a *[]string, not a MIMEheader map, which is sadly not ordered... :/
head_part.Lines = &headLines	// use either Lines (*[]string); see the split sketch below
head_part.Bytes = nil		// or Bytes (*[]byte), but don't supply both!
head_part.Size = headSize	// byte size, counted by the layer above

body_part := storage.CacheItem{}
body_part.Msgidhash = msgidhash // reuse the msgidhash generated for the head if we have it here, else create one!
body_part.Lines = &bodyLines	// use either Lines (*[]string)
body_part.Bytes = nil		// or Bytes (*[]byte), but don't supply both!
body_part.Size = bodySize

// pass objects to the correct channels, or
//      they will be discarded with only an error message,
//          without interrupting the system!

storage.WriteCache.WC_head_chan <- head_part

storage.WriteCache.WC_body_chan <- body_part

There is no response to sender.

Fire to channel and forget.

Storage writes it, or Houston, we have a problem!
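
The headLines and bodyLines used in the write example have to come from the layer above; a minimal sketch (illustrative, not package code) of splitting a raw article at the first empty line:

import "strings"

// illustrative only: split a raw article (LF line endings) into head
// and body line slices at the first empty line.
func splitArticle(raw []byte) (headLines []string, bodyLines []string) {
	lines := strings.Split(string(raw), "\n")
	for i, line := range lines {
		if line == "" { // first empty line separates head from body
			return lines[:i], lines[i+1:]
		}
	}
	return lines, nil // no empty line found: treat everything as head
}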

flat_storage.go
/*
 *  SETUP:
 *      import ( "github.com/go-while/nntp-storage" )
		func main() {
			cachedir := "/mnt/cache"
			bodycache, headcache := true, true
			storage.StorageCounter.Make_SC()
			storage.Make_cache_dir(cachedir, bodycache, headcache)
		 } // end main
 *
 *
*/
flat_storage_reader.go
/*
 *  SETUP:
 *      //import ( "github.com/go-while/nntp-storage" )
		func main() {
			rc_debug_flag := false // single debug flag (the loader takes one)

			var redis_pool *redis.Pool = nil
			redis_expire := 900 // redis_expire is used to cache stat requests in redis.
								// set redis_expire to -1 to not cache stat requests at all.
								// if set to 0, it defaults to 300s
								// better have some memory for redis!

			rc_head_workers, rc_head_ncq := 100, 100000
			rc_body_workers, rc_body_ncq := 100, 100000

			storage.ReadCache.Load_Readcache(rc_head_workers, rc_head_ncq, rc_body_workers, rc_body_ncq, redis_pool, redis_expire, rc_debug_flag)

			if storage.ReadCache.RC_head_chan == nil {
				log.Printf("ERROR RC_head_chan=nil")
				os.Exit(1)
			}
			if storage.ReadCache.RC_body_chan == nil {
				log.Printf("ERROR RC_body_chan=nil")
				os.Exit(1)
			}

		} // end main
 *
 *
*/
flat_storage_writer.go
/*
 *  SETUP:
 *      //import ( "github.com/go-while/nntp-storage" )
		func main() {
			cachedir := "/mnt/cache"
			cache_index_logdir := "/var/log/cache_index"
			wc_debug_flag := false // single debug flag (the loader takes one)

			headcache_workers, headcache_max := 100, 1000000
			bodycache_workers, bodycache_max := 100, 1000000

			var redis_pool *redis.Pool = nil // nil disables redis
			bufio_max := 64 * 1024           // bufio writer size; this value is an assumption

			storage.WriteCache.Load_Writecache(headcache_workers, headcache_max, bodycache_workers, bodycache_max, cachedir, cache_index_logdir, redis_pool, bufio_max, wc_debug_flag)

			if storage.WriteCache.WC_head_chan == nil {
				log.Printf("ERROR WC_head_chan=nil")
				os.Exit(1)
			}
			if storage.WriteCache.WC_body_chan == nil {
				log.Printf("ERROR WC_body_chan=nil")
				os.Exit(1)
			}

		} // end main
 *
 *
*/

xref_linker

Xref Linker creates softlinks
from 'cachedir/[head|body]/groups/grouphash/msgnum'
to 'cachedir/[head|body]/[a-f0-9]/[a-f0-9]/[a-f0-9]/messageidhash[3:].[head|body]'
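
A sketch of the link being created (illustrative helper, not the package's code; the layout follows the paths above):

import (
	"fmt"
	"os"
)

// illustrative only: the symlink that makes a head reachable by
// grouphash + message number.
func linkHead(cachedir, grouphash, msgidhash string, msgnum uint64) error {
	target := fmt.Sprintf("%s/head/%c/%c/%c/%s.head",
		cachedir, msgidhash[0], msgidhash[1], msgidhash[2], msgidhash[3:])
	link := fmt.Sprintf("%s/head/groups/%s/%d", cachedir, grouphash, msgnum)
	return os.Symlink(target, link)
}
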
xref_linker.go
/*
 *  SETUP:
 *      //import ( "github.com/go-while/nntp-storage" )
		func main() {
			cachedir := "/mnt/cache"
			workers := 1
			// syncwrites limits parallel filesystem access for xref linking.
			//  it can be less than workers; more than workers does nothing.
			//  you can have more workers pushing to redis and fewer parallel syncwrites.
			syncwrites := workers
			queuesize := 100
			do_softlinks := true
			debug_flag := false
			var redis_pool *redis.Pool = nil

			storage.XrefLinker.Load_Xref_Linker(workers, syncwrites, do_softlinks, queuesize, cachedir, redis_pool, debug_flag)

			if storage.XrefLinker.Xref_link_chan == nil {
				log.Printf("ERROR Xref_link_chan=nil")
				os.Exit(1)
			}
		}
 *
 *
*/

Contributing

Pull requests are welcome.

For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

go-while

Documentation

Constants

const (
	DefaultCycBufsBaseDir                = "/mnt/cb" // without trailing slash!
	DefaultReaders, DefaultWriters int64 = 1, 1
	MinInitSize                    int64 = 1024 * 1024
	MinGrowSize                    int64 = 1024 * 1024
	DefaultSize1G                  int64 = 1024 * 1024 * 1024
	DefaultFlushEveryBytes         int64 = 64 * 1024
	DefaultFlushEverySeconds       int64 = 5
	DefaultFlushEveryMessages            = 200
	CycBufType_Head                      = 0xAAA
	CycBufType_Body                      = 0xBBB
	CycBufType_Comb                      = 0xCCC
)

Variables

var (
	DOTSTUFFING bool // default false = off. frontend reader/writer should handle it.
)
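
DOTSTUFFING refers to the NNTP convention (RFC 3977) of doubling a leading dot on each transmitted line; a quick sketch of what a frontend writer would do if it handles it:

import "strings"

// NNTP dot-stuffing: a line beginning with "." gets a second "."
// prepended on send; the receiving side strips it again.
func dotStuff(line string) string {
	if strings.HasPrefix(line, ".") {
		return "." + line
	}
	return line
}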

Functions

func Fallocate

func Fallocate(filePath string, offset int64, length int64) error

func Get_cache_dir

func Get_cache_dir(msgidhash string, src string) (string, string, string)

func Make_cache_dir

func Make_cache_dir(cachedir string, bodycache bool, headcache bool)

func MapRegion

func MapRegion(filePath string, minPos int64, maxPos int64) (mmap.Map, *os.File, error)

Types

type CYCBUF

type CYCBUF struct {

	// the ident of this cycbuf must be unique!
	Ident string
	Hash  string

	// location of this cycbuf, as a full or relative path,
	// identified by the Cookie string, without file extension.
	// Path contains: /mnt/cycbuf/{Cookie}
	// .[cycbuf|cycdat] is appended later
	Path string

	// 64-char unique random string identifying this cycbuf,
	// used as fileident.cycbuf in CycBufDir
	Cookie string

	// defines what this cycbuf stores: [head|body|comb]
	Type int

	// initial size of this cycbuf
	InitCycBufSize int64

	// total number of cycles the cycbuf has done (print with 2 decimals)
	Cycles float64

	// if Rollover == true: the cycbuf will grow up to InitCycBufSize,
	// then revert back to the start when reaching the end, overwriting old messages.
	// if the cycbuf has more than 1 writer: every area can roll over on its own
	Rollover bool

	// if Rollover == false: cycbuf will grow by this amount of bytes
	// if set to 0: cycbuf will not grow and server stops accepting articles
	Growby int64

	// spawn this many dedicated readers for this cycbuf
	// can be changed later without problems
	Readers int64

	// spawn this many dedicated writers for this cycbuf
	// every writer writes in its own area: InitCycBufSize / Writers
	// note: it is impossible to change Writers later!
	Writers int64

	// flushing options
	LastFlush          int64 // timestamp
	FlushEveryBytes    int64
	FlushEveryMessages int
	FlushEverySeconds  int64

	// counter when to flush
	CtrB int
	CtrM int
	CtrS int

	TimeOpen int64 // timestamp of opening

	//	cb.Offsets[0] contains no value
	//	cb.Offsets[1:] used by writers
	Offsets map[int]*area // key: writerID, val: offset / position in cycbuf-area
	// contains filtered or unexported fields

} // end CYCBUF struct
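
From the Writers comment above, each writer owns a fixed slice of the buffer; a sketch of the arithmetic (the helper name is illustrative, not package API):

// illustrative only: the byte range writer w (0-based) owns when a
// cycbuf of initSize bytes is split between n writers.
func writerArea(initSize, n, w int64) (start, end int64) {
	area := initSize / n
	start = w * area
	end = start + area
	return start, end
}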

type CacheItem

type CacheItem struct {
	Msgidhash string
	Head      bool
	Lines     *[]string
	Bytes     *[]byte // should already have "\n" (or "\r\n") as end of line!
	Size      int
	Retchan   chan int

} // end CacheItem struct

type CycBufHandler

type CycBufHandler struct {
	Depth   int // 0 uses routing from storage.conf, >0 puts hashes in files 0__ - f__
	BaseDir string
	CycBufs map[string]*CYCBUF // key: ident, val: pointer to a CYCBUF
	// contains filtered or unexported fields
}
var (
	// CycBufsDepth values can be:
	//	0 or 16, 256, 4096, 65536
	//
	// if CycBufsDepth == 0:
	//	uses routing from storage config
	//	routing should allow setting min/max size of articles and groups we want in this cycbuf
	//
	// if CycBufsDepth is one of (16 || 256 || 4096 || 65536):
	//	creates this many CycBufs for hashes 0__ - f__ without the use of routing
	//
	AvailableCycBufsDepths = []int{0, 16, 256, 4096, 65536}

	CBH CycBufHandler
)
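
A sketch of how a depth maps a hash prefix to a cycbuf (depth 16 routes on the first hex char, 256 on the first two, and so on; the helper is illustrative, not package API):

import "strconv"

// illustrative only: pick a cycbuf index for msgidhash at a given depth.
func cycbufIndex(msgidhash string, depth int) int {
	chars := 0 // hex chars used: 16→1, 256→2, 4096→3, 65536→4
	for d := depth; d > 1; d /= 16 {
		chars++
	}
	idx, _ := strconv.ParseInt(msgidhash[:chars], 16, 32)
	return int(idx)
}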

func (*CycBufHandler) CreateCycBuf

func (handler *CycBufHandler) CreateCycBuf(ident string, ctype int, bufpath string, datpath string, initsize int64, rollover bool, growby int64, feb int64, fem int, fes int64, readers int64, writers int64) (*CYCBUF, error)

func (*CycBufHandler) InitCycBufs

func (handler *CycBufHandler) InitCycBufs(basedir string, depth int, initsize int64, growby int64, readers int64, writers int64, mode int) (bool, error)

func (*CycBufHandler) Load_CycBufs

func (handler *CycBufHandler) Load_CycBufs(indexfile string) bool

func (*CycBufHandler) SetOffset

func (handler *CycBufHandler) SetOffset(cb *CYCBUF, wid int, offset int64)

type RC

type RC struct {
	Debug bool

	RC_head_chan chan ReadReq
	RC_body_chan chan ReadReq
	// contains filtered or unexported fields

} // end storage.RC struct
var (
	ReadCache RC
)

func (*RC) Load_Readcache

func (rc *RC) Load_Readcache(head_rc_workers uint64, rc_head_ncq int, body_rc_workers uint64, rc_body_ncq int, redis_pool *redis.Pool, redis_expire int, debug_flag bool)

func (*RC) RC_Worker_DN

func (rc *RC) RC_Worker_DN(wType string)

func (*RC) RC_Worker_Set

func (rc *RC) RC_Worker_Set(wType string, new_maxworkers uint64)

func (*RC) RC_Worker_UP

func (rc *RC) RC_Worker_UP(wType string)

func (*RC) STOP_RC

func (rc *RC) STOP_RC()

type ReadItem

type ReadItem struct {
	Fileobj *[]byte
	Err     error

} // end storage.ReadItem struct

type ReadReq

type ReadReq struct {
	Sessionid string
	File_path string
	Cli_chan  chan *ReadItem
	Cmdstring string
	AskRedis  bool

} // end storage.ReadReq struct
type RedisLink struct {
	// contains filtered or unexported fields

} // end RedisLink struct

type SC

type SC struct {
	MapSize int
	// contains filtered or unexported fields

} // end SC struct

StorageCounter is safe to use concurrently.

var (
	StorageCounter SC
)
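
A minimal usage sketch based on the methods listed below:

storage.StorageCounter.Make_SC()
storage.StorageCounter.Init("articles", 0)
storage.StorageCounter.Inc("articles")
n := storage.StorageCounter.Get("articles") // n == 1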

func (*SC) Add

func (sc *SC) Add(countername string, value uint64)

func (*SC) Checkmap

func (sc *SC) Checkmap()

func (*SC) Dec

func (sc *SC) Dec(countername string)

func (*SC) Del

func (sc *SC) Del(countername string)

func (*SC) Get

func (sc *SC) Get(countername string) uint64

func (*SC) Inc

func (sc *SC) Inc(countername string)

func (*SC) Init

func (sc *SC) Init(countername string, value uint64)

func (*SC) Make_SC

func (sc *SC) Make_SC()

func (*SC) Set

func (sc *SC) Set(countername string, value uint64)

func (*SC) Sub

func (sc *SC) Sub(countername string, value uint64)

type WC

type WC struct {
	Debug bool

	WC_body_chan           chan CacheItem
	WC_head_chan           chan CacheItem
	Log_cache_history_chan chan *[]string
	// contains filtered or unexported fields
}
var (
	WriteCache WC
)

func (*WC) Load_Writecache

func (wc *WC) Load_Writecache(head_wc_workers uint64, wc_head_cache_max int, bodycache_workers uint64, wc_body_cache_max int, cachedir string, history_logdir string, redis_pool *redis.Pool, bufio_max int, debug_flag bool)

func (*WC) STOP_WC

func (wc *WC) STOP_WC()

func (*WC) WC_Worker_DN

func (wc *WC) WC_Worker_DN(wType string)

func (*WC) WC_Worker_Set

func (wc *WC) WC_Worker_Set(wType string, new_maxworkers uint64)

func (*WC) WC_Worker_UP

func (wc *WC) WC_Worker_UP(wType string)

type XL

type XL struct {
	DirsMap map[string]bool
	MapSize int
	Debug   bool

	Xref_link_chan  chan []XREF
	Redis_link_chan chan RedisLink
	// contains filtered or unexported fields

} // end XL struct
var (
	XrefLinker XL
)

func (*XL) Load_Xref_Linker

func (c *XL) Load_Xref_Linker(maxworkers int, syncwrites int, do_softlinks bool, queue int, cachedir string, redis_pool *redis.Pool, debug_xref_linker bool) bool

type XREF

type XREF struct {
	Msgid     string
	Msgidhash string
	Group     string
	Grouphash string
	Msgnum    uint64

} // end XREF struct

XREF links:

	from "cachedir/[head|body]/groups/grouphash/msgnum"
	to "cachedir/[head|body]/c1/c2/c3/messageidhash[3:].[head|body]"
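
A hypothetical send, using the XREF struct above and XL.Xref_link_chan (field values are placeholders):

xrefs := []storage.XREF{{
	Msgid:     "<example@host>",
	Msgidhash: msgidhash, // same hash used for flat-file storage
	Group:     "alt.test",
	Grouphash: grouphash,
	Msgnum:    1,
}}
storage.XrefLinker.Xref_link_chan <- xrefs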
