Documentation ¶
Index ¶
- Constants
- Variables
- func CacheOriginalData(r io.Reader, data []byte, allocator CacheDataAllocator) (cacheData fscache.Data, err error)
- func DoWithRetry[T any](what string, fn func() (T, error), maxAttemps int, ...) (res T, err error)
- func EvictDiskCaches() map[string]int64
- func EvictMemoryCaches() map[string]int64
- func Get[T any](fs FileService, name string) (res T, err error)
- func HandleRemoteRead(ctx context.Context, fs FileService, req *query.Request, ...) error
- func IsRetryableError(err error) bool
- func JoinPath(serviceName string, path string) string
- func LogEvent(ctx context.Context, ev stringRef, args ...any)
- func LogSlowEvent(ctx context.Context, threshold time.Duration)
- func NewFileWithChecksumOSFile(ctx context.Context, underlying *os.File, blockContentSize int, ...) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])
- func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)
- func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)
- func WithEventLogger(ctx context.Context) context.Context
- type AliyunSDK
- func (a *AliyunSDK) Delete(ctx context.Context, keys ...string) (err error)
- func (a *AliyunSDK) Exists(ctx context.Context, key string) (bool, error)
- func (a *AliyunSDK) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
- func (a *AliyunSDK) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
- func (a *AliyunSDK) Stat(ctx context.Context, key string) (size int64, err error)
- func (a *AliyunSDK) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
- type AwsSDKv1
- func (a *AwsSDKv1) Delete(ctx context.Context, keys ...string) (err error)
- func (a *AwsSDKv1) Exists(ctx context.Context, key string) (bool, error)
- func (a *AwsSDKv1) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
- func (a *AwsSDKv1) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
- func (a *AwsSDKv1) Stat(ctx context.Context, key string) (size int64, err error)
- func (a *AwsSDKv1) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
- type AwsSDKv2
- func (a *AwsSDKv2) Delete(ctx context.Context, keys ...string) (err error)
- func (a *AwsSDKv2) Exists(ctx context.Context, key string) (bool, error)
- func (a *AwsSDKv2) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
- func (a *AwsSDKv2) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
- func (a *AwsSDKv2) Stat(ctx context.Context, key string) (size int64, err error)
- func (a *AwsSDKv2) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
- type Bytes
- type CacheCallbackFunc
- type CacheCallbacks
- type CacheConfig
- type CacheDataAllocator
- type CachingFileService
- type Config
- type CostAttr
- type CostItem
- type DirEntry
- type DiskCache
- func (d *DiskCache) Close()
- func (d *DiskCache) DeletePaths(ctx context.Context, paths []string) error
- func (d *DiskCache) Evict(done chan int64)
- func (d *DiskCache) Flush()
- func (d *DiskCache) Read(ctx context.Context, vector *IOVector) (err error)
- func (d *DiskCache) SetFile(ctx context.Context, path string, ...) error
- func (d *DiskCache) Update(ctx context.Context, vector *IOVector, async bool) (err error)
- type DiskCacheCallbacks
- type ETLFileService
- type FileCache
- type FileLike
- type FileService
- type FileServices
- func (f *FileServices) Close()
- func (f *FileServices) Cost() *CostAttr
- func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error
- func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)
- func (f *FileServices) Name() string
- func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error
- func (f *FileServices) Read(ctx context.Context, vector *IOVector) error
- func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error
- func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
- func (f *FileServices) Write(ctx context.Context, vector IOVector) error
- type FileWithChecksum
- func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)
- func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)
- func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)
- func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)
- func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)
- type IOEntry
- type IOMergeKey
- type IOMerger
- type IOVector
- type IOVectorCache
- type KeyRouterFactory
- type LocalETLFS
- func (l *LocalETLFS) Close()
- func (l *LocalETLFS) Cost() *CostAttr
- func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error
- func (l *LocalETLFS) ETLCompatible()
- func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)
- func (l *LocalETLFS) Name() string
- func (l *LocalETLFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)
- func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error
- func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error
- func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error
- func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
- func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error
- type LocalETLFSMutator
- type LocalFS
- func (l *LocalFS) AllocateCacheData(size int) fscache.Data
- func (l *LocalFS) Close()
- func (l *LocalFS) Cost() *CostAttr
- func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error
- func (l *LocalFS) FlushCache()
- func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)
- func (l *LocalFS) Name() string
- func (l *LocalFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)
- func (l *LocalFS) PrefetchFile(ctx context.Context, filePath string) error
- func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)
- func (l *LocalFS) ReadCache(ctx context.Context, vector *IOVector) (err error)
- func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error
- func (l *LocalFS) SetAsyncUpdate(b bool)
- func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
- func (l *LocalFS) Write(ctx context.Context, vector IOVector) error
- type LocalFSMutator
- type MemCache
- func (m *MemCache) Close()
- func (m *MemCache) DeletePaths(ctx context.Context, paths []string) error
- func (m *MemCache) Evict(done chan int64)
- func (m *MemCache) Flush()
- func (m *MemCache) Read(ctx context.Context, vector *IOVector) (err error)
- func (m *MemCache) Update(ctx context.Context, vector *IOVector, async bool) error
- type MemoryFS
- func (m *MemoryFS) Close()
- func (m *MemoryFS) Cost() *CostAttr
- func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error
- func (m *MemoryFS) ETLCompatible()
- func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)
- func (m *MemoryFS) Name() string
- func (m *MemoryFS) PrefetchFile(ctx context.Context, filePath string) error
- func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) (err error)
- func (m *MemoryFS) ReadCache(ctx context.Context, vector *IOVector) (err error)
- func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error
- func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
- func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error
- type MinioSDK
- func (a *MinioSDK) Delete(ctx context.Context, keys ...string) (err error)
- func (a *MinioSDK) Exists(ctx context.Context, key string) (bool, error)
- func (a *MinioSDK) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
- func (a *MinioSDK) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
- func (a *MinioSDK) Stat(ctx context.Context, key string) (size int64, err error)
- func (a *MinioSDK) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
- type MutableFileService
- type Mutator
- type NewFileServicesFunc
- type ObjectStorage
- type ObjectStorageArguments
- type OnDiskCacheEvictFunc
- type OnDiskCacheWrittenFunc
- type Path
- type Policy
- type Pool
- type PutBack
- type RemoteCache
- func (r *RemoteCache) DeletePaths(ctx context.Context, paths []string) error
- func (r *RemoteCache) Evict(done chan int64)
- func (r *RemoteCache) Flush()
- func (r *RemoteCache) Read(ctx context.Context, vector *IOVector) error
- func (r *RemoteCache) Update(ctx context.Context, vector *IOVector, async bool) error
- type ReplaceableFileService
- type S3FS
- func (s *S3FS) AllocateCacheData(size int) fscache.Data
- func (s *S3FS) Close()
- func (s *S3FS) Cost() *CostAttr
- func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error
- func (*S3FS) ETLCompatible()
- func (s *S3FS) FlushCache()
- func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)
- func (s *S3FS) Name() string
- func (s *S3FS) PrefetchFile(ctx context.Context, filePath string) error
- func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)
- func (s *S3FS) ReadCache(ctx context.Context, vector *IOVector) (err error)
- func (s *S3FS) SetAsyncUpdate(b bool)
- func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
- func (s *S3FS) Write(ctx context.Context, vector IOVector) error
- type TargetCacheKeys
Constants ¶
const ( SkipMemoryCacheReads = 1 << iota SkipMemoryCacheWrites SkipDiskCacheReads SkipDiskCacheWrites SkipFullFilePreloads )
const ( SkipCacheReads = SkipMemoryCacheReads | SkipDiskCacheReads SkipCacheWrites = SkipMemoryCacheWrites | SkipDiskCacheWrites SkipDiskCache = SkipDiskCacheReads | SkipDiskCacheWrites SkipMemoryCache = SkipMemoryCacheReads | SkipMemoryCacheWrites SkipAllCache = SkipDiskCache | SkipMemoryCache )
const DisableCacheCapacity = 1
const ServiceNameSeparator = ":"
Variables ¶
var ( GlobalMemoryCacheSizeHint atomic.Int64 GlobalDiskCacheSizeHint atomic.Int64 )
var CtxKeyDiskCacheCallbacks ctxKeyDiskCacheCallbacks
var DefaultCacheDataAllocator = sync.OnceValue(func() CacheDataAllocator {
return &bytesAllocator{
allocator: memoryCacheAllocator(),
}
})
var DisabledCacheConfig = CacheConfig{ MemoryCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity), DiskCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity), }
var ErrNotCacheFile = errorStr("not a cache file")
var EventLoggerKey eventLoggerKey
var NoDefaultCredentialsForETL = os.Getenv("MO_NO_DEFAULT_CREDENTIALS") != ""
Functions ¶
func CacheOriginalData ¶ added in v1.0.0
func DoWithRetry ¶ added in v1.2.0
func EvictDiskCaches ¶ added in v1.2.3
func EvictMemoryCaches ¶ added in v1.2.3
func HandleRemoteRead ¶ added in v1.0.0
func HandleRemoteRead( ctx context.Context, fs FileService, req *query.Request, resp *query.WrappedResponse, ) error
func IsRetryableError ¶ added in v1.2.0
func NewFileWithChecksumOSFile ¶ added in v0.8.0
func NewFileWithChecksumOSFile( ctx context.Context, underlying *os.File, blockContentSize int, perfCounterSets []*perfcounter.CounterSet, ) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])
func OnDiskCacheEvict ¶ added in v1.0.0
func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)
func OnDiskCacheWritten ¶ added in v1.0.0
func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)
Types ¶
type AliyunSDK ¶ added in v1.0.0
type AliyunSDK struct {
// contains filtered or unexported fields
}
func NewAliyunSDK ¶ added in v1.0.0
func NewAliyunSDK( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (_ *AliyunSDK, err error)
type AwsSDKv1 ¶ added in v1.0.0
type AwsSDKv1 struct {
// contains filtered or unexported fields
}
func NewAwsSDKv1 ¶ added in v1.0.0
func NewAwsSDKv1( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*AwsSDKv1, error)
type AwsSDKv2 ¶ added in v1.0.0
type AwsSDKv2 struct {
// contains filtered or unexported fields
}
func NewAwsSDKv2 ¶ added in v1.0.0
func NewAwsSDKv2( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*AwsSDKv2, error)
type CacheCallbackFunc ¶ added in v1.0.0
type CacheCallbacks ¶ added in v1.0.0
type CacheCallbacks struct { PostGet []CacheCallbackFunc PostSet []CacheCallbackFunc PostEvict []CacheCallbackFunc }
type CacheConfig ¶ added in v0.6.0
type CacheConfig struct { MemoryCapacity *toml.ByteSize `toml:"memory-capacity" user_setting:"advanced"` DiskPath *string `toml:"disk-path"` DiskCapacity *toml.ByteSize `toml:"disk-capacity"` DiskMinEvictInterval *toml.Duration `toml:"disk-min-evict-interval"` DiskEvictTarget *float64 `toml:"disk-evict-target"` RemoteCacheEnabled bool `toml:"remote-cache-enabled"` RPC morpc.Config `toml:"rpc"` CheckOverlaps bool `toml:"check-overlaps"` QueryClient client.QueryClient `json:"-"` KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"` KeyRouter client.KeyRouter[pb.CacheKey] `json:"-"` InitKeyRouter *sync.Once `json:"-"` CacheCallbacks `json:"-"` // contains filtered or unexported fields }
func (*CacheConfig) SetRemoteCacheCallback ¶ added in v1.0.0
func (c *CacheConfig) SetRemoteCacheCallback()
type CacheDataAllocator ¶ added in v1.0.0
type CachingFileService ¶ added in v0.6.0
type CachingFileService interface { FileService // FlushCache flushes cache FlushCache() // SetAsyncUpdate sets cache update operation to async mode SetAsyncUpdate(bool) }
CachingFileService is an extension to the FileService
type Config ¶ added in v0.6.0
type Config struct { // Name name of fileservice, describe what an instance of fileservice is used for Name string `toml:"name"` // Backend fileservice backend. [MEM|DISK|DISK-ETL|S3|MINIO] Backend string `toml:"backend"` // S3 used to create fileservice using s3 as the backend S3 ObjectStorageArguments `toml:"s3"` // Cache specifies configs for cache Cache CacheConfig `toml:"cache"` // DataDir used to create fileservice using DISK as the backend DataDir string `toml:"data-dir"` // FixMissing inidicates the file service to try its best to fix missing files FixMissing bool `toml:"fix-missing"` }
Config fileService config
type CostAttr ¶ added in v1.2.1
type CostAttr struct { // List is the cost of List from FileService List CostItem }
type DiskCache ¶ added in v0.7.0
type DiskCache struct {
// contains filtered or unexported fields
}
func NewDiskCache ¶ added in v0.7.0
func NewDiskCache( ctx context.Context, path string, capacity fscache.CapacityFunc, perfCounterSets []*perfcounter.CounterSet, asyncLoad bool, name string, cacheDataAllocator CacheDataAllocator, ) (ret *DiskCache, err error)
func (*DiskCache) DeletePaths ¶ added in v1.1.0
type DiskCacheCallbacks ¶ added in v1.0.0
type DiskCacheCallbacks struct { OnWritten []OnDiskCacheWrittenFunc OnEvict []OnDiskCacheEvictFunc }
type ETLFileService ¶ added in v0.6.0
type ETLFileService interface { FileService // ETLCompatible marks the implementation to be compatible to ETL operations // implementations must save file contents as-is ETLCompatible() }
ETLFileService is an extension to the FileService
func GetForETL ¶ added in v0.6.0
func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileService, readPath string, err error)
GetForETL get or creates a FileService instance for ETL operations if service part of path is empty, a LocalETLFS will be created if service part of path is not empty, a ETLFileService typed instance will be extracted from fs argument if service part of path is argumented, a FileService instance will be created dynamically with those arguments supported dynamic file service: s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix> s3-no-key,<endpoint>,<region>,<bucket>,<prefix> minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix> s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>
key value pairs can be in any order
type FileService ¶
type FileService interface { // Name is file service's name // service name is case-insensitive Name() string // Write writes a new file // returns ErrFileExisted if file already existed // returns ErrSizeNotMatch if provided size does not match data // entries in vector should be written atomically. if write failed, following reads must not succeed. Write(ctx context.Context, vector IOVector) error // Read reads a file to fill IOEntries // returns ErrFileNotFound if requested file not found // returns ErrUnexpectedEOF if less data is read than requested size // returns ErrEmptyRange if no data at specified offset and size // returns ErrEmptyVector if no IOEntry is passed Read(ctx context.Context, vector *IOVector) error // ReadCache reads cached data if any // if cache hit, IOEntry.CachedData will be set ReadCache(ctx context.Context, vector *IOVector) error // List lists sub-entries in a dir List(ctx context.Context, dirPath string) ([]DirEntry, error) // Delete deletes multi file // returns ErrFileNotFound if requested file not found Delete(ctx context.Context, filePaths ...string) error // Stat returns infomations about a file // returns ErrFileNotFound if requested file not found StatFile(ctx context.Context, filePath string) (*DirEntry, error) // PrefetchFile prefetches a file PrefetchFile(ctx context.Context, filePath string) error // Cost returns the cost attr of the file service Cost() *CostAttr Close() }
FileService is a write-once file system
func GetForBackup ¶ added in v1.0.0
func GetForBackup(ctx context.Context, spec string) (res FileService, err error)
GetForBackup creates a FileService instance for backup operations if service part of path is empty, a LocalFS will be created if service part of path is argumented, a FileService instance will be created dynamically with those arguments supported dynamic file service: s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>,is-minio=<is-minio>
func NewFileService ¶ added in v0.6.0
func NewFileService( ctx context.Context, cfg Config, perfCounterSets []*perfcounter.CounterSet, ) (FileService, error)
NewFileService create file service from config
func SubPath ¶ added in v0.8.0
func SubPath(upstream FileService, path string) FileService
SubPath returns a FileService instance that operates at specified sub path of the upstream instance
type FileServices ¶ added in v0.6.0
type FileServices struct {
// contains filtered or unexported fields
}
func NewFileServices ¶ added in v0.6.0
func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)
func (*FileServices) Close ¶ added in v1.2.0
func (f *FileServices) Close()
func (*FileServices) Cost ¶ added in v1.2.1
func (f *FileServices) Cost() *CostAttr
func (*FileServices) Delete ¶ added in v0.6.0
func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error
func (*FileServices) Name ¶ added in v0.6.0
func (f *FileServices) Name() string
func (*FileServices) PrefetchFile ¶ added in v1.0.1
func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error
func (*FileServices) Read ¶ added in v0.6.0
func (f *FileServices) Read(ctx context.Context, vector *IOVector) error
func (*FileServices) ReadCache ¶ added in v1.0.0
func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error
type FileWithChecksum ¶ added in v0.6.0
type FileWithChecksum[T FileLike] struct { // contains filtered or unexported fields }
FileWithChecksum maps file contents to blocks with checksum
func NewFileWithChecksum ¶ added in v0.6.0
func NewFileWithChecksum[T FileLike]( ctx context.Context, underlying T, blockContentSize int, perfCounterSets []*perfcounter.CounterSet, ) *FileWithChecksum[T]
func (*FileWithChecksum[T]) Read ¶ added in v0.6.0
func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)
func (*FileWithChecksum[T]) ReadAt ¶ added in v0.6.0
func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)
func (*FileWithChecksum[T]) Seek ¶ added in v0.6.0
func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)
type IOEntry ¶
type IOEntry struct { // offset in file // when writing or mutating, offset can be arbitrary value, gaps between provided data are zero-filled // when reading, valid offsets are in range [0, len(file) - 1] Offset int64 // number of bytes to read or write, [1, len(file)] // when reading, pass -1 to read to the end of file Size int64 // raw content // when reading, if len(Data) < Size, a new Size-lengthed byte slice will be allocated Data []byte // when reading, if Writer is not nil, write data to it instead of setting Data field WriterForRead io.Writer // when reading, if ReadCloser is not nil, set an io.ReadCloser instead of setting Data field ReadCloserForRead *io.ReadCloser // when writing, if Reader is not nil, read data from it instead of reading Data field // number of bytes to be read is specified by Size field // if number of bytes is unknown, set Size field to -1 ReaderForWrite io.Reader // When reading, if the ToCacheData field is not nil, the returning object's byte slice will be set to this field // Data, WriterForRead, ReadCloserForRead may be empty if CachedData is not null // if ToCacheData is provided, caller should always read CachedData instead of Data, WriterForRead or ReadCloserForRead CachedData fscache.Data // ToCacheData constructs an object byte slice from entry contents // reader or data must not be retained after returns // reader always contains entry contents // data may contains entry contents if available // if data is empty, the io.Reader must be fully read before returning nil error ToCacheData func(reader io.Reader, data []byte, allocator CacheDataAllocator) (cacheData fscache.Data, err error) // contains filtered or unexported fields }
func (*IOEntry) ReadFromOSFile ¶ added in v0.8.0
type IOMergeKey ¶ added in v1.2.0
type IOMerger ¶ added in v1.2.0
type IOMerger struct {
// contains filtered or unexported fields
}
IOMerger merges multiple I/O requests to single one
func NewIOMerger ¶ added in v1.2.0
func NewIOMerger() *IOMerger
func (*IOMerger) Merge ¶ added in v1.2.0
func (i *IOMerger) Merge(key IOMergeKey) (done func(), wait func())
type IOVector ¶
type IOVector struct { // FilePath indicates where to find the file // a path has two parts, service name and file name, separated by ':' // service name is optional, if omitted, the receiver FileService will use the default name of the service // file name parts are separated by '/' // valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( ) // and all printable non-ASCII characters // example: // s3:a/b/c S3:a/b/c represents the same file 'a/b/c' located in 'S3' service FilePath string // io entries // empty Entries is not allowed // when writing, overlapping Entries is not allowed Entries []IOEntry // ExpireAt specifies the expire time of the file // implementations may or may not delete the file after this time // zero value means no expire ExpireAt time.Time // Policy controls policy for the vector Policy Policy // Caches indicates extra caches to operate on Caches []IOVectorCache }
type IOVectorCache ¶ added in v0.8.0
type IOVectorCache interface { Read( ctx context.Context, vector *IOVector, ) error Update( ctx context.Context, vector *IOVector, async bool, ) error Flush() //TODO file contents may change in TAE that violates the immutibility assumption // before they fix this, we still need this sh**. DeletePaths( ctx context.Context, paths []string, ) error // Evict triggers eviction // if done is not nil, when eviction finish, target size will be send to the done chan Evict(done chan int64) }
VectorCache caches IOVector
type KeyRouterFactory ¶ added in v1.0.0
type KeyRouterFactory[T comparable] func() client.KeyRouter[T]
type LocalETLFS ¶ added in v0.6.0
LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations
func NewLocalETLFS ¶ added in v0.6.0
func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)
func (*LocalETLFS) Close ¶ added in v1.2.0
func (l *LocalETLFS) Close()
func (*LocalETLFS) Cost ¶ added in v1.2.1
func (l *LocalETLFS) Cost() *CostAttr
func (*LocalETLFS) Delete ¶ added in v0.6.0
func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error
func (*LocalETLFS) ETLCompatible ¶ added in v0.6.0
func (l *LocalETLFS) ETLCompatible()
func (*LocalETLFS) Name ¶ added in v0.6.0
func (l *LocalETLFS) Name() string
func (*LocalETLFS) NewMutator ¶ added in v0.6.0
func (*LocalETLFS) PrefetchFile ¶ added in v1.0.1
func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error
func (*LocalETLFS) Read ¶ added in v0.6.0
func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error
func (*LocalETLFS) ReadCache ¶ added in v1.0.0
func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error
type LocalETLFSMutator ¶ added in v0.6.0
type LocalETLFSMutator struct {
// contains filtered or unexported fields
}
func (*LocalETLFSMutator) Append ¶ added in v0.6.0
func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error
func (*LocalETLFSMutator) Close ¶ added in v0.6.0
func (l *LocalETLFSMutator) Close() error
type LocalFS ¶
LocalFS is a FileService implementation backed by local file system
func NewLocalFS ¶
func NewLocalFS( ctx context.Context, name string, rootPath string, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, ) (*LocalFS, error)
func (*LocalFS) AllocateCacheData ¶ added in v1.2.3
func (*LocalFS) FlushCache ¶ added in v0.6.0
func (l *LocalFS) FlushCache()
func (*LocalFS) NewMutator ¶ added in v0.6.0
func (*LocalFS) PrefetchFile ¶ added in v1.0.1
func (*LocalFS) SetAsyncUpdate ¶ added in v0.8.0
type LocalFSMutator ¶ added in v0.6.0
type LocalFSMutator struct {
// contains filtered or unexported fields
}
func (*LocalFSMutator) Append ¶ added in v0.6.0
func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error
func (*LocalFSMutator) Close ¶ added in v0.6.0
func (l *LocalFSMutator) Close() error
type MemCache ¶ added in v0.6.0
type MemCache struct {
// contains filtered or unexported fields
}
func NewMemCache ¶ added in v0.6.0
func NewMemCache( capacity fscache.CapacityFunc, callbacks *CacheCallbacks, counterSets []*perfcounter.CounterSet, name string, ) *MemCache
func (*MemCache) DeletePaths ¶ added in v1.1.0
type MemoryFS ¶
MemoryFS is an in-memory FileService implementation
func NewMemoryFS ¶
func NewMemoryFS( name string, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, ) (*MemoryFS, error)
func (*MemoryFS) ETLCompatible ¶ added in v0.6.0
func (m *MemoryFS) ETLCompatible()
func (*MemoryFS) PrefetchFile ¶ added in v1.0.1
type MinioSDK ¶ added in v1.0.0
type MinioSDK struct {
// contains filtered or unexported fields
}
func NewMinioSDK ¶ added in v1.0.0
func NewMinioSDK( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*MinioSDK, error)
type MutableFileService ¶
type MutableFileService interface { FileService // NewMutator creates a new mutator NewMutator(ctx context.Context, filePath string) (Mutator, error) }
MutableFileService is an extension interface to FileService that allow mutation
type Mutator ¶ added in v0.6.0
type Mutator interface { // Mutate mutates file contents Mutate(ctx context.Context, entries ...IOEntry) error // Append appends data to file // all IOEntry.Offset is base on the end of file position // for example, passing IOEntry{Offset: 0, Len:1, Data: []byte("a")} will append "a" to the end of file Append(ctx context.Context, entries ...IOEntry) error // Close closes the mutator // Must be called after finishing mutation Close() error }
type NewFileServicesFunc ¶ added in v0.6.0
type NewFileServicesFunc = func(defaultName string) (*FileServices, error)
NewFileServicesFunc creates a new *FileServices
type ObjectStorage ¶ added in v1.0.0
type ObjectStorage interface { // List lists objects with specified prefix List( ctx context.Context, prefix string, fn func(isPrefix bool, key string, size int64) (bool, error), ) ( err error, ) // Stat returns informations about an object Stat( ctx context.Context, key string, ) ( size int64, err error, ) // Exists reports whether specified object exists Exists( ctx context.Context, key string, ) ( bool, error, ) // Write writes an object Write( ctx context.Context, key string, r io.Reader, size int64, expire *time.Time, ) ( err error, ) // Read returns an io.Reader for specified object range Read( ctx context.Context, key string, min *int64, max *int64, ) ( r io.ReadCloser, err error, ) // Delete deletes objects Delete( ctx context.Context, keys ...string, ) ( err error, ) }
type ObjectStorageArguments ¶ added in v1.0.0
type ObjectStorageArguments struct { // misc Name string `toml:"name"` KeyPrefix string `toml:"key-prefix"` NoDefaultCredentials bool `toml:"no-default-credentials"` NoBucketValidation bool `toml:"no-bucket-validation"` Concurrency int64 `toml:"concurrency"` // s3 Bucket string `toml:"bucket"` Endpoint string `toml:"endpoint"` IsMinio bool `toml:"is-minio"` Region string `toml:"region"` CertFiles []string `toml:"cert-files"` // credentials RoleARN string `json:"-" toml:"role-arn"` BearerToken string `json:"-" toml:"bearer-token"` ExternalID string `json:"-" toml:"external-id"` KeyID string `json:"-" toml:"key-id"` KeySecret string `json:"-" toml:"key-secret"` RAMRole string `json:"-" toml:"ram-role"` RoleSessionName string `json:"-" toml:"role-session-name"` SecurityToken string `json:"-" toml:"security-token"` SessionToken string `json:"-" toml:"session-token"` }
func (*ObjectStorageArguments) SetFromString ¶ added in v1.0.0
func (o *ObjectStorageArguments) SetFromString(arguments []string) error
func (ObjectStorageArguments) String ¶ added in v1.0.1
func (o ObjectStorageArguments) String() string
type OnDiskCacheEvictFunc ¶ added in v1.0.0
type OnDiskCacheEvictFunc = func( diskFilePath string, )
type OnDiskCacheWrittenFunc ¶ added in v1.0.0
type Path ¶ added in v0.6.0
func ParsePathAtService ¶ added in v0.6.0
func (Path) ServiceString ¶ added in v0.8.0
type Policy ¶ added in v1.0.0
type Policy uint64
func (Policy) CacheFullFile ¶ added in v1.0.0
func (Policy) CacheIOEntry ¶ added in v1.0.0
type Pool ¶ added in v0.8.0
type Pool[T any] struct { // contains filtered or unexported fields }
type PutBack ¶ added in v0.8.0
type PutBack[T any] struct { // contains filtered or unexported fields }
type RemoteCache ¶ added in v1.0.0
type RemoteCache struct {
// contains filtered or unexported fields
}
RemoteCache is the cache for remote read.
func NewRemoteCache ¶ added in v1.0.0
func NewRemoteCache(client client.QueryClient, factory KeyRouterFactory[query.CacheKey]) *RemoteCache
func (*RemoteCache) DeletePaths ¶ added in v1.1.0
func (r *RemoteCache) DeletePaths(ctx context.Context, paths []string) error
func (*RemoteCache) Evict ¶ added in v1.2.3
func (r *RemoteCache) Evict(done chan int64)
func (*RemoteCache) Flush ¶ added in v1.0.0
func (r *RemoteCache) Flush()
type ReplaceableFileService ¶ added in v0.6.0
type ReplaceableFileService interface { FileService Replace(ctx context.Context, vector IOVector) error }
ReplaceableFileService is an extension interface to FileService that allow replacing a whole file
type S3FS ¶
type S3FS struct {
// contains filtered or unexported fields
}
S3FS is a FileService implementation backed by S3
func NewS3FS ¶
func NewS3FS( ctx context.Context, args ObjectStorageArguments, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, noCache bool, noDefaultCredential bool, ) (*S3FS, error)
func (*S3FS) AllocateCacheData ¶ added in v1.2.3
func (*S3FS) ETLCompatible ¶ added in v0.6.0
func (*S3FS) ETLCompatible()
func (*S3FS) FlushCache ¶ added in v0.6.0
func (s *S3FS) FlushCache()
func (*S3FS) PrefetchFile ¶ added in v1.0.1
func (*S3FS) SetAsyncUpdate ¶ added in v0.8.0
type TargetCacheKeys ¶ added in v1.0.0
type TargetCacheKeys map[string][]*query.RequestCacheKey
Source Files ¶
- aliyun_sdk.go
- aws_sdk_v1.go
- aws_sdk_v2.go
- bytes.go
- cache.go
- caching_file_service.go
- config.go
- disk_cache.go
- disk_cache_callbacks.go
- disk_object_storage.go
- error.go
- etl_file_service.go
- event_logger.go
- file_cache.go
- file_like.go
- file_service.go
- file_services.go
- file_with_checksum.go
- get.go
- http_client.go
- io.go
- io_entry.go
- io_entry_reader.go
- io_merger.go
- io_vector.go
- local_etl_fs.go
- local_fs.go
- malloc.go
- mem_cache.go
- memory_fs.go
- minio_sdk.go
- mutable_file_service.go
- object_storage.go
- object_storage_arguments.go
- object_storage_semaphore.go
- path.go
- policy.go
- pool.go
- remote_cache.go
- replaceable_file_service.go
- retry.go
- retryable_reader.go
- reuse.go
- s3_fs.go
- s3_fs_restore.go
- string.go
- sub_path.go
- utils.go