Documentation ¶
Index ¶
- Constants
- Variables
- func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, ...) error
- func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error
- func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk)
- func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error)
- func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, ...)
- func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)
- func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)
- func ETag(entry *filer_pb.Entry) (etag string)
- func ETagChunks(chunks []*filer_pb.FileChunk) (etag string)
- func ETagEntry(entry *Entry) (etag string)
- func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes
- func EqualEntry(a, b *Entry) bool
- func FileSize(entry *filer_pb.Entry) (size uint64)
- func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{})
- func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry)
- func GetPeerMetaOffsetKey(peerSignature int32) []byte
- func HasChunkManifest(chunks []*filer_pb.FileChunk) bool
- func HasData(entry *filer_pb.Entry) bool
- func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, ...) (err error)
- func IsSameData(a, b *filer_pb.Entry) bool
- func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error)
- func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType
- func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, ...) *remote_pb.RemoteStorageLocation
- func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, ...) (fp util.FullPath)
- func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error)
- func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, ...)
- func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, ...)
- func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error)
- func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader
- func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry) error) (firstErr error)
- func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error
- func ProtoToText(writer io.Writer, config proto.Message) error
- func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, ...) error
- func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, ...) (lastTsNs int64, err error)
- func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, ...) error
- func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error)
- func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error)
- func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, ...) (conf *remote_pb.RemoteConf, readErr error)
- func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error
- func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, ...) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error)
- func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error)
- func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error
- func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk)
- func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk)
- func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, ...) error
- func TotalSize(chunks []*filer_pb.FileChunk) (size uint64)
- func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error)
- func VolumeId(fileId string) string
- type Attr
- type BucketAware
- type ChunkGroup
- func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error
- func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error)
- func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64)
- func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error
- type ChunkReadAt
- type ChunkStreamReader
- type ChunkView
- type Debuggable
- type DoStreamContent
- type Entry
- func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error
- func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error)
- func (entry *Entry) GetChunks() []*filer_pb.FileChunk
- func (entry *Entry) IsInRemoteOnly() bool
- func (entry *Entry) ShallowClone() *Entry
- func (entry *Entry) Size() uint64
- func (entry *Entry) Timestamp() time.Time
- func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry)
- func (entry *Entry) ToProtoEntry() *filer_pb.Entry
- func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry
- type FileChunkSection
- type Filer
- func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, ...)
- func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error)
- func (f *Filer) CanRename(source, target util.FullPath, oldName string) error
- func (f *Filer) CommitTransaction(ctx context.Context) error
- func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, ...) error
- func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk)
- func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk)
- func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, ...) (err error)
- func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk)
- func (f *Filer) DetectBucket(source util.FullPath) (bucket string)
- func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk)
- func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error)
- func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress
- func (f *Filer) GetStore() (store FilerStore)
- func (fs *Filer) KeepMasterClientConnected(ctx context.Context)
- func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, ...) (entries []*Entry, hasMore bool, err error)
- func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate)
- func (f *Filer) LoadConfiguration(config *util.ViperProxy) (isFresh bool)
- func (f *Filer) LoadFilerConf()
- func (f *Filer) LoadRemoteStorageConfAndMapping()
- func (f *Filer) MaybeBootstrapFromOnePeer(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, ...) (err error)
- func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, ...)
- func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, ...) (lastTsNs int64, isDone bool, err error)
- func (f *Filer) RollbackTransaction(ctx context.Context) error
- func (f *Filer) SetStore(store FilerStore) (isFresh bool)
- func (f *Filer) Shutdown()
- func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, ...) (lastFileName string, err error)
- func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error)
- type FilerConf
- func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error)
- func (fc *FilerConf) DeleteLocationConf(locationPrefix string)
- func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string)
- func (fc *FilerConf) LoadFromBytes(data []byte) (err error)
- func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf)
- func (fc *FilerConf) ToProto() *filer_pb.FilerConf
- func (fc *FilerConf) ToText(writer io.Writer) error
- type FilerRemoteStorage
- func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation)
- func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, ...)
- func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, ...)
- func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error)
- type FilerStore
- type FilerStorePathTranslator
- func (t *FilerStorePathTranslator) BeginTransaction(ctx context.Context) (context.Context, error)
- func (t *FilerStorePathTranslator) CommitTransaction(ctx context.Context) error
- func (t *FilerStorePathTranslator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)
- func (t *FilerStorePathTranslator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)
- func (t *FilerStorePathTranslator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)
- func (t *FilerStorePathTranslator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)
- func (t *FilerStorePathTranslator) GetName() string
- func (t *FilerStorePathTranslator) Initialize(configuration util.Configuration, prefix string) error
- func (t *FilerStorePathTranslator) InsertEntry(ctx context.Context, entry *Entry) error
- func (t *FilerStorePathTranslator) KvDelete(ctx context.Context, key []byte) (err error)
- func (t *FilerStorePathTranslator) KvGet(ctx context.Context, key []byte) (value []byte, err error)
- func (t *FilerStorePathTranslator) KvPut(ctx context.Context, key []byte, value []byte) (err error)
- func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) (string, error)
- func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) (string, error)
- func (t *FilerStorePathTranslator) RollbackTransaction(ctx context.Context) error
- func (t *FilerStorePathTranslator) Shutdown()
- func (t *FilerStorePathTranslator) UpdateEntry(ctx context.Context, entry *Entry) error
- type FilerStoreWrapper
- func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore)
- func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error)
- func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool
- func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error
- func (fsw *FilerStoreWrapper) Debug(writer io.Writer)
- func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)
- func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)
- func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
- func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)
- func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)
- func (fsw *FilerStoreWrapper) GetName() string
- func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error
- func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error
- func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)
- func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error)
- func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error)
- func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) (string, error)
- func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) (lastFileName string, err error)
- func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string)
- func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string)
- func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error
- func (fsw *FilerStoreWrapper) Shutdown()
- func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error
- type HardLinkId
- type Interval
- type IntervalList
- func NewIntervalList[T IntervalValue]() *IntervalList[T]
- func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, ...) (visibles *IntervalList[*VisibleInterval], err error)
- func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, ...) (chunkViews *IntervalList[*ChunkView])
- func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView])
- func (list *IntervalList[T]) AppendInterval(interval *Interval[T])
- func (list *IntervalList[T]) Front() (interval *Interval[T])
- func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T)
- func (list *IntervalList[T]) Len() int
- func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T)
- type IntervalValue
- type ListEachEntryFunc
- type MetaAggregator
- type OnChunksFunc
- type OnHardLinkIdsFunc
- type Point
- type ReaderCache
- type ReaderPattern
- type SaveDataAsChunkFunctionType
- type SectionIndex
- type SingleChunkCacher
- type VirtualFilerStore
- type VisibleInterval
- type VolumeServerJwtFunction
Constants ¶
const ( LogFlushInterval = time.Minute PaginationSize = 1024 FilerStoreId = "filer.store.id" )
const ( DirectoryEtcRoot = "/etc/" DirectoryEtcSeaweedFS = "/etc/seaweedfs" DirectoryEtcRemote = "/etc/remote" FilerConfName = "filer.conf" IamConfigDirectory = "/etc/iam" IamIdentityFile = "identity.json" IamPoliciesFile = "policies.json" )
const ( TopicsDir = "/topics" SystemLogDir = TopicsDir + "/.system/log" )
const CountEntryChunksForGzip = 50
const (
HARD_LINK_MARKER = '\x01'
)
const (
ManifestBatch = 10000
)
const (
MetaOffsetPrefix = "Meta"
)
const ModeChangeLimit = 3
const (
MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
)
const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
const ( // see weedfs_file_lseek.go SEEK_DATA uint32 = 3 // seek to next data after the offset )
const SectionSize = 2 * 1024 * 1024 * 32 // 64MiB
Variables ¶
var ( ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") ErrKvNotImplemented = errors.New("kv not implemented yet") ErrKvNotFound = errors.New("kv: not found") )
var ( Root = &Entry{ FullPath: "/", Attr: Attr{ Mtime: time.Now(), Crtime: time.Now(), Mode: os.ModeDir | 0755, Uid: OS_UID, Gid: OS_GID, }, } )
var (
Stores []FilerStore
)
var (
VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
)
Functions ¶
func CacheRemoteObjectToLocalCluster ¶
func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error
func CheckDuplicateAccessKey ¶
func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error
CheckDuplicateAccessKey returns an error message when s3cfg has duplicate access keys
func CompactFileChunks ¶
func DeleteMountMapping ¶
func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error)
func DetectMountInfo ¶
func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error)
func ETagChunks ¶
func EntryAttributeToPb ¶
func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes
func EqualEntry ¶
func FindGarbageChunks ¶
func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{})
func GetPeerMetaOffsetKey ¶
func HasChunkManifest ¶
func InsertMountMapping ¶
func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error)
func IsSameData ¶
func LookupByMasterClientFn ¶
func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error)
func LookupFn ¶
func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType
func MapFullPathToRemoteStorageLocation ¶
func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, fp util.FullPath) *remote_pb.RemoteStorageLocation
func MaybeManifestize ¶
func MergeIntoChunkViews ¶
func MergeIntoVisibles ¶
func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk)
func MinusChunks ¶
func NewFileReader ¶
func ParallelProcessDirectoryStructure ¶
func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry) error) (firstErr error)
ParallelProcessDirectoryStructure processes each entry in parallel, and also ensure parent directories are processed first. This also assumes the parent directories are in the entryChan already.
func ReadEachLogEntry ¶
func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error)
func ReadEntry ¶
func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error
func ReadInsideFiler ¶
func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error)
func ReadMountMappings ¶
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error)
func ReadRemoteStorageConf ¶
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error)
func Replay ¶
func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error
func ResolveChunkManifest ¶
func ResolveOneChunkManifest ¶
func SaveInsideFiler ¶
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error
func SeparateGarbageChunks ¶
func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk)
func SeparateManifestChunks ¶
func StreamContent ¶
func UnmarshalRemoteStorageMappings ¶
func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error)
Types ¶
type Attr ¶
type Attr struct { Mtime time.Time // time of last modification Crtime time.Time // time of creation (OS X only) Mode os.FileMode // file mode Uid uint32 // owner uid Gid uint32 // group gid Mime string // mime type TtlSec int32 // ttl in seconds UserName string GroupNames []string SymlinkTarget string Md5 []byte FileSize uint64 Rdev uint32 Inode uint64 }
func PbToEntryAttribute ¶
func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr
func (Attr) IsDirectory ¶
type BucketAware ¶
type ChunkGroup ¶
type ChunkGroup struct {
// contains filtered or unexported fields
}
func NewChunkGroup ¶
func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error)
func (*ChunkGroup) ReadDataAt ¶
func (*ChunkGroup) SearchChunks ¶
func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64)
FIXME: needa tests
type ChunkReadAt ¶
type ChunkReadAt struct {
// contains filtered or unexported fields
}
func NewChunkReaderAtFromClient ¶
func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt
func (*ChunkReadAt) Close ¶
func (c *ChunkReadAt) Close() error
func (*ChunkReadAt) ReadAt ¶
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error)
func (*ChunkReadAt) ReadAtWithTime ¶
type ChunkStreamReader ¶
type ChunkStreamReader struct {
// contains filtered or unexported fields
}
---------------- ChunkStreamReader ----------------------------------
func NewChunkStreamReader ¶
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader
func NewChunkStreamReaderFromFiler ¶
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader
func (*ChunkStreamReader) Close ¶
func (c *ChunkStreamReader) Close()
type ChunkView ¶
type ChunkView struct { FileId string OffsetInChunk int64 // offset within the chunk ViewSize uint64 ViewOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk ChunkSize uint64 CipherKey []byte IsGzipped bool ModifiedTsNs int64 }
func (*ChunkView) Clone ¶
func (cv *ChunkView) Clone() IntervalValue
func (*ChunkView) IsFullChunk ¶
func (*ChunkView) SetStartStop ¶
type Debuggable ¶
type DoStreamContent ¶
func PrepareStreamContent ¶
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error)
func PrepareStreamContentWithThrottler ¶
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error)
type Entry ¶
type Entry struct { util.FullPath Attr Extended map[string][]byte // the following is for files Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` HardLinkId HardLinkId HardLinkCounter int32 Content []byte Remote *filer_pb.RemoteEntry Quota int64 }
func (*Entry) DecodeAttributesAndChunks ¶
func (*Entry) EncodeAttributesAndChunks ¶
func (*Entry) IsInRemoteOnly ¶
func (*Entry) ShallowClone ¶
func (*Entry) ToExistingProtoEntry ¶
func (*Entry) ToProtoEntry ¶
func (*Entry) ToProtoFullEntry ¶
type FileChunkSection ¶
type FileChunkSection struct {
// contains filtered or unexported fields
}
func NewFileChunkSection ¶
func NewFileChunkSection(si SectionIndex) *FileChunkSection
func (*FileChunkSection) DataStartOffset ¶
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64
func (*FileChunkSection) NextStopOffset ¶
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64
type Filer ¶
type Filer struct { UniqueFilerId int32 UniqueFilerEpoch int32 Store VirtualFilerStore MasterClient *wdclient.MasterClient GrpcDialOption grpc.DialOption DirBucketsPath string Cipher bool LocalMetaLogBuffer *log_buffer.LogBuffer MetaAggregator *MetaAggregator Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage Dlm *lock_manager.DistributedLockManager MaxFilenameLength uint32 // contains filtered or unexported fields }
func NewFiler ¶
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer
func (*Filer) AggregateFromPeers ¶
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time)
func (*Filer) BeginTransaction ¶
func (*Filer) CreateEntry ¶
func (*Filer) DeleteChunks ¶
func (*Filer) DeleteChunksNotRecursive ¶
func (*Filer) DeleteEntryMetaAndData ¶
func (*Filer) DeleteUncommittedChunks ¶
func (*Filer) DirectDeleteChunks ¶
func (*Filer) GetStore ¶
func (f *Filer) GetStore() (store FilerStore)
func (*Filer) KeepMasterClientConnected ¶
func (*Filer) ListDirectoryEntries ¶
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error)
For now, prefix and namePattern are mutually exclusive
func (*Filer) ListExistingPeerUpdates ¶
func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate)
func (*Filer) LoadConfiguration ¶
func (f *Filer) LoadConfiguration(config *util.ViperProxy) (isFresh bool)
func (*Filer) LoadFilerConf ¶
func (f *Filer) LoadFilerConf()
func (*Filer) LoadRemoteStorageConfAndMapping ¶
func (f *Filer) LoadRemoteStorageConfAndMapping()
////////////////////////////////// load and maintain remote storages //////////////////////////////////
func (*Filer) MaybeBootstrapFromOnePeer ¶
func (f *Filer) MaybeBootstrapFromOnePeer(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error)
func (*Filer) NotifyUpdateEvent ¶
func (*Filer) ReadPersistedLogBuffer ¶
func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error)
func (*Filer) SetStore ¶
func (f *Filer) SetStore(store FilerStore) (isFresh bool)
func (*Filer) StreamListDirectoryEntries ¶
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
For now, prefix and namePattern are mutually exclusive
type FilerConf ¶
type FilerConf struct {
// contains filtered or unexported fields
}
func NewFilerConf ¶
func NewFilerConf() (fc *FilerConf)
func ReadFilerConf ¶
func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error)
func (*FilerConf) AddLocationConf ¶
func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error)
func (*FilerConf) DeleteLocationConf ¶
func (*FilerConf) GetCollectionTtls ¶
func (*FilerConf) LoadFromBytes ¶
func (*FilerConf) MatchStorageRule ¶
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf)
type FilerRemoteStorage ¶
type FilerRemoteStorage struct {
// contains filtered or unexported fields
}
func NewFilerRemoteStorage ¶
func NewFilerRemoteStorage() (rs *FilerRemoteStorage)
func (*FilerRemoteStorage) FindMountDirectory ¶
func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation)
func (*FilerRemoteStorage) FindRemoteStorageClient ¶
func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool)
func (*FilerRemoteStorage) GetRemoteStorageClient ¶
func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool)
func (*FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping ¶
func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error)
type FilerStore ¶
type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer_pb.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) BeginTransaction(ctx context.Context) (context.Context, error) CommitTransaction(ctx context.Context) error RollbackTransaction(ctx context.Context) error KvPut(ctx context.Context, key []byte, value []byte) (err error) KvGet(ctx context.Context, key []byte) (value []byte, err error) KvDelete(ctx context.Context, key []byte) (err error) Shutdown() }
type FilerStorePathTranslator ¶
type FilerStorePathTranslator struct {
// contains filtered or unexported fields
}
func NewFilerStorePathTranslator ¶
func NewFilerStorePathTranslator(storeRoot string, store FilerStore) *FilerStorePathTranslator
func (*FilerStorePathTranslator) BeginTransaction ¶
func (*FilerStorePathTranslator) CommitTransaction ¶
func (t *FilerStorePathTranslator) CommitTransaction(ctx context.Context) error
func (*FilerStorePathTranslator) DeleteEntry ¶
func (*FilerStorePathTranslator) DeleteFolderChildren ¶
func (*FilerStorePathTranslator) DeleteOneEntry ¶
func (t *FilerStorePathTranslator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)
func (*FilerStorePathTranslator) GetName ¶
func (t *FilerStorePathTranslator) GetName() string
func (*FilerStorePathTranslator) Initialize ¶
func (t *FilerStorePathTranslator) Initialize(configuration util.Configuration, prefix string) error
func (*FilerStorePathTranslator) InsertEntry ¶
func (t *FilerStorePathTranslator) InsertEntry(ctx context.Context, entry *Entry) error
func (*FilerStorePathTranslator) KvDelete ¶
func (t *FilerStorePathTranslator) KvDelete(ctx context.Context, key []byte) (err error)
func (*FilerStorePathTranslator) ListDirectoryEntries ¶
func (*FilerStorePathTranslator) ListDirectoryPrefixedEntries ¶
func (*FilerStorePathTranslator) RollbackTransaction ¶
func (t *FilerStorePathTranslator) RollbackTransaction(ctx context.Context) error
func (*FilerStorePathTranslator) Shutdown ¶
func (t *FilerStorePathTranslator) Shutdown()
func (*FilerStorePathTranslator) UpdateEntry ¶
func (t *FilerStorePathTranslator) UpdateEntry(ctx context.Context, entry *Entry) error
type FilerStoreWrapper ¶
type FilerStoreWrapper struct {
// contains filtered or unexported fields
}
func NewFilerStoreWrapper ¶
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper
func (*FilerStoreWrapper) AddPathSpecificStore ¶
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore)
func (*FilerStoreWrapper) BeginTransaction ¶
func (*FilerStoreWrapper) CanDropWholeBucket ¶
func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool
func (*FilerStoreWrapper) CommitTransaction ¶
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error
func (*FilerStoreWrapper) Debug ¶
func (fsw *FilerStoreWrapper) Debug(writer io.Writer)
func (*FilerStoreWrapper) DeleteEntry ¶
func (*FilerStoreWrapper) DeleteFolderChildren ¶
func (*FilerStoreWrapper) DeleteHardLink ¶
func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
func (*FilerStoreWrapper) DeleteOneEntry ¶
func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error)
func (*FilerStoreWrapper) GetName ¶
func (fsw *FilerStoreWrapper) GetName() string
func (*FilerStoreWrapper) Initialize ¶
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error
func (*FilerStoreWrapper) InsertEntry ¶
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error
func (*FilerStoreWrapper) KvDelete ¶
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)
func (*FilerStoreWrapper) ListDirectoryEntries ¶
func (*FilerStoreWrapper) ListDirectoryPrefixedEntries ¶
func (*FilerStoreWrapper) OnBucketCreation ¶
func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string)
func (*FilerStoreWrapper) OnBucketDeletion ¶
func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string)
func (*FilerStoreWrapper) RollbackTransaction ¶
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error
func (*FilerStoreWrapper) Shutdown ¶
func (fsw *FilerStoreWrapper) Shutdown()
func (*FilerStoreWrapper) UpdateEntry ¶
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error
type HardLinkId ¶
type HardLinkId []byte // 16 bytes + 1 marker byte
func NewHardLinkId ¶
func NewHardLinkId() HardLinkId
type Interval ¶
type IntervalList ¶
type IntervalList[T IntervalValue] struct { Lock sync.RWMutex // contains filtered or unexported fields }
IntervalList mark written intervals within one page chunk
func NewIntervalList ¶
func NewIntervalList[T IntervalValue]() *IntervalList[T]
func NonOverlappingVisibleIntervals ¶
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error)
NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory If the file chunk content is a chunk manifest
func ViewFromChunks ¶
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView])
func ViewFromVisibleIntervals ¶
func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView])
func (*IntervalList[T]) AppendInterval ¶
func (list *IntervalList[T]) AppendInterval(interval *Interval[T])
func (*IntervalList[T]) Front ¶
func (list *IntervalList[T]) Front() (interval *Interval[T])
func (*IntervalList[T]) InsertInterval ¶
func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T)
func (*IntervalList[T]) Len ¶
func (list *IntervalList[T]) Len() int
func (*IntervalList[T]) Overlay ¶
func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T)
type IntervalValue ¶
type IntervalValue interface { SetStartStop(start, stop int64) Clone() IntervalValue }
type ListEachEntryFunc ¶
type MetaAggregator ¶
type MetaAggregator struct { MetaLogBuffer *log_buffer.LogBuffer // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond // contains filtered or unexported fields }
func NewMetaAggregator ¶
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator
MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk.
func (*MetaAggregator) OnPeerUpdate ¶
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
type OnChunksFunc ¶
type OnHardLinkIdsFunc ¶
type OnHardLinkIdsFunc func([]HardLinkId) error
type ReaderCache ¶
func NewReaderCache ¶
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache
func (*ReaderCache) MaybeCache ¶
func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView])
func (*ReaderCache) ReadChunkAt ¶
func (*ReaderCache) UnCache ¶
func (rc *ReaderCache) UnCache(fileId string)
type ReaderPattern ¶
type ReaderPattern struct {
// contains filtered or unexported fields
}
func NewReaderPattern ¶
func NewReaderPattern() *ReaderPattern
func (*ReaderPattern) IsRandomMode ¶
func (rp *ReaderPattern) IsRandomMode() bool
func (*ReaderPattern) MonitorReadAt ¶
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int)
type SectionIndex ¶
type SectionIndex int64
type SingleChunkCacher ¶
type VirtualFilerStore ¶
type VirtualFilerStore interface { FilerStore DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error AddPathSpecificStore(path string, storeId string, store FilerStore) OnBucketCreation(bucket string) OnBucketDeletion(bucket string) CanDropWholeBucket() bool }
type VisibleInterval ¶
type VisibleInterval struct {
// contains filtered or unexported fields
}
func (*VisibleInterval) Clone ¶
func (v *VisibleInterval) Clone() IntervalValue
func (*VisibleInterval) SetStartStop ¶
func (v *VisibleInterval) SetStartStop(start, stop int64)
type VolumeServerJwtFunction ¶
Source Files ¶
- configuration.go
- entry.go
- entry_codec.go
- filechunk_group.go
- filechunk_manifest.go
- filechunk_section.go
- filechunks.go
- filechunks_read.go
- filer.go
- filer_buckets.go
- filer_conf.go
- filer_delete_entry.go
- filer_deletion.go
- filer_hardlink.go
- filer_notify.go
- filer_notify_append.go
- filer_on_meta_event.go
- filer_rename.go
- filer_search.go
- filerstore.go
- filerstore_hardlink.go
- filerstore_translate_path.go
- filerstore_wrapper.go
- interval_list.go
- meta_aggregator.go
- meta_replay.go
- read_remote.go
- read_write.go
- reader_at.go
- reader_cache.go
- reader_pattern.go
- remote_mapping.go
- remote_storage.go
- s3iam_conf.go
- stream.go
- topics.go
Directories ¶
Path | Synopsis |
---|---|
elastic
|
|
v7
Package elastic is for elastic filer store.
|
Package elastic is for elastic filer store. |
Package sqlite is for sqlite filer store.
|
Package sqlite is for sqlite filer store. |
* Package tikv is for TiKV filer store.
|
* Package tikv is for TiKV filer store. |
Package ydb is for YDB filer store.
|
Package ydb is for YDB filer store. |