Documentation ¶
Index ¶
- Constants
- Variables
- func Lock(b []byte) error
- func Madvise(b []byte) error
- func MemAlign(blockSize uint) ([]byte, error)
- func MemAlignWithBase(blockSize, alignSize uint) ([]byte, error)
- func Mmap(fd *os.File, offset int64, length int, writable bool) ([]byte, error)
- func Munmap(b []byte) error
- func OpenFileWithDIO(name string, flag int, perm os.FileMode) (*os.File, error)
- func Sync(b []byte) error
- func Unlock(b []byte) error
- type AIOMode
- type AsyncIO
- func (aio *AsyncIO) Append(bs [][]byte) (int, error)
- func (aio *AsyncIO) Close() error
- func (aio *AsyncIO) FLock() error
- func (aio *AsyncIO) FUnlock() error
- func (aio *AsyncIO) Fd() uintptr
- func (aio *AsyncIO) IsDone(id RequestID) (bool, error)
- func (aio *AsyncIO) Option() Options
- func (aio *AsyncIO) Read(b []byte) (int, error)
- func (aio *AsyncIO) ReadAt(b []byte, offset int64) (int, error)
- func (aio *AsyncIO) Seek(offset int64, whence int) (int64, error)
- func (aio *AsyncIO) Stat() (os.FileInfo, error)
- func (aio *AsyncIO) Sync() error
- func (aio *AsyncIO) Truncate(size int64) error
- func (aio *AsyncIO) WaitFor(id RequestID) (int, error)
- func (aio *AsyncIO) Write(b []byte) (int, error)
- func (aio *AsyncIO) WriteAt(b []byte, offset int64) (int, error)
- func (aio *AsyncIO) WriteAtv(bs [][]byte, offset int64) (int, error)
- type Buffers
- type ConcurrentMap
- func (m ConcurrentMap) Count() int
- func (m ConcurrentMap) Get(key string) (interface{}, bool)
- func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
- func (m ConcurrentMap) Has(key string) bool
- func (m ConcurrentMap) IsEmpty() bool
- func (m ConcurrentMap) Items() map[string]interface{}
- func (m ConcurrentMap) Iter() <-chan Tuple
- func (m ConcurrentMap) IterBuffered() <-chan Tuple
- func (m ConcurrentMap) IterCb(fn IterCb)
- func (m ConcurrentMap) Keys() []string
- func (m ConcurrentMap) MSet(data map[string]interface{})
- func (m ConcurrentMap) MarshalJSON() ([]byte, error)
- func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
- func (m ConcurrentMap) RandomPop() (k string, v interface{}, exists bool)
- func (m ConcurrentMap) Remove(key string)
- func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool
- func (m ConcurrentMap) Set(key string, value interface{})
- func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool
- func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
- type ConcurrentMapShared
- type DirectIO
- type Event
- type File
- type FileIO
- type FileLock
- type FileLockMode
- type IOContext
- type IOMode
- type Iocb
- func (iocb *Iocb) OpCode() IocbCmd
- func (iocb *Iocb) PrepFDSync()
- func (iocb *Iocb) PrepFSync()
- func (iocb *Iocb) PrepPread(buf []byte, bufLen int, offset int64)
- func (iocb *Iocb) PrepPreadv(bs [][]byte, offset int64)
- func (iocb *Iocb) PrepPwrite(buf []byte, bufLen int, offset int64)
- func (iocb *Iocb) PrepPwritev(bs [][]byte, offset int64)
- func (iocb *Iocb) SetEventFd(eventfd int)
- type IocbCmd
- type IterCb
- type MemoryMap
- func (mmap *MemoryMap) Append(bs [][]byte) (int, error)
- func (mmap *MemoryMap) Close() error
- func (mmap *MemoryMap) FLock() (err error)
- func (mmap *MemoryMap) FUnlock() error
- func (mmap *MemoryMap) Option() Options
- func (mmap *MemoryMap) ReadAt(b []byte, off int64) (int, error)
- func (mmap *MemoryMap) WriteAtv(bs [][]byte, off int64) (int, error)
- type Options
- type RemoveCb
- type RequestID
- type Timespec
- type Tuple
- type UpsertCb
Constants ¶
const ( // AlignSize size to align the buffer AlignSize = 512 // BlockSize direct IO minimum number of bytes to write BlockSize = 4096 )
Variables ¶
var ( ErrNotInit = errors.New("Not initialized") ErrWaitAllFailed = errors.New("Failed to wait for all requests to complete") ErrNilCallback = errors.New("The kernel returned a nil callback iocb structure") ErrUntrackedEventKey = errors.New("The kernel returned an event key we weren't tracking") ErrInvalidEventPtr = errors.New("The kernel returned an invalid callback event pointer") ErrReqIDNotFound = errors.New("The requestID not found") ErrNotDone = errors.New("Request not finished") )
var DefaultOptions = Options{ IOEngine: StandardIO, Flag: os.O_RDWR | os.O_CREATE | os.O_SYNC, Perm: 0644, FileLock: None, MmapSize: 1<<30 - 1, MmapWritable: false, AIO: Libaio, AIOQueueDepth: 1024, AIOTimeout: 0, }
DefaultOptions is recommended options, you can modify these to suit your needs.
var ShardCount = 32
ShardCount map shard count
Functions ¶
func MemAlignWithBase ¶
MemAlignWithBase like linux posix_memalign. block start address must be a multiple of AlignSize. block size also must be a multiple of AlignSize.
func OpenFileWithDIO ¶
OpenFileWithDIO open files with O_DIRECT flag
Types ¶
type AsyncIO ¶
AsyncIO async IO maybe we can implement a simplified posix file system by implement an own disk allocator? Give it a try?
func (*AsyncIO) Sync ¶
Sync will wait for all submitted jobs to finish and then sync the file descriptor. Because the Linux kernel does not actually support Sync via the AIO interface we just issue a plain old sync via userland. No async here. Sync don't ack outstanding requests
func (*AsyncIO) Truncate ¶
Truncate will wait for all submitted jobs to finish trunctate the file to the designated size.
type Buffers ¶
type Buffers [][]byte
Buffers contains zero or more runs of bytes to write. this is applied to readv, writev, preadv, pwritev.
type ConcurrentMap ¶
type ConcurrentMap []*ConcurrentMapShared
ConcurrentMap A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
func NewConcurrentMap ¶
func NewConcurrentMap() ConcurrentMap
NewConcurrentMap Creates a new concurrent map.
func (ConcurrentMap) Count ¶
func (m ConcurrentMap) Count() int
Count returns the number of elements within the map.
func (ConcurrentMap) Get ¶
func (m ConcurrentMap) Get(key string) (interface{}, bool)
Get retrieves an element from map under given key.
func (ConcurrentMap) GetShard ¶
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
GetShard returns shard under given key
func (ConcurrentMap) Has ¶
func (m ConcurrentMap) Has(key string) bool
Has Looks up an item under specified key
func (ConcurrentMap) IsEmpty ¶
func (m ConcurrentMap) IsEmpty() bool
IsEmpty checks if map is empty.
func (ConcurrentMap) Items ¶
func (m ConcurrentMap) Items() map[string]interface{}
Items returns all items as map[string]interface{}
func (ConcurrentMap) Iter ¶
func (m ConcurrentMap) Iter() <-chan Tuple
Iter returns an iterator which could be used in a for range loop. Deprecated: using IterBuffered() will get a better performence
func (ConcurrentMap) IterBuffered ¶
func (m ConcurrentMap) IterBuffered() <-chan Tuple
IterBuffered returns a buffered iterator which could be used in a for range loop.
func (ConcurrentMap) IterCb ¶
func (m ConcurrentMap) IterCb(fn IterCb)
IterCb Callback based iterator, cheapest way to read all elements in a map.
func (ConcurrentMap) Keys ¶
func (m ConcurrentMap) Keys() []string
Keys returns all keys as []string
func (ConcurrentMap) MarshalJSON ¶
func (m ConcurrentMap) MarshalJSON() ([]byte, error)
MarshalJSON reviles ConcurrentMap "private" variables to json marshal.
func (ConcurrentMap) Pop ¶
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
Pop removes an element from the map and returns it
func (ConcurrentMap) RandomPop ¶
func (m ConcurrentMap) RandomPop() (k string, v interface{}, exists bool)
RandomPop random removes an element from the map and returns it
func (ConcurrentMap) Remove ¶
func (m ConcurrentMap) Remove(key string)
Remove removes an element from the map.
func (ConcurrentMap) RemoveCb ¶
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool
RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params If callback returns true and element exists, it will remove it from the map Returns the value returned by the callback (even if element was not present in the map)
func (ConcurrentMap) Set ¶
func (m ConcurrentMap) Set(key string, value interface{})
Set the given value under the specified key.
func (ConcurrentMap) SetIfAbsent ¶
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool
SetIfAbsent Sets the given value under the specified key if no value was associated with it.
func (ConcurrentMap) Upsert ¶
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
type ConcurrentMapShared ¶
type ConcurrentMapShared struct { // contains filtered or unexported fields }
ConcurrentMapShared A "thread" safe string to anything map.
type DirectIO ¶
DirectIO dio mode
type File ¶
type File interface { // Fd returns the Unix fd or Windows handle referencing the open file. // The fd is valid only until f.Close is called or f is garbage collected. Fd() uintptr // Stat returns the FileInfo structure describing file. // The MMap mode returns the native file state instead of the memory slice. Stat() (os.FileInfo, error) // Read reads up to len(b) bytes from the File. // It returns the number of bytes read and any error encountered. // At end of file, Read returns io.EOF. Read(b []byte) (int, error) // ReadAt reads len(b) bytes from the File starting at byte offset off. // It returns the number of bytes read and the error, if any. // ReadAt always returns a non-nil error when n < len(b). // At end of file, that error is io.EOF. ReadAt(b []byte, off int64) (int, error) // Write writes len(b) bytes to the File. // It returns the number of bytes written and an error, if any. // Write returns a non-nil error when n != len(b). Write(b []byte) (int, error) // WriteAt writes len(b) bytes to the File starting at byte offset off. // It returns the number of bytes written and an error, if any. // WriteAt returns a non-nil error when n != len(b). WriteAt(b []byte, off int64) (int, error) // WriteAtv write multiple discrete discontinuous mem block // on AIO mode, it's impled by pwritev syscall // on other mode, it's impled by multi call pwrite syscall WriteAtv(bs [][]byte, off int64) (int, error) // Append write data at the end of file // We do not guarantee atomicity of concurrent append writes. // Note: we should avoid O_APPEND here due to ta the following bug: // POSIX requires that opening a file with the O_APPEND flag should // have no affect on the location at which pwrite() writes data. // However, on Linux, if a file is opened with O_APPEND, pwrite() // appends data to the end of the file, regardless of the value of // offset. on darwin, there is no this Bug. // More info here: https://linux.die.net/man/2/pwrite Append(bs [][]byte) (int, error) // Seek sets the offset for the next Read or Write on file to offset, interpreted // according to whence: 0 means relative to the origin of the file, 1 means // relative to the current offset, and 2 means relative to the end. // It returns the new offset and an error, if any. // The behavior of Seek on a file opened with O_APPEND is not specified. Seek(offset int64, whence int) (int64, error) // Truncate changes the size of the file. // It does not change the I/O offset. // If there is an error, it will be of type *PathError. Truncate(size int64) error // FLock the lock is suggested and exclusive FLock() error // FUnlock unlock the file lock // it will be atomic release when file close. FUnlock() error // Sync commits the current contents of the file to stable storage. // Typically, this means flushing the file system's in-memory copy // of recently written data to disk. Sync() error // Close closes the File, rendering it unusable for I/O. Close() error // Option return IO engine options Option() Options }
File a unified common file operation interface
type FileIO ¶
FileIO Standrad I/O mode
type FileLock ¶
type FileLock struct {
// contains filtered or unexported fields
}
FileLock holds a lock on a hidden file inside.
func NewFileLock ¶
NewFileLock return internal file lock example: The file lock format for the file name 'test' is .test-flock
type FileLockMode ¶
type FileLockMode int
FileLockMode specifies file lock mode, default None.
const ( // None indicates that open file without file lock None FileLockMode = iota // ReadWrite indicates that open file with file rwlock ReadWrite // ReadOnly indicates that open file with file rlock ReadOnly )
type Iocb ¶
type Iocb struct { Data unsafe.Pointer Key uint64 Opcode int16 Prio int16 Fd uint32 Buf unsafe.Pointer Nbytes uint64 Offset int64 Flags uint32 Resfd uint32 // contains filtered or unexported fields }
func (*Iocb) PrepFDSync ¶
func (iocb *Iocb) PrepFDSync()
func (*Iocb) PrepPreadv ¶
func (*Iocb) PrepPwritev ¶
func (*Iocb) SetEventFd ¶
type IterCb ¶
type IterCb func(key string, v interface{})
IterCb Iterator callback,called for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards
type MemoryMap ¶
MemoryMap disk IO mode page faults and dirty page writes can degrade mmap performance we impl ReadAt by mmap, other API impled by standardIO.
func (*MemoryMap) FLock ¶
FLock a file lock is a recommended lock. if file lock not init, we will init once
type Options ¶
type Options struct { // IOEngine io mode IOEngine IOMode // Flag the file open mode Flag int // Perm the file perm Perm os.FileMode // FileLock file lock mode, default none FileLock FileLockMode // MmapSize mmap file size in memory MmapSize int // MmapWritable whether to allow mmap write // if true, it will be use mmap write instead of standardIO write, not implemented yet. MmapWritable bool // AIO async IO mode, defaul libaio, the io_uring isn't implemented yet. AIO AIOMode // AIOQueueDepth libaio max events, it's also use to control client IO number. AIOQueueDepth int // AIOTimeout unit ms, libaio timeout, 0 means no timeout. AIOTimeout int }
Options are params for creating IOEngine.
type RemoveCb ¶
RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held If returns true, the element will be removed from the map
type Tuple ¶
type Tuple struct { Key string Val interface{} }
Tuple used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type UpsertCb ¶
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
UpsertCb Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant