Documentation ¶
Index ¶
- Constants
- func NewBlockCacheComponent() internal.Component
- type Block
- type BlockCache
- func (bc *BlockCache) CloseFile(options internal.CloseFileOptions) error
- func (bc *BlockCache) Configure(_ bool) error
- func (bc *BlockCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error)
- func (bc *BlockCache) DeleteDir(options internal.DeleteDirOptions) error
- func (bc *BlockCache) DeleteFile(options internal.DeleteFileOptions) error
- func (bc *BlockCache) FlushFile(options internal.FlushFileOptions) error
- func (bc *BlockCache) Name() string
- func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error)
- func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, error)
- func (bc *BlockCache) RenameDir(options internal.RenameDirOptions) error
- func (bc *BlockCache) RenameFile(options internal.RenameFileOptions) error
- func (bc *BlockCache) SetName(name string)
- func (bc *BlockCache) SetNextComponent(nc internal.Component)
- func (bc *BlockCache) Start(ctx context.Context) error
- func (bc *BlockCache) Stop() error
- func (bc *BlockCache) SyncFile(options internal.SyncFileOptions) error
- func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
- type BlockCacheOptions
- type BlockPool
- type ThreadPool
Constants ¶
const ( MAX_POOL_USAGE uint32 = 80 MIN_POOL_USAGE uint32 = 50 MIN_PREFETCH = 5 MIN_WRITE_BLOCK = 3 MIN_RANDREAD = 10 MAX_FAIL_CNT = 3 MAX_BLOCKS = 50000 )
const ( BlockFlagFresh uint16 = iota BlockFlagDownloading // Block is being downloaded BlockFlagUploading // Block is being uploaded BlockFlagDirty // Block has been written and data is not persisted yet BlockFlagSynced // Block has been written and data is persisted BlockFlagFailed // Block upload/download has failed )
Various flags denoting state of a block
const ( BlockStatusDownloaded int = iota + 1 // Download of this block is complete BlockStatusUploaded // Upload of this block is complete BlockStatusDownloadFailed // Download of this block has failed BlockStatusUploadFailed // Upload of this block has failed )
Flags to denote the status of upload/download of a block
Variables ¶
This section is empty.
Functions ¶
func NewBlockCacheComponent ¶
NewBlockCacheComponent is the factory method: the pipeline will call this method to create your object, so initialize your variables here. << DO NOT DELETE ANY AUTO GENERATED CODE HERE >>
Types ¶
type Block ¶
type Block struct {
// contains filtered or unexported fields
}
Block is a memory mapped buffer with its state to hold data
func AllocateBlock ¶
AllocateBlock creates a new memory mapped buffer for the given size
func (*Block) Dirty ¶ added in v1.1.0
func (b *Block) Dirty()
Mark this block as dirty as it has been modified
func (*Block) NoMoreDirty ¶ added in v1.1.0
func (b *Block) NoMoreDirty()
Mark this block as no longer dirty
func (*Block) Ready ¶ added in v1.1.0
Ready marks this Block as ready for reading by its first reader (data download completed)
type BlockCache ¶
type BlockCache struct { internal.BaseComponent // contains filtered or unexported fields }
Common structure for Component
func (*BlockCache) CloseFile ¶
func (bc *BlockCache) CloseFile(options internal.CloseFileOptions) error
CloseFile: File is closed by application so release all the blocks and submit back to blockPool
func (*BlockCache) Configure ¶
func (bc *BlockCache) Configure(_ bool) error
Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
Return failure if any config is not valid to exit the process
func (*BlockCache) CreateFile ¶ added in v1.1.0
func (bc *BlockCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error)
CreateFile: Create a new file
func (*BlockCache) DeleteDir ¶ added in v1.1.0
func (bc *BlockCache) DeleteDir(options internal.DeleteDirOptions) error
DeleteDir: Recursively invalidate the directory and its children
func (*BlockCache) DeleteFile ¶ added in v1.1.0
func (bc *BlockCache) DeleteFile(options internal.DeleteFileOptions) error
DeleteFile: Invalidate the file in local cache.
func (*BlockCache) FlushFile ¶ added in v1.1.0
func (bc *BlockCache) FlushFile(options internal.FlushFileOptions) error
FlushFile: Flush the local file to storage
func (*BlockCache) Name ¶
func (bc *BlockCache) Name() string
func (*BlockCache) OpenFile ¶
func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error)
OpenFile: Create a handle for the file user has requested to open
func (*BlockCache) ReadInBuffer ¶
func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, error)
ReadInBuffer: Read the file into a buffer
func (*BlockCache) RenameDir ¶ added in v1.1.0
func (bc *BlockCache) RenameDir(options internal.RenameDirOptions) error
RenameDir: Recursively invalidate the source directory and its children
func (*BlockCache) RenameFile ¶ added in v1.1.0
func (bc *BlockCache) RenameFile(options internal.RenameFileOptions) error
RenameFile: Invalidate the file in local cache.
func (*BlockCache) SetName ¶
func (bc *BlockCache) SetName(name string)
func (*BlockCache) SetNextComponent ¶
func (bc *BlockCache) SetNextComponent(nc internal.Component)
func (*BlockCache) Start ¶
func (bc *BlockCache) Start(ctx context.Context) error
Start : Pipeline calls this method to start the component functionality
This shall not block the call, otherwise the pipeline will not start
func (*BlockCache) Stop ¶
func (bc *BlockCache) Stop() error
Stop : Stop the component functionality and kill all threads started
func (*BlockCache) SyncFile ¶ added in v1.1.0
func (bc *BlockCache) SyncFile(options internal.SyncFileOptions) error
func (*BlockCache) WriteFile ¶ added in v1.1.0
func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
WriteFile: Write to the local file
type BlockCacheOptions ¶
type BlockCacheOptions struct { BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"` MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"` TmpPath string `config:"path" yaml:"path,omitempty"` DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"` DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"` PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"` Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"` PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"` }
Structure defining your config parameters
type BlockPool ¶
type BlockPool struct {
// contains filtered or unexported fields
}
BlockPool is a pool of Blocks
func NewBlockPool ¶
NewBlockPool allocates a new pool of blocks
type ThreadPool ¶
type ThreadPool struct {
// contains filtered or unexported fields
}
ThreadPool is a group of workers that can be used to execute a task
func (*ThreadPool) Do ¶
func (t *ThreadPool) Do(priority bool)
Do is the core task to be executed by each worker thread
func (*ThreadPool) Schedule ¶
func (t *ThreadPool) Schedule(urgent bool, item *workItem)
Schedule the download of a block
func (*ThreadPool) Start ¶
func (t *ThreadPool) Start()
Start all the workers and wait till they start receiving requests