filer

package
v0.0.0-...-45e1a9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
// Miscellaneous package-level constants.
const (
	// LogFlushInterval is one minute.
	// NOTE(review): presumably the period between flushes of a log buffer — confirm against usage.
	LogFlushInterval = time.Minute
	// PaginationSize is 1024.
	// NOTE(review): presumably the batch size used when listing entries page by page — confirm against usage.
	PaginationSize   = 1024
	// FilerStoreId is the literal key "filer.store.id".
	FilerStoreId     = "filer.store.id"
)
View Source
// Well-known directories and file names under /etc.
const (
	// DirectoryEtcRoot is "/etc/". Unlike the sibling directory constants
	// below, it carries a trailing slash.
	DirectoryEtcRoot      = "/etc/"
	DirectoryEtcSeaweedFS = "/etc/seaweedfs"
	DirectoryEtcRemote    = "/etc/remote"
	// FilerConfName is a bare file name, not a full path.
	FilerConfName         = "filer.conf"
	IamConfigDirectory    = "/etc/iam"
	// IamIdentityFile and IamPoliciesFile are bare file names.
	// NOTE(review): presumably stored under IamConfigDirectory — confirm.
	IamIdentityFile       = "identity.json"
	IamPoliciesFile       = "policies.json"
)
View Source
// Paths below the topics directory.
const (
	// TopicsDir is "/topics".
	TopicsDir    = "/topics"
	// SystemLogDir is built from TopicsDir and evaluates to "/topics/.system/log".
	SystemLogDir = TopicsDir + "/.system/log"
)
View Source
const CountEntryChunksForGzip = 50
View Source
const (
	// HARD_LINK_MARKER is the rune constant 0x01.
	// NOTE(review): HardLinkId is declared as "16 bytes + 1 marker byte";
	// presumably this is that marker byte — confirm against NewHardLinkId.
	HARD_LINK_MARKER = '\x01'
)
View Source
const (
	ManifestBatch = 10000
)
View Source
const (
	MetaOffsetPrefix = "Meta"
)
View Source
const ModeChangeLimit = 3
View Source
const (
	MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
)
View Source
const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
View Source
const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
View Source
// Seek modes. SEEK_DATA is typed uint32, the same type as the whence
// parameter of ChunkGroup.SearchChunks.
const (
	// see weedfs_file_lseek.go
	SEEK_DATA uint32 = 3 // seek to next data after the offset

)
View Source
// SectionSize evaluates to 67,108,864 (2 * 1024 * 1024 * 32).
// NOTE(review): presumably the number of bytes covered by one
// FileChunkSection — confirm against usage.
const SectionSize = 2 * 1024 * 1024 * 32 // 64MiB

Variables

View Source
// OS_UID and OS_GID hold the numeric user and group id of the current
// process, read once when the package is initialized.
//
// os.Getuid and os.Getgid return -1 on Windows; the uint32 conversion turns
// that into 4294967295.
var (
	OS_UID = uint32(os.Getuid())
	OS_GID = uint32(os.Getgid())
)
View Source
// Sentinel errors exposed by this package. They are created with errors.New,
// so callers can compare against them with errors.Is.
var (
	// NOTE(review): presumably returned by stores that cannot list a
	// directory by prefix — confirm against the store implementations.
	ErrUnsupportedListDirectoryPrefixed      = errors.New("unsupported directory prefix listing")
	ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
	// NOTE(review): presumably returned by the Kv* methods of FilerStore
	// implementations — confirm against the store implementations.
	ErrKvNotImplemented                      = errors.New("kv not implemented yet")
	ErrKvNotFound                            = errors.New("kv: not found")
)
View Source
// Root is a package-level Entry for the path "/".
//
// It is a directory (os.ModeDir) with permission bits 0755, owned by OS_UID
// and OS_GID. Mtime and Crtime come from two separate time.Now() calls made
// when the package is initialized, so they reflect process start time and
// may differ from each other by a tiny amount.
var (
	Root = &Entry{
		FullPath: "/",
		Attr: Attr{
			Mtime:  time.Now(),
			Crtime: time.Now(),
			Mode:   os.ModeDir | 0755,
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
	}
)
View Source
var (
	Stores []FilerStore
)
View Source
// VolumeNotFoundPattern matches text of the form "volume <digits> not found".
// It is compiled once at package initialization; MustCompile panics if the
// pattern is invalid.
//
// The lazy quantifier (+?) matches the same text a greedy one would, because
// the digits must be followed by the literal " not found".
var (
	VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
)

Functions

func CacheRemoteObjectToLocalCluster

func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error

func CheckDuplicateAccessKey

func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error

CheckDuplicateAccessKey returns an error when s3cfg contains duplicate access keys.

func CompactFileChunks

func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk)

func DeleteMountMapping

func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error)

func DoMinusChunks

func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)

func DoMinusChunksBySourceFileId

func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)

func ETag

func ETag(entry *filer_pb.Entry) (etag string)

func ETagChunks

func ETagChunks(chunks []*filer_pb.FileChunk) (etag string)

func ETagEntry

func ETagEntry(entry *Entry) (etag string)

func EntryAttributeToPb

func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes

func EqualEntry

func EqualEntry(a, b *Entry) bool

func FileSize

func FileSize(entry *filer_pb.Entry) (size uint64)

func FindGarbageChunks

func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{})

func FromPbEntryToExistingEntry

func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry)

func GetPeerMetaOffsetKey

func GetPeerMetaOffsetKey(peerSignature int32) []byte

func HasChunkManifest

func HasChunkManifest(chunks []*filer_pb.FileChunk) bool

func HasData

func HasData(entry *filer_pb.Entry) bool

func InsertMountMapping

func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error)

func IsSameData

func IsSameData(a, b *filer_pb.Entry) bool

func LookupByMasterClientFn

func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error)

func MapFullPathToRemoteStorageLocation

func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, fp util.FullPath) *remote_pb.RemoteStorageLocation

func MapRemoteStorageLocationPathToFullPath

func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string) (fp util.FullPath)

func MaybeManifestize

func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error)

func MergeIntoChunkViews

func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk)

func MergeIntoVisibles

func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk)

func MinusChunks

func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error)

func NewFileReader

func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader

func ParallelProcessDirectoryStructure

func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry) error) (firstErr error)

ParallelProcessDirectoryStructure processes each entry in parallel, and also ensures that parent directories are processed first. It assumes the parent directories are already present in entryChan.

func ParseS3ConfigurationFromBytes

func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error

func ProtoToText

func ProtoToText(writer io.Writer, config proto.Message) error

func ReadAll

func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error

func ReadEachLogEntry

func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error)

func ReadEntry

func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error

func ReadInsideFiler

func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error)

func ReadMountMappings

func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error)

func ReadRemoteStorageConf

func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error)

func Replay

func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error

func ResolveChunkManifest

func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error)

func ResolveOneChunkManifest

func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error)

func SaveInsideFiler

func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error

func SeparateGarbageChunks

func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk)

func SeparateManifestChunks

func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk)

func StreamContent

func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error

func TotalSize

func TotalSize(chunks []*filer_pb.FileChunk) (size uint64)

func UnmarshalRemoteStorageMappings

func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error)

func VolumeId

func VolumeId(fileId string) string

Types

type Attr

// Attr holds the attributes of an entry. It is embedded in Entry, and
// PbToEntryAttribute builds one from a *filer_pb.FuseAttributes.
type Attr struct {
	Mtime         time.Time   // time of last modification
	Crtime        time.Time   // time of creation (OS X only)
	Mode          os.FileMode // file mode
	Uid           uint32      // owner uid
	Gid           uint32      // group gid
	Mime          string      // mime type
	TtlSec        int32       // ttl in seconds
	UserName      string
	GroupNames    []string
	// SymlinkTarget: NOTE(review): presumably the link destination when the
	// entry is a symbolic link — confirm.
	SymlinkTarget string
	// Md5: NOTE(review): presumably the MD5 digest of the content — confirm.
	Md5           []byte
	// FileSize: NOTE(review): presumably in bytes — confirm.
	FileSize      uint64
	Rdev          uint32
	Inode         uint64
}

func PbToEntryAttribute

func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr

func (Attr) IsDirectory

func (attr Attr) IsDirectory() bool

type BucketAware

// BucketAware is implemented by types that react to bucket lifecycle events.
// The same three methods are also part of VirtualFilerStore.
type BucketAware interface {
	// OnBucketCreation receives the name of the bucket.
	// NOTE(review): presumably called after the bucket is created — confirm.
	OnBucketCreation(bucket string)
	// OnBucketDeletion receives the name of the bucket.
	// NOTE(review): presumably called after the bucket is deleted — confirm.
	OnBucketDeletion(bucket string)
	// CanDropWholeBucket reports a capability flag.
	// NOTE(review): presumably whether a bucket can be removed in one
	// operation rather than entry by entry — confirm.
	CanDropWholeBucket() bool
}

type ChunkGroup

type ChunkGroup struct {
	// contains filtered or unexported fields
}

func NewChunkGroup

func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error)

func (*ChunkGroup) AddChunk

func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error

func (*ChunkGroup) ReadDataAt

func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error)

func (*ChunkGroup) SearchChunks

func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64)

FIXME: needs tests.

func (*ChunkGroup) SetChunks

func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error

type ChunkReadAt

type ChunkReadAt struct {
	// contains filtered or unexported fields
}

func NewChunkReaderAtFromClient

func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt

func (*ChunkReadAt) Close

func (c *ChunkReadAt) Close() error

func (*ChunkReadAt) ReadAt

func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error)

func (*ChunkReadAt) ReadAtWithTime

func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error)

type ChunkStreamReader

type ChunkStreamReader struct {
	// contains filtered or unexported fields
}

---------------- ChunkStreamReader ----------------------------------

func NewChunkStreamReader

func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader

func NewChunkStreamReaderFromFiler

func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader

func (*ChunkStreamReader) Close

func (c *ChunkStreamReader) Close()

func (*ChunkStreamReader) Read

func (c *ChunkStreamReader) Read(p []byte) (n int, err error)

func (*ChunkStreamReader) ReadAt

func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error)

func (*ChunkStreamReader) Seek

func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error)

type ChunkView

// ChunkView describes a range of one chunk together with the file offset it
// maps to. It has SetStartStop and Clone methods, the two methods required
// by IntervalValue.
type ChunkView struct {
	// FileId: NOTE(review): presumably identifies the chunk this view reads from — confirm.
	FileId        string
	OffsetInChunk int64 // offset within the chunk
	// ViewSize: NOTE(review): presumably the number of bytes covered by this view — confirm.
	ViewSize      uint64
	ViewOffset    int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
	// ChunkSize: NOTE(review): presumably the size of the whole chunk;
	// see IsFullChunk — confirm.
	ChunkSize     uint64
	CipherKey     []byte
	IsGzipped     bool
	// ModifiedTsNs: the "TsNs" suffix suggests a timestamp in nanoseconds — confirm.
	ModifiedTsNs  int64
}

func (*ChunkView) Clone

func (cv *ChunkView) Clone() IntervalValue

func (*ChunkView) IsFullChunk

func (cv *ChunkView) IsFullChunk() bool

func (*ChunkView) SetStartStop

func (cv *ChunkView) SetStartStop(start, stop int64)

type Debuggable

type Debuggable interface {
	Debug(writer io.Writer)
}

type DoStreamContent

type DoStreamContent func(writer io.Writer) error

func PrepareStreamContent

func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error)

func PrepareStreamContentWithThrottler

func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error)

type Entry

// Entry is the in-memory form of one filer entry. It embeds util.FullPath
// and Attr, so their fields and methods are promoted onto Entry.
//
// Only Chunks carries a JSON struct tag; the other fields use the default
// encoding/json field names.
type Entry struct {
	util.FullPath

	Attr
	Extended map[string][]byte

	// the following is for files
	Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`

	// HardLinkId and HardLinkCounter: NOTE(review): presumably set only for
	// hard-linked entries — confirm.
	HardLinkId      HardLinkId
	HardLinkCounter int32
	// Content: NOTE(review): presumably data stored inline instead of in
	// chunks — confirm.
	Content         []byte
	// Remote: NOTE(review): presumably describes the object in remote
	// storage; see IsInRemoteOnly — confirm.
	Remote          *filer_pb.RemoteEntry
	Quota           int64
}

func FromPbEntry

func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry

func (*Entry) DecodeAttributesAndChunks

func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error

func (*Entry) EncodeAttributesAndChunks

func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error)

func (*Entry) GetChunks

func (entry *Entry) GetChunks() []*filer_pb.FileChunk

func (*Entry) IsInRemoteOnly

func (entry *Entry) IsInRemoteOnly() bool

func (*Entry) ShallowClone

func (entry *Entry) ShallowClone() *Entry

func (*Entry) Size

func (entry *Entry) Size() uint64

func (*Entry) Timestamp

func (entry *Entry) Timestamp() time.Time

func (*Entry) ToExistingProtoEntry

func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry)

func (*Entry) ToProtoEntry

func (entry *Entry) ToProtoEntry() *filer_pb.Entry

func (*Entry) ToProtoFullEntry

func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry

type FileChunkSection

type FileChunkSection struct {
	// contains filtered or unexported fields
}

func NewFileChunkSection

func NewFileChunkSection(si SectionIndex) *FileChunkSection

func (*FileChunkSection) DataStartOffset

func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64

func (*FileChunkSection) NextStopOffset

func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64

type Filer

// Filer bundles a store, a master client, log buffers, configuration and
// related components. Instances are created with NewFiler.
type Filer struct {
	UniqueFilerId    int32
	UniqueFilerEpoch int32
	// Store is the VirtualFilerStore this filer uses; see SetStore and GetStore.
	Store            VirtualFilerStore
	MasterClient     *wdclient.MasterClient

	GrpcDialOption     grpc.DialOption
	DirBucketsPath     string
	Cipher             bool
	LocalMetaLogBuffer *log_buffer.LogBuffer

	MetaAggregator    *MetaAggregator
	Signature         int32
	FilerConf         *FilerConf
	RemoteStorage     *FilerRemoteStorage
	Dlm               *lock_manager.DistributedLockManager
	// MaxFilenameLength: NewFiler accepts a maxFilenameLength argument.
	// NOTE(review): presumably stored here — confirm.
	MaxFilenameLength uint32
	// contains filtered or unexported fields
}

func NewFiler

func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer

func (*Filer) AggregateFromPeers

func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time)

func (*Filer) BeginTransaction

func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error)

func (*Filer) CanRename

func (f *Filer) CanRename(source, target util.FullPath, oldName string) error

func (*Filer) CommitTransaction

func (f *Filer) CommitTransaction(ctx context.Context) error

func (*Filer) CreateEntry

func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool, maxFilenameLength uint32) error

func (*Filer) DeleteChunks

func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk)

func (*Filer) DeleteChunksNotRecursive

func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk)

func (*Filer) DeleteEntryMetaAndData

func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error)

func (*Filer) DeleteUncommittedChunks

func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk)

func (*Filer) DetectBucket

func (f *Filer) DetectBucket(source util.FullPath) (bucket string)

func (*Filer) DirectDeleteChunks

func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk)

func (*Filer) FindEntry

func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error)

func (*Filer) GetMaster

func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress

func (*Filer) GetStore

func (f *Filer) GetStore() (store FilerStore)

func (*Filer) KeepMasterClientConnected

func (fs *Filer) KeepMasterClientConnected(ctx context.Context)

func (*Filer) ListDirectoryEntries

func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error)

For now, prefix and namePattern are mutually exclusive.

func (*Filer) ListExistingPeerUpdates

func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate)

func (*Filer) LoadConfiguration

func (f *Filer) LoadConfiguration(config *util.ViperProxy) (isFresh bool)

func (*Filer) LoadFilerConf

func (f *Filer) LoadFilerConf()

func (*Filer) LoadRemoteStorageConfAndMapping

func (f *Filer) LoadRemoteStorageConfAndMapping()

////////////////////////////////// load and maintain remote storages //////////////////////////////////

func (*Filer) MaybeBootstrapFromOnePeer

func (f *Filer) MaybeBootstrapFromOnePeer(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error)

func (*Filer) NotifyUpdateEvent

func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32)

func (*Filer) ReadPersistedLogBuffer

func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error)

func (*Filer) RollbackTransaction

func (f *Filer) RollbackTransaction(ctx context.Context) error

func (*Filer) SetStore

func (f *Filer) SetStore(store FilerStore) (isFresh bool)

func (*Filer) Shutdown

func (f *Filer) Shutdown()

func (*Filer) StreamListDirectoryEntries

func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)

For now, prefix and namePattern are mutually exclusive.

func (*Filer) UpdateEntry

func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error)

type FilerConf

type FilerConf struct {
	// contains filtered or unexported fields
}

func NewFilerConf

func NewFilerConf() (fc *FilerConf)

func ReadFilerConf

func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error)

func (*FilerConf) AddLocationConf

func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error)

func (*FilerConf) DeleteLocationConf

func (fc *FilerConf) DeleteLocationConf(locationPrefix string)

func (*FilerConf) GetCollectionTtls

func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string)

func (*FilerConf) LoadFromBytes

func (fc *FilerConf) LoadFromBytes(data []byte) (err error)

func (*FilerConf) MatchStorageRule

func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf)

func (*FilerConf) ToProto

func (fc *FilerConf) ToProto() *filer_pb.FilerConf

func (*FilerConf) ToText

func (fc *FilerConf) ToText(writer io.Writer) error

type FilerRemoteStorage

type FilerRemoteStorage struct {
	// contains filtered or unexported fields
}

func NewFilerRemoteStorage

func NewFilerRemoteStorage() (rs *FilerRemoteStorage)

func (*FilerRemoteStorage) FindMountDirectory

func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation)

func (*FilerRemoteStorage) FindRemoteStorageClient

func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool)

func (*FilerRemoteStorage) GetRemoteStorageClient

func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool)

func (*FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping

func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error)

type FilerStore

// FilerStore is the interface a metadata store implements. It groups entry
// CRUD and listing, transactions, a key-value API and shutdown.
type FilerStore interface {
	// GetName gets the name to locate the configuration in filer.toml file
	GetName() string
	// Initialize initializes the file store
	Initialize(configuration util.Configuration, prefix string) error
	InsertEntry(context.Context, *Entry) error
	UpdateEntry(context.Context, *Entry) (err error)
	// err == filer_pb.ErrNotFound if not found
	FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
	DeleteEntry(context.Context, util.FullPath) (err error)
	DeleteFolderChildren(context.Context, util.FullPath) (err error)
	// The two listing methods pass each entry to eachEntryFunc and return the
	// last file name.
	// NOTE(review): presumably a false return from eachEntryFunc stops the
	// listing, and lastFileName is the resume point for the next page — confirm.
	ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
	ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)

	// BeginTransaction returns a context.
	// NOTE(review): presumably the returned context carries the transaction
	// and must be passed to the later calls — confirm.
	BeginTransaction(ctx context.Context) (context.Context, error)
	CommitTransaction(ctx context.Context) error
	RollbackTransaction(ctx context.Context) error

	// Key-value API.
	// NOTE(review): presumably KvGet returns ErrKvNotFound for a missing key,
	// and stores without key-value support return ErrKvNotImplemented — confirm.
	KvPut(ctx context.Context, key []byte, value []byte) (err error)
	KvGet(ctx context.Context, key []byte) (value []byte, err error)
	KvDelete(ctx context.Context, key []byte) (err error)

	Shutdown()
}

type FilerStorePathTranslator

type FilerStorePathTranslator struct {
	// contains filtered or unexported fields
}

func NewFilerStorePathTranslator

func NewFilerStorePathTranslator(storeRoot string, store FilerStore) *FilerStorePathTranslator

func (*FilerStorePathTranslator) BeginTransaction

func (t *FilerStorePathTranslator) BeginTransaction(ctx context.Context) (context.Context, error)

func (*FilerStorePathTranslator) CommitTransaction

func (t *FilerStorePathTranslator) CommitTransaction(ctx context.Context) error

func (*FilerStorePathTranslator) DeleteEntry

func (t *FilerStorePathTranslator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStorePathTranslator) DeleteFolderChildren

func (t *FilerStorePathTranslator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStorePathTranslator) DeleteOneEntry

func (t *FilerStorePathTranslator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)

func (*FilerStorePathTranslator) FindEntry

func (t *FilerStorePathTranslator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)

func (*FilerStorePathTranslator) GetName

func (t *FilerStorePathTranslator) GetName() string

func (*FilerStorePathTranslator) Initialize

func (t *FilerStorePathTranslator) Initialize(configuration util.Configuration, prefix string) error

func (*FilerStorePathTranslator) InsertEntry

func (t *FilerStorePathTranslator) InsertEntry(ctx context.Context, entry *Entry) error

func (*FilerStorePathTranslator) KvDelete

func (t *FilerStorePathTranslator) KvDelete(ctx context.Context, key []byte) (err error)

func (*FilerStorePathTranslator) KvGet

func (t *FilerStorePathTranslator) KvGet(ctx context.Context, key []byte) (value []byte, err error)

func (*FilerStorePathTranslator) KvPut

func (t *FilerStorePathTranslator) KvPut(ctx context.Context, key []byte, value []byte) (err error)

func (*FilerStorePathTranslator) ListDirectoryEntries

func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error)

func (*FilerStorePathTranslator) ListDirectoryPrefixedEntries

func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error)

func (*FilerStorePathTranslator) RollbackTransaction

func (t *FilerStorePathTranslator) RollbackTransaction(ctx context.Context) error

func (*FilerStorePathTranslator) Shutdown

func (t *FilerStorePathTranslator) Shutdown()

func (*FilerStorePathTranslator) UpdateEntry

func (t *FilerStorePathTranslator) UpdateEntry(ctx context.Context, entry *Entry) error

type FilerStoreWrapper

type FilerStoreWrapper struct {
	// contains filtered or unexported fields
}

func NewFilerStoreWrapper

func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper

func (*FilerStoreWrapper) AddPathSpecificStore

func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore)

func (*FilerStoreWrapper) BeginTransaction

func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error)

func (*FilerStoreWrapper) CanDropWholeBucket

func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool

func (*FilerStoreWrapper) CommitTransaction

func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error

func (*FilerStoreWrapper) Debug

func (fsw *FilerStoreWrapper) Debug(writer io.Writer)

func (*FilerStoreWrapper) DeleteEntry

func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStoreWrapper) DeleteFolderChildren

func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)

func (*FilerStoreWrapper) DeleteHardLink

func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error

func (*FilerStoreWrapper) DeleteOneEntry

func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)

func (*FilerStoreWrapper) FindEntry

func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)

func (*FilerStoreWrapper) GetName

func (fsw *FilerStoreWrapper) GetName() string

func (*FilerStoreWrapper) Initialize

func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error

func (*FilerStoreWrapper) InsertEntry

func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error

func (*FilerStoreWrapper) KvDelete

func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)

func (*FilerStoreWrapper) KvGet

func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error)

func (*FilerStoreWrapper) KvPut

func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error)

func (*FilerStoreWrapper) ListDirectoryEntries

func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error)

func (*FilerStoreWrapper) ListDirectoryPrefixedEntries

func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)

func (*FilerStoreWrapper) OnBucketCreation

func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string)

func (*FilerStoreWrapper) OnBucketDeletion

func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string)

func (*FilerStoreWrapper) RollbackTransaction

func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error

func (*FilerStoreWrapper) Shutdown

func (fsw *FilerStoreWrapper) Shutdown()

func (*FilerStoreWrapper) UpdateEntry

func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error

type HardLinkId

type HardLinkId []byte // 16 bytes + 1 marker byte

func NewHardLinkId

func NewHardLinkId() HardLinkId

type Interval

// Interval is one node of an IntervalList. It holds an offset range, a
// timestamp, a value of type T, and pointers to the neighbouring nodes.
type Interval[T IntervalValue] struct {
	// StartOffset and StopOffset bound the interval.
	// NOTE(review): presumably half-open, [StartOffset, StopOffset) — confirm against Size.
	StartOffset int64
	StopOffset  int64
	// TsNs: the name suggests a timestamp in nanoseconds — confirm.
	TsNs        int64
	Value       T
	// Prev and Next link this node to its neighbours in the list.
	Prev        *Interval[T]
	Next        *Interval[T]
}

func (*Interval[T]) Size

func (interval *Interval[T]) Size() int64

type IntervalList

// IntervalList is a list of Interval[T] nodes, created with NewIntervalList.
type IntervalList[T IntervalValue] struct {
	// Lock is exported, so callers can take it themselves.
	// NOTE(review): whether the list's own methods acquire Lock is not
	// visible here — confirm before locking around method calls.
	Lock sync.RWMutex
	// contains filtered or unexported fields
}

IntervalList marks written intervals within one page chunk.

func NewIntervalList

func NewIntervalList[T IntervalValue]() *IntervalList[T]

func NonOverlappingVisibleIntervals

func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error)

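NonOverlappingVisibleIntervals translates the file chunks into VisibleInterval values in memory. If the file chunk content is a chunk manifest, the manifest is presumably resolved into its data chunks first (see ResolveChunkManifest); the original comment is truncated at this point.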

func ViewFromChunks

func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView])

func ViewFromVisibleIntervals

func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView])

func (*IntervalList[T]) AppendInterval

func (list *IntervalList[T]) AppendInterval(interval *Interval[T])

func (*IntervalList[T]) Front

func (list *IntervalList[T]) Front() (interval *Interval[T])

func (*IntervalList[T]) InsertInterval

func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T)

func (*IntervalList[T]) Len

func (list *IntervalList[T]) Len() int

func (*IntervalList[T]) Overlay

func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T)

type IntervalValue

// IntervalValue is the type constraint for values stored in Interval and
// IntervalList. *ChunkView and *VisibleInterval both declare these methods.
type IntervalValue interface {
	// SetStartStop receives a start and a stop offset.
	// NOTE(review): presumably called when an interval is trimmed or split
	// so the value can adjust to its new range — confirm.
	SetStartStop(start, stop int64)
	// Clone returns a copy typed as IntervalValue, not as the concrete type.
	Clone() IntervalValue
}

type ListEachEntryFunc

type ListEachEntryFunc func(entry *Entry) bool

type MetaAggregator

// MetaAggregator is created with NewMetaAggregator and receives peer updates
// through OnPeerUpdate.
type MetaAggregator struct {
	MetaLogBuffer *log_buffer.LogBuffer

	// notifying clients
	// NOTE(review): presumably ListenersCond is built on ListenersLock — confirm in NewMetaAggregator.
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
	// contains filtered or unexported fields
}

func NewMetaAggregator

func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator

MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk.

func (*MetaAggregator) OnPeerUpdate

func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time)

type OnChunksFunc

type OnChunksFunc func([]*filer_pb.FileChunk) error

type OnHardLinkIdsFunc

type OnHardLinkIdsFunc func([]HardLinkId) error

type Point

type Point struct {
	// contains filtered or unexported fields
}

type ReaderCache

// ReaderCache is created with NewReaderCache. It embeds sync.Mutex, so Lock
// and Unlock are promoted onto it and a value must not be copied after first
// use; pass *ReaderCache.
type ReaderCache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewReaderCache

func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache

func (*ReaderCache) MaybeCache

func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView])

func (*ReaderCache) ReadChunkAt

func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error)

func (*ReaderCache) UnCache

func (rc *ReaderCache) UnCache(fileId string)

type ReaderPattern

type ReaderPattern struct {
	// contains filtered or unexported fields
}

func NewReaderPattern

func NewReaderPattern() *ReaderPattern

func (*ReaderPattern) IsRandomMode

func (rp *ReaderPattern) IsRandomMode() bool

func (*ReaderPattern) MonitorReadAt

func (rp *ReaderPattern) MonitorReadAt(offset int64, size int)

type SaveDataAsChunkFunctionType

type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)

type SectionIndex

type SectionIndex int64

type SingleChunkCacher

// SingleChunkCacher embeds sync.Mutex, so Lock and Unlock are promoted onto
// it and a value must not be copied after first use.
type SingleChunkCacher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type VirtualFilerStore

// VirtualFilerStore extends FilerStore with hard-link deletion, single-entry
// deletion, per-path store registration and the three BucketAware methods.
// It is the type of Filer.Store, and FilerStoreWrapper declares all of these
// methods.
type VirtualFilerStore interface {
	FilerStore
	DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
	DeleteOneEntry(ctx context.Context, entry *Entry) error
	// AddPathSpecificStore registers store under storeId for the given path.
	// NOTE(review): presumably entries below path are then routed to that
	// store — confirm.
	AddPathSpecificStore(path string, storeId string, store FilerStore)
	OnBucketCreation(bucket string)
	OnBucketDeletion(bucket string)
	CanDropWholeBucket() bool
}

type VisibleInterval

type VisibleInterval struct {
	// contains filtered or unexported fields
}

func (*VisibleInterval) Clone

func (v *VisibleInterval) Clone() IntervalValue

func (*VisibleInterval) SetStartStop

func (v *VisibleInterval) SetStartStop(start, stop int64)

type VolumeServerJwtFunction

type VolumeServerJwtFunction func(fileId string) string

Directories

Path Synopsis
elastic
v7
Package elastic is for elastic filer store.
Package elastic is for elastic filer store.
Package sqlite is for sqlite filer store.
Package sqlite is for sqlite filer store.
* Package tikv is for TiKV filer store.
* Package tikv is for TiKV filer store.
Package ydb is for YDB filer store.
Package ydb is for YDB filer store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL