Documentation ¶
Index ¶
- Constants
- Variables
- func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk)
- func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk)
- func ETag(entry *filer_pb.Entry) (etag string)
- func ETagChunks(chunks []*filer_pb.FileChunk) (etag string)
- func ETagEntry(entry *Entry) (etag string)
- func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes
- func EqualEntry(a, b *Entry) bool
- func FileSize(entry *filer_pb.Entry) (size uint64)
- func HasChunkManifest(chunks []*filer_pb.FileChunk) bool
- func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error)
- func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error)
- func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error)
- func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error)
- func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, ...) (lastTsNs int64, err error)
- func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error
- func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error)
- func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error)
- func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk)
- func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, ...) error
- func TotalSize(chunks []*filer_pb.FileChunk) (size uint64)
- func VolumeId(fileId string) string
- type Attr
- type BucketName
- type BucketOption
- type ChunkReadAt
- type ChunkStreamReader
- type ChunkView
- type Entry
- func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error
- func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error)
- func (entry *Entry) Size() uint64
- func (entry *Entry) Timestamp() time.Time
- func (entry *Entry) ToProtoEntry() *filer_pb.Entry
- func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry
- type Filer
- func (f *Filer) AggregateFromPeers(self string, filers []string)
- func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error)
- func (f *Filer) CommitTransaction(ctx context.Context) error
- func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, ...) error
- func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk)
- func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, ...) (err error)
- func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error)
- func (fs *Filer) GetMaster() string
- func (f *Filer) GetStore() (store FilerStore)
- func (fs *Filer) KeepConnectedToMaster()
- func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, ...) ([]*Entry, error)
- func (f *Filer) LoadBuckets()
- func (f *Filer) LoadConfiguration(config *viper.Viper)
- func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, ...)
- func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool)
- func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error)
- func (f *Filer) RollbackTransaction(ctx context.Context) error
- func (f *Filer) SetStore(store FilerStore)
- func (f *Filer) Shutdown()
- func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error)
- type FilerBuckets
- type FilerStore
- type FilerStoreWrapper
- func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error)
- func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error
- func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error)
- func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error)
- func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error)
- func (fsw *FilerStoreWrapper) GetName() string
- func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error
- func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error
- func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)
- func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error)
- func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error)
- func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) ([]*Entry, error)
- func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, ...) ([]*Entry, error)
- func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error
- func (fsw *FilerStoreWrapper) Shutdown()
- func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error
- type LookupFileIdFunctionType
- type MetaAggregator
- type SaveDataAsChunkFunctionType
- type VisibleInterval
Constants ¶
View Source
const ( LogFlushInterval = time.Minute PaginationSize = 1024 * 256 FilerStoreId = "filer.store.id" )
View Source
const ( TopicsDir = "/topics" SystemLogDir = TopicsDir + "/.system/log" )
View Source
const (
ManifestBatch = 1000
)
View Source
const (
MetaOffsetPrefix = "Meta"
)
Variables ¶
View Source
var ( ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") ErrKvNotImplemented = errors.New("kv not implemented yet") ErrKvNotFound = errors.New("kv: not found") )
View Source
var (
Stores []FilerStore
)
Functions ¶
func CompactFileChunks ¶
func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk)
func ETagChunks ¶
func EntryAttributeToPb ¶
func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes
func EqualEntry ¶
func HasChunkManifest ¶
func LookupByMasterClientFn ¶
func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error)
func MaybeManifestize ¶
func MinusChunks ¶
func ReadEachLogEntry ¶
func Replay ¶
func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error
func ResolveChunkManifest ¶
func ResolveOneChunkManifest ¶
func SeparateManifestChunks ¶
func StreamContent ¶
Types ¶
type Attr ¶
type Attr struct { Mtime time.Time // time of last modification Crtime time.Time // time of creation (OS X only) Mode os.FileMode // file mode Uid uint32 // owner uid Gid uint32 // group gid Mime string // mime type Replication string // replication Collection string // collection name TtlSec int32 // ttl in seconds UserName string GroupNames []string SymlinkTarget string Md5 []byte FileSize uint64 }
func PbToEntryAttribute ¶
func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr
func (Attr) IsDirectory ¶
type BucketName ¶
type BucketName string
type BucketOption ¶
type BucketOption struct { Name BucketName Replication string // contains filtered or unexported fields }
type ChunkReadAt ¶
type ChunkReadAt struct {
// contains filtered or unexported fields
}
func NewChunkReaderAtFromClient ¶
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt
type ChunkStreamReader ¶
type ChunkStreamReader struct {
// contains filtered or unexported fields
}
---------------- ChunkStreamReader ----------------------------------
func NewChunkStreamReader ¶
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader
func NewChunkStreamReaderFromFiler ¶
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader
func (*ChunkStreamReader) Close ¶
func (c *ChunkStreamReader) Close()
type ChunkView ¶
type ChunkView struct { FileId string Offset int64 Size uint64 LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk ChunkSize uint64 CipherKey []byte IsGzipped bool }
func ViewFromChunks ¶
func ViewFromVisibleIntervals ¶
func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView)
func (*ChunkView) IsFullChunk ¶
type Entry ¶
type Entry struct { util.FullPath Attr Extended map[string][]byte // the following is for files Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` }
func (*Entry) DecodeAttributesAndChunks ¶
func (*Entry) EncodeAttributesAndChunks ¶
func (*Entry) ToProtoEntry ¶
func (*Entry) ToProtoFullEntry ¶
type Filer ¶
type Filer struct { Store *FilerStoreWrapper MasterClient *wdclient.MasterClient GrpcDialOption grpc.DialOption DirBucketsPath string FsyncBuckets []string Cipher bool LocalMetaLogBuffer *log_buffer.LogBuffer MetaAggregator *MetaAggregator Signature int32 // contains filtered or unexported fields }
func (*Filer) AggregateFromPeers ¶
func (*Filer) BeginTransaction ¶
func (*Filer) CreateEntry ¶
func (*Filer) DeleteChunks ¶
func (*Filer) DeleteEntryMetaAndData ¶
func (*Filer) GetStore ¶
func (f *Filer) GetStore() (store FilerStore)
func (*Filer) KeepConnectedToMaster ¶
func (fs *Filer) KeepConnectedToMaster()
func (*Filer) ListDirectoryEntries ¶
func (*Filer) LoadBuckets ¶
func (f *Filer) LoadBuckets()
func (*Filer) LoadConfiguration ¶
func (*Filer) NotifyUpdateEvent ¶
func (*Filer) ReadBucketOption ¶
func (*Filer) ReadPersistedLogBuffer ¶
func (*Filer) SetStore ¶
func (f *Filer) SetStore(store FilerStore)
type FilerBuckets ¶
type FilerStore ¶
type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) BeginTransaction(ctx context.Context) (context.Context, error) CommitTransaction(ctx context.Context) error RollbackTransaction(ctx context.Context) error KvPut(ctx context.Context, key []byte, value []byte) (err error) KvGet(ctx context.Context, key []byte) (value []byte, err error) KvDelete(ctx context.Context, key []byte) (err error) Shutdown() }
type FilerStoreWrapper ¶
type FilerStoreWrapper struct {
ActualStore FilerStore
}
func NewFilerStoreWrapper ¶
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper
func (*FilerStoreWrapper) BeginTransaction ¶
func (*FilerStoreWrapper) CommitTransaction ¶
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error
func (*FilerStoreWrapper) DeleteEntry ¶
func (*FilerStoreWrapper) DeleteFolderChildren ¶
func (*FilerStoreWrapper) GetName ¶
func (fsw *FilerStoreWrapper) GetName() string
func (*FilerStoreWrapper) Initialize ¶
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error
func (*FilerStoreWrapper) InsertEntry ¶
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error
func (*FilerStoreWrapper) KvDelete ¶
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error)
func (*FilerStoreWrapper) ListDirectoryEntries ¶
func (*FilerStoreWrapper) ListDirectoryPrefixedEntries ¶
func (*FilerStoreWrapper) RollbackTransaction ¶
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error
func (*FilerStoreWrapper) Shutdown ¶
func (fsw *FilerStoreWrapper) Shutdown()
func (*FilerStoreWrapper) UpdateEntry ¶
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error
type LookupFileIdFunctionType ¶
func LookupFn ¶
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType
type MetaAggregator ¶
type MetaAggregator struct { MetaLogBuffer *log_buffer.LogBuffer // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond // contains filtered or unexported fields }
func NewMetaAggregator ¶
func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator
MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk.
func (*MetaAggregator) StartLoopSubscribe ¶
func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string)
type VisibleInterval ¶
type VisibleInterval struct {
// contains filtered or unexported fields
}
func MergeIntoVisibles ¶
func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval)
func NonOverlappingVisibleIntervals ¶
func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error)
NonOverlappingVisibleIntervals translates the file chunks into VisibleInterval values in memory, including the case where a file chunk's content is a chunk manifest.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.