drs

package v0.0.0-...-e06a828
Published: Apr 24, 2016 License: GPL-3.0 Imports: 11 Imported by: 1

Documentation

Overview

Package drs provides scheduling to optimise parallel reading of multiple files from rotational hard drives. It does this by limiting the number of open files and by reading files in order of their disk offsets. For SSDs, drs offers convenient limiting of the number of open file handles to prevent "too many open files" errors.

Index

Constants

This section is empty.

Variables

View Source
var Aggressive = DiskConfig{
	Read:    2,
	Window:  10,
	Process: 20 + runtime.NumCPU(),
	Ahead:   2 * 1024 * 1024,
	Behind:  512 * 1024,
	Lazy:    1.0,
}
View Source
var HDD = DiskConfig{
	Read:    1,
	Window:  0,
	Process: 3 + runtime.NumCPU(),
	Ahead:   0,
	Behind:  0,
	Lazy:    0.5,
}
View Source
var SSD = DiskConfig{
	Read:    runtime.NumCPU() * 2,
	Window:  0,
	Process: runtime.NumCPU() * 4,
	Ahead:   0,
	Behind:  0,
	Lazy:    -1.0,
}

Functions

func Close

func Close()

Close closes all disks

func ClosePool

func ClosePool() (used int, err error)

ClosePool closes the Bufferpool and returns the number of buffers used

func Copy

func Copy(dst io.Writer, src *File) (written int64, err error)

Copy is similar to io.Copy except that it signals done as soon as reading has finished. It copies from src to dst until either EOF is reached on src or an error occurs. It returns the number of bytes copied and the first error encountered while copying, if any.

A successful Copy returns err == nil, not err == EOF. Because Copy is defined to read from src until EOF, it does not treat an EOF from Read as an error to be reported.

Copy always uses src.Read() and dst.Write() so as to be able to identify the end of reading.

func CopyN

func CopyN(dst io.Writer, src *File, n int64) (written int64, err error)

CopyN is similar to io.CopyN except that it closes src and signals done as soon as reading has finished. It copies n bytes (or until an error) from src to dst. It returns the number of bytes copied and the earliest error encountered while copying. On return, written == n if and only if err == nil.

func InitPool

func InitPool(bufsize int, maxbufs int)

InitPool initialises the buffer pool. It takes arguments bufsize (buffer size in bytes) and maxbufs (a hard limit on the number of buffers created)

func OffsetF

func OffsetF(f *os.File, seek int64, whence int) (physical uint64, logical uint64, size int64, err error)

OffsetF returns the physical offset (relative to disk start) of the data at the specified relative position in an open file

func OffsetOf

func OffsetOf(path string, seek int64, whence int) (physical uint64, logical uint64, size int64, err error)

OffsetOf returns the physical offset (relative to disk start) of the data at the specified position within a file associated with a path

func Pause

func Pause()

Pause stops all disks from launching new jobs

func PutBuf

func PutBuf(b Buf)

PutBuf returns a buffer to the pool

func RegisterSSDs

func RegisterSSDs(roots []string) error

func Resume

func Resume()

Resume enables all disks to launch new jobs

func Wait

func Wait()

Wait waits until there are no unfinished scheduled jobs

func Walk

func Walk(roots []string, opts *WalkOptions) <-chan *Path

Walk returns a path channel and walks (concurrently) all paths under each root folder in roots, sending all regular files encountered to the channel, which is closed once all walks have completed. Walking the same file or folder twice is avoided, so for example

ch := Walk([]string{"/foo", "/foo/bar"}, opts)

will only walk the files in /foo/bar once. Errors are returned via the WalkOptions error channel (if provided). TODO: use a filepath.Walk-compatible callback instead of the error channel (need to make it threadsafe)

Types

type Buf

type Buf []byte

Buf is a reusable byte buffer

func GetBuf

func GetBuf() Buf

GetBuf gets a buffer from the pool

type Disk

type Disk struct {
	// contains filtered or unexported fields
}

A Disk schedules read operations for files. It should be created using NewDisk. A call to Disk.Start() is required before the disk will allow associated files to start reading data.

Example usage:

d := drs.NewDisk(false)
for _, p := range paths {
        wg.Add(1)
        go read_stuff_from(p, &wg)
        // opens and reads the file via drs, but will block until d.Start() is called
}
d.Start()  // enables reading and will unblock pending Read() calls in disk offset order
wg.Wait()
d.Close()

func GetDisk

func GetDisk(id uint64, ssd bool) *Disk

GetDisk gets the disk corresponding to a diskID, creating one if necessary

func NewDisk

func NewDisk(ssd bool) *Disk

NewDisk creates a new disk object to schedule read operations in order of increasing physical offset on the disk.

Up to Read files (per the disk's DiskConfig) will be read concurrently. Up to Window additional files may be opened for reading if they are within (-Behind, +Ahead) bytes of the current head position. Further files may be open for the sole purpose of reading their disk offset.

If a buffer pool has been initialised (see InitPool), WriteTo() calls are buffered. Note that while read order is preserved, buffering may change the order in which WriteTo(w) calls complete, depending on the speed at which buffers are written to w.

func (*Disk) AddDir

func (d *Disk) AddDir(i id, path string) error

AddDir checks for the presence of a dir in d.dirs. If not found, it adds the dir and returns true; otherwise it returns false.

func (*Disk) Close

func (d *Disk) Close()

Close processes all pending requests, then closes the disk scheduler and frees buffer memory.

func (*Disk) Pause

func (d *Disk) Pause()

Pause prevents any additional jobs from being started

func (*Disk) Resume

func (d *Disk) Resume()

Resume restarts a paused disk

func (*Disk) Schedule

func (d *Disk) Schedule(j Job, path string, offset uint64, priority Priority)

Schedule adds a job to the disk's queue.

func (*Disk) Wait

func (d *Disk) Wait()

Wait waits until there are no unfinished or scheduled jobs.

type DiskConfig

type DiskConfig struct {
	Read    int
	Window  int
	Process int
	Ahead   int64
	Behind  int64
	Lazy    float32
}

type File

type File struct {
	*os.File
	// contains filtered or unexported fields
}

File wraps an os.File object with a custom Close() method that signals completion to the disk scheduler

func (*File) Close

func (f *File) Close() error

type Job

type Job interface {
	// Go is called by drs as a goroutine.  As soon as it has finished reading file
	// data, it should close the file and send read <- Token{}.  It can then continue
	// with other tasks (e.g. processing the data).
	Go(f *File, err error)
}

Job is the interface for the calling routine to process files scheduled for reading.

type Path

type Path struct {
	Name   string
	Offset uint64
	Depth  int
	Info   os.FileInfo

	Disk *Disk
	// contains filtered or unexported fields
}

Path implements the drs.Job interface

func (*Path) Go

func (p *Path) Go(dir *File, err error)

Go recurses into a directory, sending any files found and recursing into any subdirectories

func (*Path) Report

func (p *Path) Report(e error)

Report sends errors to the error channel

func (*Path) Send

func (p *Path) Send()

Send sends path to results channel

type Priority

type Priority int

Priority can be used to prioritise jobs. No lower priority jobs will be processed until all higher priority jobs are done.

const (
	// High is the highest priority
	High Priority = iota
	Normal
	Low
	PriorityCount // Sentinel
)

type Token

type Token struct{}

Token is a convenience type for signalling channels

type TokenReturn

type TokenReturn chan Token

func (TokenReturn) Done

func (t TokenReturn) Done()

type WalkOptions

type WalkOptions struct {
	SeeRootLinks bool       // if false, if root paths are symlinks they will be ignored
	SeeLinks     bool       // if false, symlinks will be ignored (except root paths)
	FollowLinks  bool       // if false, walk returns symlinks as symlinks; if true it returns their targets
	SeeDotFiles  bool       // if false, ignores "." and ".." entries // TODO:
	HiddenDirs   bool       // if false, won't descend into folders starting with '.'
	HiddenFiles  bool       // if false, won't return files starting with '.'
	ReturnDirs   bool       // if false, doesn't return dir paths
	NoRecurse    bool       // if false, walks the passed root dirs recursively
	OneDevice    bool       // if false, walk will cross filesystem boundaries TODO
	MaxDepth     int        // if recursing dirs, limit depth (1 = files in root dirs; 0 = no limit).
	Priority     Priority   // drs schedule priority for walk tasks
	Errs         chan error // optional channel to return walk errors; if nil then ignores errors
	// contains filtered or unexported fields
}

WalkOptions provides convenient, commonly used filters for file walks. The struct is designed so that the zero-value defaults give a fairly typical walk configuration.
