fileservice

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2023 License: Apache-2.0 Imports: 45 Imported by: 0

Documentation

Index

Constants

View Source
const DisableCacheCapacity = 1
View Source
const ServiceNameSeparator = ":"

Variables

View Source
var DisabledCacheConfig = CacheConfig{
	MemoryCapacity: DisableCacheCapacity,
	DiskCapacity:   DisableCacheCapacity,
}
View Source
var FSProfileHandler = NewProfileHandler()

Functions

func Get added in v0.6.0

func Get[T any](fs FileService, name string) (res T, err error)

func JoinPath added in v0.6.0

func JoinPath(serviceName string, path string) string

func NewFileWithChecksumOSFile added in v0.8.0

func NewFileWithChecksumOSFile(
	ctx context.Context,
	underlying *os.File,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])

Types

type CacheConfig added in v0.6.0

type CacheConfig struct {
	MemoryCapacity       toml.ByteSize `toml:"memory-capacity"`
	DiskPath             string        `toml:"disk-path"`
	DiskCapacity         toml.ByteSize `toml:"disk-capacity"`
	DiskMinEvictInterval toml.Duration `toml:"disk-min-evict-interval"`
	DiskEvictTarget      float64       `toml:"disk-evict-target"`
	// contains filtered or unexported fields
}

func (*CacheConfig) SetDefaults added in v0.8.0

func (c *CacheConfig) SetDefaults()

type CachingFileService added in v0.6.0

type CachingFileService interface {
	FileService

	// FlushCache flushes cache
	FlushCache()

	// SetAsyncUpdate sets cache update operation to async mode
	SetAsyncUpdate(bool)
}

CachingFileService is an extension to the FileService

type Config added in v0.6.0

type Config struct {
	// Name is the name of the fileservice; it describes what this instance of fileservice is used for
	Name string `toml:"name"`
	// Backend is the fileservice backend. [MEM|DISK|DISK-ETL|S3|MINIO]
	Backend string `toml:"backend"`
	// S3 is used to create a fileservice using S3 as the backend
	S3 S3Config `toml:"s3"`
	// Cache specifies configs for cache
	Cache CacheConfig `toml:"cache"`
	// DataDir is used to create a fileservice using DISK as the backend
	DataDir string `toml:"data-dir"`
}

Config is the fileservice configuration.

type DirEntry

type DirEntry struct {
	// file name, not full path
	Name  string
	IsDir bool
	Size  int64
}

DirEntry is a file or dir

type DiskCache added in v0.7.0

type DiskCache struct {
	// contains filtered or unexported fields
}

func NewDiskCache added in v0.7.0

func NewDiskCache(
	ctx context.Context,
	path string,
	capacity int64,
	evictInterval time.Duration,
	evictTarget float64,
	perfCounterSets []*perfcounter.CounterSet,
) (*DiskCache, error)

func (*DiskCache) Flush added in v0.7.0

func (d *DiskCache) Flush()

func (*DiskCache) GetFileContent added in v0.8.0

func (d *DiskCache) GetFileContent(ctx context.Context, filePath string, offset int64) (r io.ReadCloser, err error)

func (*DiskCache) Read added in v0.7.0

func (d *DiskCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*DiskCache) SetFileContent added in v0.8.0

func (d *DiskCache) SetFileContent(
	ctx context.Context,
	filePath string,
	readFunc func(context.Context, *IOVector) error,
) (err error)

func (*DiskCache) Update added in v0.7.0

func (d *DiskCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) (
	err error,
)

type ETLFileService added in v0.6.0

type ETLFileService interface {
	FileService

	// ETLCompatible marks the implementation to be compatible to ETL operations
	// implementations must save file contents as-is
	ETLCompatible()
}

ETLFileService is an extension to the FileService

func GetForETL added in v0.6.0

func GetForETL(fs FileService, path string) (res ETLFileService, readPath string, err error)

GetForETL gets or creates a FileService instance for ETL operations.

- If the service part of path is empty, a LocalETLFS will be created.
- If the service part of path is not empty, an ETLFileService typed instance will be extracted from the fs argument.
- If the service part of path carries arguments, a FileService instance will be created dynamically with those arguments.

Supported dynamic file services:

    s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
    s3-no-key,<endpoint>,<region>,<bucket>,<prefix>
    minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
    s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>

key value pairs can be in any order

type FileContentCache added in v0.8.0

type FileContentCache interface {
	GetFileContent(
		ctx context.Context,
		path string,
		offset int64,
	) (
		r io.ReadCloser,
		err error,
	)

	SetFileContent(
		ctx context.Context,
		path string,
		readFunc func(ctx context.Context, vec *IOVector) error,
	) (
		err error,
	)
}

FileContentCache caches contents of files

type FileLike added in v0.6.0

type FileLike interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}

type FileService

type FileService interface {
	// Name is file service's name
	// service name is case-insensitive
	Name() string

	// Write writes a new file
	// returns ErrFileExisted if file already existed
	// returns ErrSizeNotMatch if provided size does not match data
	// entries in vector should be written atomically. if write failed, following reads must not succeed.
	Write(ctx context.Context, vector IOVector) error

	// Read reads a file to fill IOEntries
	// returns ErrFileNotFound if requested file not found
	// returns ErrUnexpectedEOF if less data is read than requested size
	// returns ErrEmptyRange if no data at specified offset and size
	// returns ErrEmptyVector if no IOEntry is passed
	Read(ctx context.Context, vector *IOVector) error

	// List lists sub-entries in a dir
	List(ctx context.Context, dirPath string) ([]DirEntry, error)

	// Delete deletes multiple files
	// returns ErrFileNotFound if requested file not found
	Delete(ctx context.Context, filePaths ...string) error

	// StatFile returns information about a file
	// returns ErrFileNotFound if requested file not found
	StatFile(ctx context.Context, filePath string) (*DirEntry, error)

	// Preload instructs the service to preload a file
	Preload(ctx context.Context, filePath string) error
}

FileService is a write-once file system

func NewFileService added in v0.6.0

func NewFileService(ctx context.Context, cfg Config, perfCounterSets []*perfcounter.CounterSet) (FileService, error)

NewFileService creates a file service from config

func SubPath added in v0.8.0

func SubPath(upstream FileService, path string) FileService

SubPath returns a FileService instance that operates at specified sub path of the upstream instance

type FileServices added in v0.6.0

type FileServices struct {
	// contains filtered or unexported fields
}

func NewFileServices added in v0.6.0

func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)

func (*FileServices) Delete added in v0.6.0

func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error

func (*FileServices) List added in v0.6.0

func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)

func (*FileServices) Name added in v0.6.0

func (f *FileServices) Name() string

func (*FileServices) Preload added in v0.8.0

func (f *FileServices) Preload(ctx context.Context, dirPath string) error

func (*FileServices) Read added in v0.6.0

func (f *FileServices) Read(ctx context.Context, vector *IOVector) error

func (*FileServices) StatFile added in v0.7.0

func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*FileServices) Write added in v0.6.0

func (f *FileServices) Write(ctx context.Context, vector IOVector) error

type FileWithChecksum added in v0.6.0

type FileWithChecksum[T FileLike] struct {
	// contains filtered or unexported fields
}

FileWithChecksum maps file contents to blocks with checksum

func NewFileWithChecksum added in v0.6.0

func NewFileWithChecksum[T FileLike](
	ctx context.Context,
	underlying T,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) *FileWithChecksum[T]

func (*FileWithChecksum[T]) Read added in v0.6.0

func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) ReadAt added in v0.6.0

func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)

func (*FileWithChecksum[T]) Seek added in v0.6.0

func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)

func (*FileWithChecksum[T]) Write added in v0.6.0

func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) WriteAt added in v0.6.0

func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)

type IOEntry

type IOEntry struct {
	// offset in file
	// when writing or mutating, offset can be arbitrary value, gaps between provided data are zero-filled
	// when reading, valid offsets are in range [0, len(file) - 1]
	Offset int64

	// number of bytes to read or write, [1, len(file)]
	// when reading, pass -1 to read to the end of file
	Size int64

	// raw content
	// when reading, if len(Data) < Size, a new byte slice of length Size will be allocated
	Data []byte

	// when reading, if WriterForRead is not nil, write data to it instead of setting Data field
	WriterForRead io.Writer

	// when reading, if ReadCloserForRead is not nil, set an io.ReadCloser instead of setting Data field
	ReadCloserForRead *io.ReadCloser

	// when writing, if ReaderForWrite is not nil, read data from it instead of reading Data field
	// number of bytes to be read is specified by Size field
	// if number of bytes is unknown, set Size field to -1
	ReaderForWrite io.Reader

	// when reading, if the ToObjectBytes field is not nil, the object byte slice it returns will be set to this field
	// Data, WriterForRead, ReadCloserForRead may be empty if ObjectBytes is not nil
	// if ToObjectBytes is provided, caller should always read ObjectBytes instead of Data, WriterForRead or ReadCloserForRead
	ObjectBytes []byte

	// ToObjectBytes constructs an object byte slice from entry contents
	// reader or data must not be retained after it returns
	// reader always contains entry contents
	// data may contain entry contents if available
	// if data is empty, the io.Reader must be fully read before returning nil error
	ToObjectBytes func(reader io.Reader, data []byte) (object []byte, objectSize int64, err error)

	// ObjectSize indicates the memory bytes to hold the object
	// set from ToObjectBytes returning value
	// used in capacity limited caches
	ObjectSize int64
	// contains filtered or unexported fields
}

func (*IOEntry) ReadFromOSFile added in v0.8.0

func (e *IOEntry) ReadFromOSFile(file *os.File) error

type IOVector

type IOVector struct {
	// FilePath indicates where to find the file
	// a path has two parts, service name and file name, separated by ':'
	// service name is optional, if omitted, the receiver FileService will use the default name of the service
	// file name parts are separated by '/'
	// valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( )
	// and all printable non-ASCII characters
	// example:
	// s3:a/b/c and S3:a/b/c represent the same file 'a/b/c' located in 'S3' service
	FilePath string
	// io entries
	// empty Entries is not allowed
	// when writing, overlapping Entries are not allowed
	Entries []IOEntry
	// ExpireAt specifies the expire time of the file
	// implementations may or may not delete the file after this time
	// zero value means no expire
	ExpireAt time.Time
	// NoCache, when true, means the I/O neither reads from nor updates the FileService cache.
	NoCache bool
	// Preloading indicates whether the I/O is for preloading
	Preloading bool
}

type IOVectorCache added in v0.8.0

type IOVectorCache interface {
	Read(
		ctx context.Context,
		vector *IOVector,
	) error
	Update(
		ctx context.Context,
		vector *IOVector,
		async bool,
	) error
	Flush()
}

IOVectorCache caches IOVector

type IOVectorCacheKey added in v0.8.0

type IOVectorCacheKey struct {
	Path   string
	Offset int64
	Size   int64
}

type LocalETLFS added in v0.6.0

type LocalETLFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations

func NewLocalETLFS added in v0.6.0

func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)

func (*LocalETLFS) Delete added in v0.6.0

func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalETLFS) ETLCompatible added in v0.6.0

func (l *LocalETLFS) ETLCompatible()

func (*LocalETLFS) List added in v0.6.0

func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalETLFS) Name added in v0.6.0

func (l *LocalETLFS) Name() string

func (*LocalETLFS) NewMutator added in v0.6.0

func (l *LocalETLFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalETLFS) Preload added in v0.8.0

func (l *LocalETLFS) Preload(ctx context.Context, filePath string) error

func (*LocalETLFS) Read added in v0.6.0

func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) StatFile added in v0.7.0

func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalETLFS) Write added in v0.6.0

func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error

type LocalETLFSMutator added in v0.6.0

type LocalETLFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalETLFSMutator) Append added in v0.6.0

func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalETLFSMutator) Close added in v0.6.0

func (l *LocalETLFSMutator) Close() error

func (*LocalETLFSMutator) Mutate added in v0.6.0

func (l *LocalETLFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type LocalFS

type LocalFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalFS is a FileService implementation backed by local file system

func NewLocalFS

func NewLocalFS(
	ctx context.Context,
	name string,
	rootPath string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*LocalFS, error)

func (*LocalFS) Delete

func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalFS) FlushCache added in v0.6.0

func (l *LocalFS) FlushCache()

func (*LocalFS) List

func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalFS) Name added in v0.6.0

func (l *LocalFS) Name() string

func (*LocalFS) NewMutator added in v0.6.0

func (l *LocalFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalFS) Preload added in v0.8.0

func (l *LocalFS) Preload(ctx context.Context, filePath string) error

func (*LocalFS) Read

func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) Replace added in v0.6.0

func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error

func (*LocalFS) SetAsyncUpdate added in v0.8.0

func (l *LocalFS) SetAsyncUpdate(b bool)

func (*LocalFS) StatFile added in v0.7.0

func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalFS) Write

func (l *LocalFS) Write(ctx context.Context, vector IOVector) error

type LocalFSMutator added in v0.6.0

type LocalFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalFSMutator) Append added in v0.6.0

func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalFSMutator) Close added in v0.6.0

func (l *LocalFSMutator) Close() error

func (*LocalFSMutator) Mutate added in v0.6.0

func (l *LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type MemCache added in v0.6.0

type MemCache struct {
	// contains filtered or unexported fields
}

func NewMemCache added in v0.6.0

func NewMemCache(opts ...MemCacheOptionFunc) *MemCache

func (*MemCache) Flush added in v0.6.0

func (m *MemCache) Flush()

func (*MemCache) Read added in v0.6.0

func (m *MemCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*MemCache) Update added in v0.7.0

func (m *MemCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) error

type MemCacheOptionFunc added in v0.8.0

type MemCacheOptionFunc func(*memCacheOptions)

func WithClock added in v0.8.0

func WithClock(capacity int64) MemCacheOptionFunc

func WithLRU added in v0.8.0

func WithLRU(capacity int64) MemCacheOptionFunc

func WithPerfCounterSets added in v0.8.0

func WithPerfCounterSets(counterSets []*perfcounter.CounterSet) MemCacheOptionFunc

type MemoryFS

type MemoryFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryFS is an in-memory FileService implementation

func NewMemoryFS

func NewMemoryFS(
	name string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*MemoryFS, error)

func (*MemoryFS) Delete

func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error

func (*MemoryFS) ETLCompatible added in v0.6.0

func (m *MemoryFS) ETLCompatible()

func (*MemoryFS) List

func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*MemoryFS) Name added in v0.6.0

func (m *MemoryFS) Name() string

func (*MemoryFS) Preload added in v0.8.0

func (m *MemoryFS) Preload(ctx context.Context, filePath string) error

func (*MemoryFS) Read

func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*MemoryFS) Replace added in v0.6.0

func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error

func (*MemoryFS) StatFile added in v0.7.0

func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*MemoryFS) Write

func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error

type MutableFileService

type MutableFileService interface {
	FileService

	// NewMutator creates a new mutator
	NewMutator(ctx context.Context, filePath string) (Mutator, error)
}

MutableFileService is an extension interface to FileService that allows mutation

type Mutator added in v0.6.0

type Mutator interface {

	// Mutate mutates file contents
	Mutate(ctx context.Context, entries ...IOEntry) error

	// Append appends data to file
	// all IOEntry.Offset values are relative to the end-of-file position
	// for example, passing IOEntry{Offset: 0, Size: 1, Data: []byte("a")} will append "a" to the end of file
	Append(ctx context.Context, entries ...IOEntry) error

	// Close closes the mutator
	// Must be called after finishing mutation
	Close() error
}

type NewFileServicesFunc added in v0.6.0

type NewFileServicesFunc = func(defaultName string) (*FileServices, error)

NewFileServicesFunc creates a new *FileServices

type ObjectCache added in v0.8.0

type ObjectCache interface {
	Set(key any, value []byte, size int64, preloading bool)
	Get(key any, preloading bool) (value []byte, size int64, ok bool)
	Flush()
	Capacity() int64
	Used() int64
	Available() int64
}

ObjectCache caches IOEntry.ObjectBytes

type Path added in v0.6.0

type Path struct {
	Service          string
	ServiceArguments []string
	File             string
}

func ParsePath added in v0.6.0

func ParsePath(s string) (path Path, err error)

func ParsePathAtService added in v0.6.0

func ParsePathAtService(s string, serviceStr string) (path Path, err error)

func (Path) ServiceString added in v0.8.0

func (p Path) ServiceString() string

func (Path) String added in v0.8.0

func (p Path) String() string

type Pool added in v0.8.0

type Pool[T any] struct {
	// contains filtered or unexported fields
}

func NewPool added in v0.8.0

func NewPool[T any](
	capacity uint32,
	newFunc func() T,
	resetFunc func(T),
	finallyFunc func(T),
) *Pool[T]

func (*Pool[T]) Get added in v0.8.0

func (p *Pool[T]) Get(ptr *T) PutBack[T]

func (*Pool[T]) Put added in v0.8.0

func (p *Pool[T]) Put(idx int, ptr *T)

type ProfileHandler added in v0.8.0

type ProfileHandler struct {
	// contains filtered or unexported fields
}

func NewProfileHandler added in v0.8.0

func NewProfileHandler() *ProfileHandler

func (*ProfileHandler) AddSample added in v0.8.0

func (p *ProfileHandler) AddSample()

func (*ProfileHandler) ServeHTTP added in v0.8.0

func (p *ProfileHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*ProfileHandler) StartProfile added in v0.8.0

func (p *ProfileHandler) StartProfile(w io.Writer) (stop func())

type PutBack added in v0.8.0

type PutBack[T any] struct {
	// contains filtered or unexported fields
}

func (PutBack[T]) Put added in v0.8.0

func (pb PutBack[T]) Put()

type ReplaceableFileService added in v0.6.0

type ReplaceableFileService interface {
	FileService

	Replace(ctx context.Context, vector IOVector) error
}

ReplaceableFileService is an extension interface to FileService that allows replacing a whole file

type S3Config

type S3Config struct {
	SharedConfigProfile string `toml:"shared-config-profile"`
	Endpoint            string `toml:"endpoint"`
	Bucket              string `toml:"bucket"`
	// KeyPrefix enables multiple fs instances in one bucket
	KeyPrefix string `toml:"key-prefix"`
}

type S3FS

type S3FS struct {
	// contains filtered or unexported fields
}

S3FS is a FileService implementation backed by S3

func NewS3FS

func NewS3FS(
	ctx context.Context,
	sharedConfigProfile string,
	name string,
	endpoint string,
	bucket string,
	keyPrefix string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
	noCache bool,
) (*S3FS, error)

func NewS3FSOnMinio added in v0.6.0

func NewS3FSOnMinio(
	ctx context.Context,
	sharedConfigProfile string,
	name string,
	endpoint string,
	bucket string,
	keyPrefix string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
	noCache bool,
) (*S3FS, error)

NewS3FSOnMinio creates an S3FS on a MinIO server. This is needed because the URL scheme of a MinIO server is not compatible with AWS's.

func (*S3FS) Delete

func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error

func (*S3FS) ETLCompatible added in v0.6.0

func (*S3FS) ETLCompatible()

func (*S3FS) FlushCache added in v0.6.0

func (s *S3FS) FlushCache()

func (*S3FS) List

func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*S3FS) Name added in v0.6.0

func (s *S3FS) Name() string

func (*S3FS) Preload added in v0.8.0

func (s *S3FS) Preload(ctx context.Context, filePath string) error

func (*S3FS) Read

func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) SetAsyncUpdate added in v0.8.0

func (s *S3FS) SetAsyncUpdate(b bool)

func (*S3FS) StatFile added in v0.7.0

func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*S3FS) Write

func (s *S3FS) Write(ctx context.Context, vector IOVector) error

Directories

Path Synopsis
objcache

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL