fileservice

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const ServiceNameSeparator = ":"

Variables

View Source
var (
	ErrChecksumNotMatch = moerr.NewInternalErrorNoCtx("checksum not match")
)

Functions

func Get added in v0.6.0

func Get[T any](fs FileService, name string) (res T, err error)

func JoinPath added in v0.6.0

func JoinPath(serviceName string, path string) string

Types

type Cache added in v0.7.0

type Cache interface {
	Read(
		ctx context.Context,
		vector *IOVector,
	) error
	Update(
		ctx context.Context,
		vector *IOVector,
	) error
	Flush()
	CacheStats() *CacheStats
}

type CacheConfig added in v0.6.0

type CacheConfig struct {
	MemoryCapacity toml.ByteSize `toml:"memory-capacity"`
	DiskPath       string        `toml:"disk-path"`
	DiskCapacity   toml.ByteSize `toml:"disk-capacity"`
}

type CacheKey added in v0.6.0

type CacheKey struct {
	Path   string
	Offset int64
	Size   int64
}

type CacheStats added in v0.6.0

type CacheStats struct {
	NumRead int64
	NumHit  int64
}

type CachingFileService added in v0.6.0

// CachingFileService is an extension to the FileService
// that adds cache flushing and cache statistics.
// On this page, LocalFS and S3FS declare both FlushCache and CacheStats.
type CachingFileService interface {
	// embedded base method set
	FileService

	// FlushCache flushes cache
	FlushCache()

	// CacheStats returns cache statistics
	CacheStats() *CacheStats
}

CachingFileService is an extension to the FileService

type Config added in v0.6.0

// Config is the fileservice configuration; NewFileService takes a Config value.
// Every field is loaded from the TOML key named in its struct tag.
type Config struct {
	// Name is the name of the fileservice; it describes what an instance of fileservice is used for
	Name string `toml:"name"`
	// Backend is the fileservice backend, one of: MEM, DISK, DISK-ETL, S3, MINIO
	Backend string `toml:"backend"`
	// S3 is used to create a fileservice using S3 as the backend
	S3 S3Config `toml:"s3"`
	// Cache specifies configs for cache
	Cache CacheConfig `toml:"cache"`
	// DataDir is used to create a fileservice using DISK as the backend
	DataDir string `toml:"data-dir"`
}

Config is the fileservice configuration.

type DirEntry

type DirEntry struct {
	// file name, not full path
	Name  string
	IsDir bool
	Size  int64
}

DirEntry is a file or dir

type DiskCache added in v0.7.0

type DiskCache struct {
	// contains filtered or unexported fields
}

func NewDiskCache added in v0.7.0

func NewDiskCache(path string, capacity int64) (*DiskCache, error)

func (*DiskCache) CacheStats added in v0.7.0

func (d *DiskCache) CacheStats() *CacheStats

func (*DiskCache) Flush added in v0.7.0

func (d *DiskCache) Flush()

func (*DiskCache) Read added in v0.7.0

func (d *DiskCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*DiskCache) Update added in v0.7.0

func (d *DiskCache) Update(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

type ETLFileService added in v0.6.0

// ETLFileService is an extension to the FileService
// for implementations that can be used in ETL operations.
// On this page, LocalETLFS, MemoryFS and S3FS declare ETLCompatible.
type ETLFileService interface {
	// embedded base method set
	FileService

	// ETLCompatible marks the implementation as compatible with ETL operations
	// implementations must save file contents as-is
	ETLCompatible()
}

ETLFileService is an extension to the FileService

func GetForETL added in v0.6.0

func GetForETL(fs FileService, path string) (res ETLFileService, readPath string, err error)

GetForETL gets or creates a FileService instance for ETL operations.

- If the service part of path is empty, a LocalETLFS will be created.
- If the service part of path is not empty, an ETLFileService typed instance will be extracted from the fs argument.
- If the service part of path carries arguments, a FileService instance will be created dynamically with those arguments.

Supported dynamic file services:

- s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
- s3-no-key,<endpoint>,<region>,<bucket>,<prefix>
- minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
- s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>

key value pairs can be in any order

type FileLike added in v0.6.0

type FileLike interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}

type FileService

// FileService is a write-once file system.
// It is the base interface embedded by CachingFileService, ETLFileService,
// MutableFileService and ReplaceableFileService.
type FileService interface {
	// Name is file service's name
	// service name is case-insensitive
	Name() string

	// Write writes a new file
	// returns ErrFileExisted if file already existed
	// returns ErrSizeNotMatch if provided size does not match data
	// entries in vector should be written atomically. if write failed, following reads must not succeed.
	// note: vector is passed by value here, while Read takes a pointer
	Write(ctx context.Context, vector IOVector) error

	// Read reads a file to fill IOEntries
	// returns ErrFileNotFound if requested file not found
	// returns ErrUnexpectedEOF if less data is read than requested size
	// returns ErrEmptyRange if no data at specified offset and size
	// returns ErrEmptyVector if no IOEntry is passed
	Read(ctx context.Context, vector *IOVector) error

	// List lists sub-entries in a dir
	List(ctx context.Context, dirPath string) ([]DirEntry, error)

	// Delete deletes multiple files
	// returns ErrFileNotFound if requested file not found
	Delete(ctx context.Context, filePaths ...string) error

	// StatFile returns information about a file
	// returns ErrFileNotFound if requested file not found
	StatFile(ctx context.Context, filePath string) (*DirEntry, error)
}

FileService is a write-once file system

func NewFileService added in v0.6.0

func NewFileService(cfg Config) (FileService, error)

NewFileService creates a file service from config.

type FileServices added in v0.6.0

type FileServices struct {
	// contains filtered or unexported fields
}

func NewFileServices added in v0.6.0

func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)

func (*FileServices) Delete added in v0.6.0

func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error

func (*FileServices) List added in v0.6.0

func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)

func (*FileServices) Name added in v0.6.0

func (f *FileServices) Name() string

func (*FileServices) Read added in v0.6.0

func (f *FileServices) Read(ctx context.Context, vector *IOVector) error

func (*FileServices) StatFile added in v0.7.0

func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*FileServices) Write added in v0.6.0

func (f *FileServices) Write(ctx context.Context, vector IOVector) error

type FileWithChecksum added in v0.6.0

type FileWithChecksum[T FileLike] struct {
	// contains filtered or unexported fields
}

FileWithChecksum maps file contents to blocks with checksum

func NewFileWithChecksum added in v0.6.0

func NewFileWithChecksum[T FileLike](
	underlying T,
	blockContentSize int,
) *FileWithChecksum[T]

func (*FileWithChecksum[T]) Read added in v0.6.0

func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) ReadAt added in v0.6.0

func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)

func (*FileWithChecksum[T]) Seek added in v0.6.0

func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)

func (*FileWithChecksum[T]) Write added in v0.6.0

func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) WriteAt added in v0.6.0

func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)

type IOEntry

// IOEntry describes one range of a file (Offset and Size) together with
// the source or destination of its data.
// IOVector.Entries is a slice of IOEntry, and Mutator methods take IOEntry values.
type IOEntry struct {
	// offset in file
	// when writing or mutating, offset can be arbitrary value, gaps between provided data are zero-filled
	// when reading, valid offsets are in range [0, len(file) - 1]
	Offset int64

	// number of bytes to read or write, [1, len(file)]
	// when reading, pass -1 to read to the end of file
	Size int64

	// raw content
	// when reading, if len(Data) < Size, a new byte slice of length Size will be allocated
	Data []byte

	// when reading, if WriterForRead is not nil, write data to it instead of setting Data field
	WriterForRead io.Writer

	// when reading, if ReadCloserForRead is not nil, set the io.ReadCloser it points to instead of setting Data field
	ReadCloserForRead *io.ReadCloser

	// when writing, if ReaderForWrite is not nil, read data from it instead of reading Data field
	// number of bytes to be read is specified by Size field
	// if number of bytes is unknown, set Size field to -1
	ReaderForWrite io.Reader

	// when reading, if the ToObject field is not nil, the returned object will be set to this field
	// caches may choose to cache this object instead of caching []byte
	// Data, WriterForRead, ReadCloserForRead may be empty if Object is not nil
	// if ToObject is provided, caller should always read Object instead of Data, WriterForRead or ReadCloserForRead
	Object any

	// ToObject constructs an object from entry contents
	// reader or data must not be retained after returns
	// reader always contains entry contents
	// data may contain entry contents if available
	// if data is empty, the io.Reader must be fully read before returning nil error
	// return an *RC value to make the object pinnable
	// cache implementations should not evict an *RC value with non-zero reference
	ToObject func(reader io.Reader, data []byte) (object any, objectSize int64, err error)

	// ObjectSize indicates the memory bytes to hold the object
	// set from ToObject returning value
	// used in capacity limited caches
	ObjectSize int64
	// contains filtered or unexported fields
}

type IOVector

// IOVector names a file and the IOEntry ranges to read from or write to it.
// It is the argument type of the Read, Write and Replace methods on this page.
type IOVector struct {
	// FilePath indicates where to find the file
	// a path has two parts, service name and file name, separated by ':'
	// service name is optional, if omitted, the receiver FileService will use the default name of the service
	// file name parts are separated by '/'
	// valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( )
	// and all printable non-ASCII characters
	// example:
	// s3:a/b/c and S3:a/b/c represent the same file 'a/b/c' located in the 'S3' service
	FilePath string
	// io entries
	// empty Entries is not allowed
	// when writing, overlapping Entries are not allowed
	Entries []IOEntry
	// ExpireAt specifies the expire time of the file
	// implementations may or may not delete the file after this time
	// zero value means no expire
	ExpireAt time.Time
}

type LRU added in v0.6.0

type LRU struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewLRU added in v0.6.0

func NewLRU(capacity int64) *LRU

func (*LRU) Flush added in v0.6.0

func (l *LRU) Flush()

func (*LRU) Get added in v0.6.0

func (l *LRU) Get(key any) (value any, size int64, ok bool)

func (*LRU) Set added in v0.6.0

func (l *LRU) Set(key any, value any, size int64)

type LocalETLFS added in v0.6.0

type LocalETLFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations

func NewLocalETLFS added in v0.6.0

func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)

func (*LocalETLFS) Delete added in v0.6.0

func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalETLFS) ETLCompatible added in v0.6.0

func (l *LocalETLFS) ETLCompatible()

func (*LocalETLFS) List added in v0.6.0

func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalETLFS) Name added in v0.6.0

func (l *LocalETLFS) Name() string

func (*LocalETLFS) NewMutator added in v0.6.0

func (l *LocalETLFS) NewMutator(filePath string) (Mutator, error)

func (*LocalETLFS) Read added in v0.6.0

func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) StatFile added in v0.7.0

func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalETLFS) Write added in v0.6.0

func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error

type LocalETLFSMutator added in v0.6.0

type LocalETLFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalETLFSMutator) Append added in v0.6.0

func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalETLFSMutator) Close added in v0.6.0

func (l *LocalETLFSMutator) Close() error

func (*LocalETLFSMutator) Mutate added in v0.6.0

func (l *LocalETLFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type LocalFS

type LocalFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalFS is a FileService implementation backed by local file system

func NewLocalFS

func NewLocalFS(
	name string,
	rootPath string,
	memCacheCapacity int64,
) (*LocalFS, error)

func (*LocalFS) CacheStats added in v0.6.0

func (l *LocalFS) CacheStats() *CacheStats

func (*LocalFS) Delete

func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalFS) FlushCache added in v0.6.0

func (l *LocalFS) FlushCache()

func (*LocalFS) List

func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalFS) Name added in v0.6.0

func (l *LocalFS) Name() string

func (*LocalFS) NewMutator added in v0.6.0

func (l *LocalFS) NewMutator(filePath string) (Mutator, error)

func (*LocalFS) Read

func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) Replace added in v0.6.0

func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error

func (*LocalFS) StatFile added in v0.7.0

func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalFS) Write

func (l *LocalFS) Write(ctx context.Context, vector IOVector) error

type LocalFSMutator added in v0.6.0

type LocalFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalFSMutator) Append added in v0.6.0

func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalFSMutator) Close added in v0.6.0

func (l *LocalFSMutator) Close() error

func (*LocalFSMutator) Mutate added in v0.6.0

func (l *LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type MemCache added in v0.6.0

type MemCache struct {
	// contains filtered or unexported fields
}

func NewMemCache added in v0.6.0

func NewMemCache(capacity int64) *MemCache

func (*MemCache) CacheStats added in v0.6.0

func (m *MemCache) CacheStats() *CacheStats

func (*MemCache) Flush added in v0.6.0

func (m *MemCache) Flush()

func (*MemCache) Read added in v0.6.0

func (m *MemCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*MemCache) Update added in v0.7.0

func (m *MemCache) Update(
	ctx context.Context,
	vector *IOVector,
) error

type MemoryFS

type MemoryFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryFS is an in-memory FileService implementation

func NewMemoryFS

func NewMemoryFS(name string) (*MemoryFS, error)

func (*MemoryFS) Delete

func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error

func (*MemoryFS) ETLCompatible added in v0.6.0

func (m *MemoryFS) ETLCompatible()

func (*MemoryFS) List

func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*MemoryFS) Name added in v0.6.0

func (m *MemoryFS) Name() string

func (*MemoryFS) Read

func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error

func (*MemoryFS) Replace added in v0.6.0

func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error

func (*MemoryFS) StatFile added in v0.7.0

func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*MemoryFS) Write

func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error

type MutableFileService

// MutableFileService is an extension interface to FileService that allows mutation.
// On this page, LocalFS and LocalETLFS declare NewMutator.
type MutableFileService interface {
	// embedded base method set
	FileService

	// NewMutator creates a new mutator for the file at filePath
	NewMutator(filePath string) (Mutator, error)
}

MutableFileService is an extension interface to FileService that allows mutation

type Mutator added in v0.6.0

// Mutator changes the contents of an existing file.
// A Mutator is returned by MutableFileService.NewMutator.
type Mutator interface {

	// Mutate mutates file contents
	Mutate(ctx context.Context, entries ...IOEntry) error

	// Append appends data to file
	// every IOEntry.Offset is relative to the end-of-file position
	// for example, passing IOEntry{Offset: 0, Size: 1, Data: []byte("a")} will append "a" to the end of file
	Append(ctx context.Context, entries ...IOEntry) error

	// Close closes the mutator
	// Must be called after finishing mutation
	Close() error
}

type NewFileServicesFunc added in v0.6.0

type NewFileServicesFunc = func(defaultName string) (*FileServices, error)

NewFileServicesFunc creates a new *FileServices

type Path added in v0.6.0

type Path struct {
	Service          string
	ServiceArguments []string
	File             string
}

func ParsePath added in v0.6.0

func ParsePath(s string) (path Path, err error)

func ParsePathAtService added in v0.6.0

func ParsePathAtService(s string, serviceName string) (path Path, err error)

type RC added in v0.6.0

type RC[T any] struct {
	Value T
	// contains filtered or unexported fields
}

RC represents a reference-counted value that will not be evicted from the LRU while its reference count is greater than 0

func NewRC added in v0.6.0

func NewRC[T any](value T) *RC[T]

NewRC creates an RC value with 0 reference

func (*RC[T]) DecRef added in v0.6.0

func (r *RC[T]) DecRef()

DecRef decreases reference count

func (*RC[T]) IncRef added in v0.6.0

func (r *RC[T]) IncRef()

IncRef increases reference count

func (*RC[T]) RefCount added in v0.6.0

func (r *RC[T]) RefCount() int64

RefCount returns reference count

type ReplaceableFileService added in v0.6.0

// ReplaceableFileService is an extension interface to FileService that allows replacing a whole file.
// On this page, LocalFS and MemoryFS declare Replace.
type ReplaceableFileService interface {
	// embedded base method set
	FileService

	// Replace takes the same arguments as FileService.Write
	// NOTE(review): presumably overwrites an existing file instead of returning ErrFileExisted — confirm against implementations
	Replace(ctx context.Context, vector IOVector) error
}

ReplaceableFileService is an extension interface to FileService that allows replacing a whole file

type S3Config

type S3Config struct {
	SharedConfigProfile string `toml:"shared-config-profile"`
	Endpoint            string `toml:"endpoint"`
	Bucket              string `toml:"bucket"`
	// KeyPrefix enables multiple fs instances in one bucket
	KeyPrefix string `toml:"key-prefix"`
}

type S3FS

type S3FS struct {
	// contains filtered or unexported fields
}

S3FS is a FileService implementation backed by S3

func NewS3FS

func NewS3FS(
	sharedConfigProfile string,
	name string,
	endpoint string,
	bucket string,
	keyPrefix string,
	memCacheCapacity int64,
	diskCacheCapacity int64,
	diskCachePath string,
) (*S3FS, error)

func NewS3FSOnMinio added in v0.6.0

func NewS3FSOnMinio(
	sharedConfigProfile string,
	name string,
	endpoint string,
	bucket string,
	keyPrefix string,
	memCacheCapacity int64,
	diskCacheCapacity int64,
	diskCachePath string,
) (*S3FS, error)

NewS3FSOnMinio creates an S3FS on a MinIO server. This is needed because the URL scheme of a MinIO server is not compatible with AWS's.

func (*S3FS) CacheStats added in v0.6.0

func (s *S3FS) CacheStats() *CacheStats

func (*S3FS) Delete

func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error

func (*S3FS) ETLCompatible added in v0.6.0

func (*S3FS) ETLCompatible()

func (*S3FS) FlushCache added in v0.6.0

func (s *S3FS) FlushCache()

func (*S3FS) List

func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*S3FS) Name added in v0.6.0

func (s *S3FS) Name() string

func (*S3FS) Read

func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) StatFile added in v0.7.0

func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*S3FS) Write

func (s *S3FS) Write(ctx context.Context, vector IOVector) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL