fileservice

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2024 License: Apache-2.0 Imports: 75 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SkipMemoryCacheReads = 1 << iota
	SkipMemoryCacheWrites
	SkipDiskCacheReads
	SkipDiskCacheWrites
	SkipFullFilePreloads
)
View Source
const DisableCacheCapacity = 1
View Source
const ServiceNameSeparator = ":"

Variables

View Source
var CtxKeyDiskCacheCallbacks ctxKeyDiskCacheCallbacks
View Source
var DisabledCacheConfig = CacheConfig{
	MemoryCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity),
	DiskCapacity:   ptrTo[toml.ByteSize](DisableCacheCapacity),
}
View Source
var ErrNotCacheFile = errorStr("not a cache file")
View Source
var EventLoggerKey eventLoggerKey
View Source
var NoDefaultCredentialsForETL = os.Getenv("MO_NO_DEFAULT_CREDENTIALS") != ""

Functions

func CacheOriginalData added in v1.0.0

func CacheOriginalData(r io.Reader, data []byte, allocator CacheDataAllocator) (cacheData memorycache.CacheData, err error)

func DoWithRetry added in v1.2.0

func DoWithRetry[T any](
	what string,
	fn func() (T, error),
	maxAttemps int,
	isRetryable func(error) bool,
) (res T, err error)

func Get added in v0.6.0

func Get[T any](fs FileService, name string) (res T, err error)

func HandleRemoteRead added in v1.0.0

func HandleRemoteRead(
	ctx context.Context, fs FileService, req *query.Request, resp *query.WrappedResponse,
) error

func IsRetryableError added in v1.2.0

func IsRetryableError(err error) bool

func JoinPath added in v0.6.0

func JoinPath(serviceName string, path string) string

func LogEvent added in v1.2.1

func LogEvent(ctx context.Context, ev stringRef, args ...any)

func LogSlowEvent added in v1.2.1

func LogSlowEvent(ctx context.Context, threshold time.Duration)

func NewFileWithChecksumOSFile added in v0.8.0

func NewFileWithChecksumOSFile(
	ctx context.Context,
	underlying *os.File,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])

func NewMemoryCache added in v1.2.0

func NewMemoryCache(
	capacity int64,
	checkOverlaps bool,
	callbacks *CacheCallbacks,
) *memorycache.Cache

func OnDiskCacheEvict added in v1.0.0

func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)

func OnDiskCacheWritten added in v1.0.0

func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)

func WithEventLogger added in v1.2.1

func WithEventLogger(ctx context.Context) context.Context

Types

type AliyunSDK added in v1.0.0

type AliyunSDK struct {
	// contains filtered or unexported fields
}

func NewAliyunSDK added in v1.0.0

func NewAliyunSDK(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (_ *AliyunSDK, err error)

func (*AliyunSDK) Delete added in v1.0.0

func (a *AliyunSDK) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AliyunSDK) Exists added in v1.0.0

func (a *AliyunSDK) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AliyunSDK) List added in v1.0.0

func (a *AliyunSDK) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AliyunSDK) Read added in v1.0.0

func (a *AliyunSDK) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AliyunSDK) Stat added in v1.0.0

func (a *AliyunSDK) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AliyunSDK) Write added in v1.0.0

func (a *AliyunSDK) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type AwsSDKv1 added in v1.0.0

type AwsSDKv1 struct {
	// contains filtered or unexported fields
}

func NewAwsSDKv1 added in v1.0.0

func NewAwsSDKv1(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*AwsSDKv1, error)

func (*AwsSDKv1) Delete added in v1.0.0

func (a *AwsSDKv1) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AwsSDKv1) Exists added in v1.0.0

func (a *AwsSDKv1) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AwsSDKv1) List added in v1.0.0

func (a *AwsSDKv1) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AwsSDKv1) Read added in v1.0.0

func (a *AwsSDKv1) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AwsSDKv1) Stat added in v1.0.0

func (a *AwsSDKv1) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AwsSDKv1) Write added in v1.0.0

func (a *AwsSDKv1) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type AwsSDKv2 added in v1.0.0

type AwsSDKv2 struct {
	// contains filtered or unexported fields
}

func NewAwsSDKv2 added in v1.0.0

func NewAwsSDKv2(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*AwsSDKv2, error)

func (*AwsSDKv2) Delete added in v1.0.0

func (a *AwsSDKv2) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AwsSDKv2) Exists added in v1.0.0

func (a *AwsSDKv2) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AwsSDKv2) List added in v1.0.0

func (a *AwsSDKv2) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AwsSDKv2) Read added in v1.0.0

func (a *AwsSDKv2) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AwsSDKv2) Stat added in v1.0.0

func (a *AwsSDKv2) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AwsSDKv2) Write added in v1.0.0

func (a *AwsSDKv2) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type Bytes added in v1.0.0

type Bytes struct {
	// contains filtered or unexported fields
}

func (Bytes) Bytes added in v1.0.0

func (b Bytes) Bytes() []byte

func (Bytes) Release added in v1.0.0

func (b Bytes) Release()

func (Bytes) Size added in v1.0.0

func (b Bytes) Size() int64

func (Bytes) Slice added in v1.0.0

func (b Bytes) Slice(length int) memorycache.CacheData

type CacheCallbackFunc added in v1.0.0

type CacheCallbackFunc = func(CacheKey, memorycache.CacheData)

type CacheCallbacks added in v1.0.0

type CacheCallbacks struct {
	PostGet   []CacheCallbackFunc
	PostSet   []CacheCallbackFunc
	PostEvict []CacheCallbackFunc
}

type CacheConfig added in v0.6.0

type CacheConfig struct {
	MemoryCapacity       *toml.ByteSize `toml:"memory-capacity" user_setting:"advanced"`
	DiskPath             *string        `toml:"disk-path"`
	DiskCapacity         *toml.ByteSize `toml:"disk-capacity"`
	DiskMinEvictInterval *toml.Duration `toml:"disk-min-evict-interval"`
	DiskEvictTarget      *float64       `toml:"disk-evict-target"`
	RemoteCacheEnabled   bool           `toml:"remote-cache-enabled"`
	RPC                  morpc.Config   `toml:"rpc"`
	CheckOverlaps        bool           `toml:"check-overlaps"`

	QueryClient      client.QueryClient            `json:"-"`
	KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"`
	KeyRouter        client.KeyRouter[pb.CacheKey] `json:"-"`
	InitKeyRouter    *sync.Once                    `json:"-"`
	CacheCallbacks   `json:"-"`
	// contains filtered or unexported fields
}

func (*CacheConfig) SetRemoteCacheCallback added in v1.0.0

func (c *CacheConfig) SetRemoteCacheCallback()

type CacheDataAllocator added in v1.0.0

type CacheDataAllocator interface {
	Alloc(size int) memorycache.CacheData
}

func GetDefaultCacheDataAllocator added in v1.2.1

func GetDefaultCacheDataAllocator() CacheDataAllocator

type CacheKey added in v0.6.0

type CacheKey = pb.CacheKey

type CachingFileService added in v0.6.0

type CachingFileService interface {
	FileService

	// FlushCache flushes cache
	FlushCache()

	// SetAsyncUpdate sets cache update operation to async mode
	SetAsyncUpdate(bool)
}

CachingFileService is an extension to the FileService

type Config added in v0.6.0

type Config struct {
	// Name name of fileservice, describe what an instance of fileservice is used for
	Name string `toml:"name"`
	// Backend fileservice backend. [MEM|DISK|DISK-ETL|S3|MINIO]
	Backend string `toml:"backend"`
	// S3 used to create fileservice using s3 as the backend
	S3 ObjectStorageArguments `toml:"s3"`
	// Cache specifies configs for cache
	Cache CacheConfig `toml:"cache"`
	// DataDir used to create fileservice using DISK as the backend
	DataDir string `toml:"data-dir"`
	// FixMissing inidicates the file service to try its best to fix missing files
	FixMissing bool `toml:"fix-missing"`
}

Config fileService config

type CostAttr added in v1.2.1

type CostAttr struct {
	// List is the cost of List from FileService
	List CostItem
}

type CostItem added in v1.2.1

type CostItem uint8
const (
	CostLow CostItem = iota
	CostHigh
)

type DirEntry

type DirEntry struct {
	// file name, not full path
	Name  string
	IsDir bool
	Size  int64
}

DirEntry is a file or dir

type DiskCache added in v0.7.0

type DiskCache struct {
	// contains filtered or unexported fields
}

func NewDiskCache added in v0.7.0

func NewDiskCache(
	ctx context.Context,
	path string,
	capacity int,
	perfCounterSets []*perfcounter.CounterSet,
	asyncLoad bool,
) (ret *DiskCache, err error)

func (*DiskCache) DeletePaths added in v1.1.0

func (d *DiskCache) DeletePaths(
	ctx context.Context,
	paths []string,
) error

func (*DiskCache) Flush added in v0.7.0

func (d *DiskCache) Flush()

func (*DiskCache) Read added in v0.7.0

func (d *DiskCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*DiskCache) SetFile added in v1.0.0

func (d *DiskCache) SetFile(
	ctx context.Context,
	path string,
	openReader func(context.Context) (io.ReadCloser, error),
) error

func (*DiskCache) Update added in v0.7.0

func (d *DiskCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) (
	err error,
)

type DiskCacheCallbacks added in v1.0.0

type DiskCacheCallbacks struct {
	OnWritten []OnDiskCacheWrittenFunc
	OnEvict   []OnDiskCacheEvictFunc
}

type ETLFileService added in v0.6.0

type ETLFileService interface {
	FileService

	// ETLCompatible marks the implementation to be compatible to ETL operations
	// implementations must save file contents as-is
	ETLCompatible()
}

ETLFileService is an extension to the FileService

func GetForETL added in v0.6.0

func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileService, readPath string, err error)

GetForETL get or creates a FileService instance for ETL operations if service part of path is empty, a LocalETLFS will be created if service part of path is not empty, a ETLFileService typed instance will be extracted from fs argument if service part of path is argumented, a FileService instance will be created dynamically with those arguments supported dynamic file service: s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix> s3-no-key,<endpoint>,<region>,<bucket>,<prefix> minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix> s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>

key value pairs can be in any order

type FileCache added in v1.0.0

type FileCache interface {
	SetFile(
		ctx context.Context,
		path string,
		openReader func(context.Context) (io.ReadCloser, error),
	) error
}

type FileLike added in v0.6.0

type FileLike interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}

type FileService

type FileService interface {
	// Name is file service's name
	// service name is case-insensitive
	Name() string

	// Write writes a new file
	// returns ErrFileExisted if file already existed
	// returns ErrSizeNotMatch if provided size does not match data
	// entries in vector should be written atomically. if write failed, following reads must not succeed.
	Write(ctx context.Context, vector IOVector) error

	// Read reads a file to fill IOEntries
	// returns ErrFileNotFound if requested file not found
	// returns ErrUnexpectedEOF if less data is read than requested size
	// returns ErrEmptyRange if no data at specified offset and size
	// returns ErrEmptyVector if no IOEntry is passed
	Read(ctx context.Context, vector *IOVector) error

	// ReadCache reads cached data if any
	// if cache hit, IOEntry.CachedData will be set
	ReadCache(ctx context.Context, vector *IOVector) error

	// List lists sub-entries in a dir
	List(ctx context.Context, dirPath string) ([]DirEntry, error)

	// Delete deletes multi file
	// returns ErrFileNotFound if requested file not found
	Delete(ctx context.Context, filePaths ...string) error

	// Stat returns infomations about a file
	// returns ErrFileNotFound if requested file not found
	StatFile(ctx context.Context, filePath string) (*DirEntry, error)

	// PrefetchFile prefetches a file
	PrefetchFile(ctx context.Context, filePath string) error

	// Cost returns the cost attr of the file service
	Cost() *CostAttr

	Close()
}

FileService is a write-once file system

func GetForBackup added in v1.0.0

func GetForBackup(ctx context.Context, spec string) (res FileService, err error)

GetForBackup creates a FileService instance for backup operations if service part of path is empty, a LocalFS will be created if service part of path is argumented, a FileService instance will be created dynamically with those arguments supported dynamic file service: s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>,is-minio=<is-minio>

func NewFileService added in v0.6.0

func NewFileService(
	ctx context.Context, cfg Config, perfCounterSets []*perfcounter.CounterSet,
) (FileService, error)

NewFileService create file service from config

func SubPath added in v0.8.0

func SubPath(upstream FileService, path string) FileService

SubPath returns a FileService instance that operates at specified sub path of the upstream instance

type FileServices added in v0.6.0

type FileServices struct {
	// contains filtered or unexported fields
}

func NewFileServices added in v0.6.0

func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)

func (*FileServices) Close added in v1.2.0

func (f *FileServices) Close()

func (*FileServices) Cost added in v1.2.1

func (f *FileServices) Cost() *CostAttr

func (*FileServices) Delete added in v0.6.0

func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error

func (*FileServices) List added in v0.6.0

func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)

func (*FileServices) Name added in v0.6.0

func (f *FileServices) Name() string

func (*FileServices) PrefetchFile added in v1.0.1

func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error

func (*FileServices) Read added in v0.6.0

func (f *FileServices) Read(ctx context.Context, vector *IOVector) error

func (*FileServices) ReadCache added in v1.0.0

func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error

func (*FileServices) StatFile added in v0.7.0

func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*FileServices) Write added in v0.6.0

func (f *FileServices) Write(ctx context.Context, vector IOVector) error

type FileWithChecksum added in v0.6.0

type FileWithChecksum[T FileLike] struct {
	// contains filtered or unexported fields
}

FileWithChecksum maps file contents to blocks with checksum

func NewFileWithChecksum added in v0.6.0

func NewFileWithChecksum[T FileLike](
	ctx context.Context,
	underlying T,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) *FileWithChecksum[T]

func (*FileWithChecksum[T]) Read added in v0.6.0

func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) ReadAt added in v0.6.0

func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)

func (*FileWithChecksum[T]) Seek added in v0.6.0

func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)

func (*FileWithChecksum[T]) Write added in v0.6.0

func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) WriteAt added in v0.6.0

func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)

type IOEntry

type IOEntry struct {
	// offset in file
	// when writing or mutating, offset can be arbitrary value, gaps between provided data are zero-filled
	// when reading, valid offsets are in range [0, len(file) - 1]
	Offset int64

	// number of bytes to read or write, [1, len(file)]
	// when reading, pass -1 to read to the end of file
	Size int64

	// raw content
	// when reading, if len(Data) < Size, a new Size-lengthed byte slice will be allocated
	Data []byte

	// when reading, if Writer is not nil, write data to it instead of setting Data field
	WriterForRead io.Writer

	// when reading, if ReadCloser is not nil, set an io.ReadCloser instead of setting Data field
	ReadCloserForRead *io.ReadCloser

	// when writing, if Reader is not nil, read data from it instead of reading Data field
	// number of bytes to be read is specified by Size field
	// if number of bytes is unknown, set Size field to -1
	ReaderForWrite io.Reader

	// When reading, if the ToCacheData field is not nil, the returning object's byte slice will be set to this field
	// Data, WriterForRead, ReadCloserForRead may be empty if CachedData is not null
	// if ToCacheData is provided, caller should always read CachedData instead of Data, WriterForRead or ReadCloserForRead
	CachedData memorycache.CacheData

	// ToCacheData constructs an object byte slice from entry contents
	// reader or data must not be retained after returns
	// reader always contains entry contents
	// data may contains entry contents if available
	// if data is empty, the io.Reader must be fully read before returning nil error
	ToCacheData func(reader io.Reader, data []byte, allocator CacheDataAllocator) (cacheData memorycache.CacheData, err error)
	// contains filtered or unexported fields
}

func (*IOEntry) ReadFromOSFile added in v0.8.0

func (i *IOEntry) ReadFromOSFile(ctx context.Context, file *os.File) (err error)

func (IOEntry) String added in v1.1.0

func (i IOEntry) String() string

type IOMergeKey added in v1.2.0

type IOMergeKey struct {
	Path   string
	Offset int64
	End    int64
	Policy Policy
}

type IOMerger added in v1.2.0

type IOMerger struct {
	// contains filtered or unexported fields
}

IOMerger merges multiple I/O requests to single one

func NewIOMerger added in v1.2.0

func NewIOMerger() *IOMerger

func (*IOMerger) Merge added in v1.2.0

func (i *IOMerger) Merge(key IOMergeKey) (done func(), wait func())

type IOVector

type IOVector struct {

	// FilePath indicates where to find the file
	// a path has two parts, service name and file name, separated by ':'
	// service name is optional, if omitted, the receiver FileService will use the default name of the service
	// file name parts are separated by '/'
	// valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( )
	// and all printable non-ASCII characters
	// example:
	// s3:a/b/c S3:a/b/c represents the same file 'a/b/c' located in 'S3' service
	FilePath string

	// io entries
	// empty Entries is not allowed
	// when writing, overlapping Entries is not allowed
	Entries []IOEntry

	// ExpireAt specifies the expire time of the file
	// implementations may or may not delete the file after this time
	// zero value means no expire
	ExpireAt time.Time

	// Policy controls policy for the vector
	Policy Policy

	// Caches indicates extra caches to operate on
	Caches []IOVectorCache
}

func (*IOVector) Release added in v1.0.0

func (i *IOVector) Release()

type IOVectorCache added in v0.8.0

type IOVectorCache interface {
	Read(
		ctx context.Context,
		vector *IOVector,
	) error
	Update(
		ctx context.Context,
		vector *IOVector,
		async bool,
	) error
	Flush()
	//TODO file contents may change, so we still need this s.
	DeletePaths(
		ctx context.Context,
		paths []string,
	) error
}

VectorCache caches IOVector

type KeyRouterFactory added in v1.0.0

type KeyRouterFactory[T comparable] func() client.KeyRouter[T]

type LocalETLFS added in v0.6.0

type LocalETLFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations

func NewLocalETLFS added in v0.6.0

func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)

func (*LocalETLFS) Close added in v1.2.0

func (l *LocalETLFS) Close()

func (*LocalETLFS) Cost added in v1.2.1

func (l *LocalETLFS) Cost() *CostAttr

func (*LocalETLFS) Delete added in v0.6.0

func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalETLFS) ETLCompatible added in v0.6.0

func (l *LocalETLFS) ETLCompatible()

func (*LocalETLFS) List added in v0.6.0

func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalETLFS) Name added in v0.6.0

func (l *LocalETLFS) Name() string

func (*LocalETLFS) NewMutator added in v0.6.0

func (l *LocalETLFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalETLFS) PrefetchFile added in v1.0.1

func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error

func (*LocalETLFS) Read added in v0.6.0

func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) ReadCache added in v1.0.0

func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) StatFile added in v0.7.0

func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalETLFS) Write added in v0.6.0

func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error

type LocalETLFSMutator added in v0.6.0

type LocalETLFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalETLFSMutator) Append added in v0.6.0

func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalETLFSMutator) Close added in v0.6.0

func (l *LocalETLFSMutator) Close() error

func (*LocalETLFSMutator) Mutate added in v0.6.0

func (l *LocalETLFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type LocalFS

type LocalFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalFS is a FileService implementation backed by local file system

func NewLocalFS

func NewLocalFS(
	ctx context.Context,
	name string,
	rootPath string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*LocalFS, error)

func (*LocalFS) Close added in v1.2.0

func (l *LocalFS) Close()

func (*LocalFS) Cost added in v1.2.1

func (l *LocalFS) Cost() *CostAttr

func (*LocalFS) Delete

func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalFS) FlushCache added in v0.6.0

func (l *LocalFS) FlushCache()

func (*LocalFS) List

func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalFS) Name added in v0.6.0

func (l *LocalFS) Name() string

func (*LocalFS) NewMutator added in v0.6.0

func (l *LocalFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalFS) PrefetchFile added in v1.0.1

func (l *LocalFS) PrefetchFile(ctx context.Context, filePath string) error

func (*LocalFS) Read

func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) ReadCache added in v1.0.0

func (l *LocalFS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) Replace added in v0.6.0

func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error

func (*LocalFS) SetAsyncUpdate added in v0.8.0

func (l *LocalFS) SetAsyncUpdate(b bool)

func (*LocalFS) StatFile added in v0.7.0

func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalFS) Write

func (l *LocalFS) Write(ctx context.Context, vector IOVector) error

type LocalFSMutator added in v0.6.0

type LocalFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalFSMutator) Append added in v0.6.0

func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalFSMutator) Close added in v0.6.0

func (l *LocalFSMutator) Close() error

func (*LocalFSMutator) Mutate added in v0.6.0

func (l *LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type MemCache added in v0.6.0

type MemCache struct {
	// contains filtered or unexported fields
}

func NewMemCache added in v0.6.0

func NewMemCache(
	dataCache *memorycache.Cache,
	counterSets []*perfcounter.CounterSet,
) *MemCache

func (*MemCache) Alloc added in v1.2.0

func (m *MemCache) Alloc(n int) memorycache.CacheData

func (*MemCache) DeletePaths added in v1.1.0

func (m *MemCache) DeletePaths(
	ctx context.Context,
	paths []string,
) error

func (*MemCache) Flush added in v0.6.0

func (m *MemCache) Flush()

func (*MemCache) Read added in v0.6.0

func (m *MemCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*MemCache) Update added in v0.7.0

func (m *MemCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) error

type MemoryFS

type MemoryFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryFS is an in-memory FileService implementation

func NewMemoryFS

func NewMemoryFS(
	name string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*MemoryFS, error)

func (*MemoryFS) Close added in v1.2.0

func (m *MemoryFS) Close()

func (*MemoryFS) Cost added in v1.2.1

func (m *MemoryFS) Cost() *CostAttr

func (*MemoryFS) Delete

func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error

func (*MemoryFS) ETLCompatible added in v0.6.0

func (m *MemoryFS) ETLCompatible()

func (*MemoryFS) List

func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*MemoryFS) Name added in v0.6.0

func (m *MemoryFS) Name() string

func (*MemoryFS) PrefetchFile added in v1.0.1

func (m *MemoryFS) PrefetchFile(ctx context.Context, filePath string) error

func (*MemoryFS) Read

func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*MemoryFS) ReadCache added in v1.0.0

func (m *MemoryFS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*MemoryFS) Replace added in v0.6.0

func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error

func (*MemoryFS) StatFile added in v0.7.0

func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*MemoryFS) Write

func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error

type MinioSDK added in v1.0.0

type MinioSDK struct {
	// contains filtered or unexported fields
}

func NewMinioSDK added in v1.0.0

func NewMinioSDK(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*MinioSDK, error)

func (*MinioSDK) Delete added in v1.0.0

func (a *MinioSDK) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*MinioSDK) Exists added in v1.0.0

func (a *MinioSDK) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*MinioSDK) List added in v1.0.0

func (a *MinioSDK) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*MinioSDK) Read added in v1.0.0

func (a *MinioSDK) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*MinioSDK) Stat added in v1.0.0

func (a *MinioSDK) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*MinioSDK) Write added in v1.0.0

func (a *MinioSDK) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type MutableFileService

type MutableFileService interface {
	FileService

	// NewMutator creates a new mutator
	NewMutator(ctx context.Context, filePath string) (Mutator, error)
}

MutableFileService is an extension interface to FileService that allow mutation

type Mutator added in v0.6.0

type Mutator interface {

	// Mutate mutates file contents
	Mutate(ctx context.Context, entries ...IOEntry) error

	// Append appends data to file
	// all IOEntry.Offset is base on the end of file position
	// for example, passing IOEntry{Offset: 0, Len:1, Data: []byte("a")} will append "a" to the end of file
	Append(ctx context.Context, entries ...IOEntry) error

	// Close closes the mutator
	// Must be called after finishing mutation
	Close() error
}

type NewFileServicesFunc added in v0.6.0

type NewFileServicesFunc = func(defaultName string) (*FileServices, error)

NewFileServicesFunc creates a new *FileServices

type ObjectStorage added in v1.0.0

type ObjectStorage interface {
	// List lists objects with specified prefix
	List(
		ctx context.Context,
		prefix string,
		fn func(isPrefix bool, key string, size int64) (bool, error),
	) (
		err error,
	)

	// Stat returns informations about an object
	Stat(
		ctx context.Context,
		key string,
	) (
		size int64,
		err error,
	)

	// Exists reports whether specified object exists
	Exists(
		ctx context.Context,
		key string,
	) (
		bool,
		error,
	)

	// Write writes an object
	Write(
		ctx context.Context,
		key string,
		r io.Reader,
		size int64,
		expire *time.Time,
	) (
		err error,
	)

	// Read returns an io.Reader for specified object range
	Read(
		ctx context.Context,
		key string,
		min *int64,
		max *int64,
	) (
		r io.ReadCloser,
		err error,
	)

	// Delete deletes objects
	Delete(
		ctx context.Context,
		keys ...string,
	) (
		err error,
	)
}

type ObjectStorageArguments added in v1.0.0

type ObjectStorageArguments struct {
	// misc
	Name                 string `toml:"name"`
	KeyPrefix            string `toml:"key-prefix"`
	SharedConfigProfile  string `toml:"shared-config-profile"`
	NoDefaultCredentials bool   `toml:"no-default-credentials"`
	NoBucketValidation   bool   `toml:"no-bucket-validation"`
	Concurrency          int64  `toml:"concurrency"`

	// s3
	Bucket    string   `toml:"bucket"`
	Endpoint  string   `toml:"endpoint"`
	IsMinio   bool     `toml:"is-minio"`
	Region    string   `toml:"region"`
	CertFiles []string `toml:"cert-files"`

	// credentials
	RoleARN         string `json:"-" toml:"role-arn"`
	BearerToken     string `json:"-" toml:"bearer-token"`
	ExternalID      string `json:"-" toml:"external-id"`
	KeyID           string `json:"-" toml:"key-id"`
	KeySecret       string `json:"-" toml:"key-secret"`
	RAMRole         string `json:"-" toml:"ram-role"`
	RoleSessionName string `json:"-" toml:"role-session-name"`
	SecurityToken   string `json:"-" toml:"security-token"`
	SessionToken    string `json:"-" toml:"session-token"`
}

func (*ObjectStorageArguments) SetFromString added in v1.0.0

func (o *ObjectStorageArguments) SetFromString(arguments []string) error

func (ObjectStorageArguments) String added in v1.0.1

func (o ObjectStorageArguments) String() string

type OnDiskCacheEvictFunc added in v1.0.0

type OnDiskCacheEvictFunc = func(
	diskFilePath string,
)

type OnDiskCacheWrittenFunc added in v1.0.0

type OnDiskCacheWrittenFunc = func(
	filePath string,
	entry IOEntry,
)

type Path added in v0.6.0

type Path struct {
	Service          string
	ServiceArguments []string
	File             string
}

func ParsePath added in v0.6.0

func ParsePath(s string) (path Path, err error)

func ParsePathAtService added in v0.6.0

func ParsePathAtService(s string, serviceStr string) (path Path, err error)

func (Path) ServiceString added in v0.8.0

func (p Path) ServiceString() string

func (Path) String added in v0.8.0

func (p Path) String() string

type Policy added in v1.0.0

type Policy uint64

func (Policy) Any added in v1.0.0

func (c Policy) Any(policies ...Policy) bool

func (Policy) CacheFullFile added in v1.0.0

func (c Policy) CacheFullFile() bool

func (Policy) CacheIOEntry added in v1.0.0

func (c Policy) CacheIOEntry() bool

type Pool added in v0.8.0

type Pool[T any] struct {
	// contains filtered or unexported fields
}

func NewPool added in v0.8.0

func NewPool[T any](
	capacity uint32,
	newFunc func() T,
	resetFunc func(T),
	finallyFunc func(T),
) *Pool[T]

func (*Pool[T]) Get added in v0.8.0

func (p *Pool[T]) Get(ptr *T) PutBack[T]

func (*Pool[T]) Put added in v0.8.0

func (p *Pool[T]) Put(idx int, ptr *T)

type PutBack added in v0.8.0

type PutBack[T any] struct {
	// contains filtered or unexported fields
}

func (PutBack[T]) Put added in v0.8.0

func (pb PutBack[T]) Put()

type RemoteCache added in v1.0.0

type RemoteCache struct {
	// contains filtered or unexported fields
}

RemoteCache is the cache for remote read.

func NewRemoteCache added in v1.0.0

func NewRemoteCache(client client.QueryClient, factory KeyRouterFactory[query.CacheKey]) *RemoteCache

func (*RemoteCache) DeletePaths added in v1.1.0

func (r *RemoteCache) DeletePaths(ctx context.Context, paths []string) error

func (*RemoteCache) Flush added in v1.0.0

func (r *RemoteCache) Flush()

func (*RemoteCache) Read added in v1.0.0

func (r *RemoteCache) Read(ctx context.Context, vector *IOVector) error

func (*RemoteCache) Update added in v1.0.0

func (r *RemoteCache) Update(ctx context.Context, vector *IOVector, async bool) error

type ReplaceableFileService added in v0.6.0

type ReplaceableFileService interface {
	FileService

	Replace(ctx context.Context, vector IOVector) error
}

ReplaceableFileService is an extension interface to FileService that allow replacing a whole file

type S3FS

type S3FS struct {
	// contains filtered or unexported fields
}

S3FS is a FileService implementation backed by S3

func NewS3FS

func NewS3FS(
	ctx context.Context,
	args ObjectStorageArguments,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
	noCache bool,
	noDefaultCredential bool,
) (*S3FS, error)

func (*S3FS) Close added in v1.2.0

func (s *S3FS) Close()

func (*S3FS) Cost added in v1.2.1

func (s *S3FS) Cost() *CostAttr

func (*S3FS) Delete

func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error

func (*S3FS) ETLCompatible added in v0.6.0

func (*S3FS) ETLCompatible()

func (*S3FS) FlushCache added in v0.6.0

func (s *S3FS) FlushCache()

func (*S3FS) List

func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*S3FS) Name added in v0.6.0

func (s *S3FS) Name() string

func (*S3FS) PrefetchFile added in v1.0.1

func (s *S3FS) PrefetchFile(ctx context.Context, filePath string) error

func (*S3FS) Read

func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) ReadCache added in v1.0.0

func (s *S3FS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) SetAsyncUpdate added in v0.8.0

func (s *S3FS) SetAsyncUpdate(b bool)

func (*S3FS) StatFile added in v0.7.0

func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*S3FS) Write

func (s *S3FS) Write(ctx context.Context, vector IOVector) error

type TargetCacheKeys added in v1.0.0

type TargetCacheKeys map[string][]*query.RequestCacheKey

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL