Documentation ¶
Index ¶
- Constants
- func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error)
- type BackgroundUploadState
- type Directory
- type Features
- type Fs
- func (f *Fs) About(ctx context.Context) (*fs.Usage, error)
- func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ...)
- func (f *Fs) ChunkSize() int64
- func (f *Fs) CleanUp(ctx context.Context) error
- func (f *Fs) CleanUpCache(ignoreLastTs bool)
- func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[string]string) (interface{}, error)
- func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error)
- func (f *Fs) DirCacheFlush()
- func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error
- func (f *Fs) Disconnect(ctx context.Context) error
- func (f *Fs) Features() *fs.Features
- func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState
- func (f *Fs) Hashes() hash.Set
- func (f *Fs) InfoAge() time.Duration
- func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error)
- func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error)
- func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error
- func (f *Fs) Mkdir(ctx context.Context, dir string) error
- func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error)
- func (f *Fs) Name() string
- func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error)
- func (f *Fs) Purge(ctx context.Context, dir string) error
- func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
- func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
- func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
- func (f *Fs) Rmdir(ctx context.Context, dir string) error
- func (f *Fs) Root() string
- func (f *Fs) SetWrapper(wrapper fs.Fs)
- func (f *Fs) Stats() (map[string]map[string]interface{}, error)
- func (f *Fs) StopBackgroundRunners()
- func (f *Fs) String() string
- func (f *Fs) TempUploadWaitTime() time.Duration
- func (f *Fs) UnWrap() fs.Fs
- func (f *Fs) UserInfo(ctx context.Context) (map[string]string, error)
- func (f *Fs) WrapFs() fs.Fs
- type Handle
- type Memory
- func (m *Memory) AddChunk(fp string, data []byte, offset int64) error
- func (m *Memory) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error
- func (m *Memory) CleanChunksByAge(chunkAge time.Duration)
- func (m *Memory) CleanChunksByNeed(offset int64)
- func (m *Memory) CleanChunksBySize(maxSize int64)
- func (m *Memory) Connect(defaultExpiration time.Duration) error
- func (m *Memory) GetChunk(cachedObject *Object, offset int64) ([]byte, error)
- func (m *Memory) HasChunk(cachedObject *Object, offset int64) bool
- type NotificationContainer
- type Object
- func (o *Object) Fs() fs.Info
- func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error)
- func (o *Object) ModTime(ctx context.Context) time.Time
- func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error)
- func (o *Object) Remote() string
- func (o *Object) Remove(ctx context.Context) error
- func (o *Object) SetModTime(ctx context.Context, t time.Time) error
- func (o *Object) Size() int64
- func (o *Object) Storable() bool
- func (o *Object) String() string
- func (o *Object) UnWrap() fs.Object
- func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error
- type Options
- type Persistent
- func (b *Persistent) AddBatchDir(cachedDirs []*Directory) error
- func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error
- func (b *Persistent) AddDir(cachedDir *Directory) error
- func (b *Persistent) AddObject(cachedObject *Object) error
- func (b *Persistent) CleanChunksByAge(chunkAge time.Duration)
- func (b *Persistent) CleanChunksByNeed(offset int64)
- func (b *Persistent) CleanChunksBySize(maxSize int64)
- func (b *Persistent) Close()
- func (b *Persistent) ExpireDir(cd *Directory) error
- func (b *Persistent) ExpireObject(co *Object, withData bool) error
- func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error)
- func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error)
- func (b *Persistent) GetDir(remote string) (*Directory, error)
- func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error)
- func (b *Persistent) GetObject(cachedObject *Object) (err error)
- func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool
- func (b *Persistent) HasEntry(remote string) bool
- func (b *Persistent) Purge()
- func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error
- func (b *Persistent) RemoveDir(fp string) error
- func (b *Persistent) RemoveObject(fp string) error
- func (b *Persistent) SearchPendingUpload(remote string) (started bool, err error)
- func (b *Persistent) Stats() (map[string]map[string]interface{}, error)
- func (b *Persistent) String() string
- type PlaySessionStateNotification
- type PlexNotification
Constants ¶
const ( // DefCacheChunkSize is the default value for chunk size DefCacheChunkSize = fs.SizeSuffix(5 * 1024 * 1024) // DefCacheTotalChunkSize is the default value for the maximum size of stored chunks DefCacheTotalChunkSize = fs.SizeSuffix(10 * 1024 * 1024 * 1024) // DefCacheChunkCleanInterval is the interval at which chunks are cleaned DefCacheChunkCleanInterval = fs.Duration(time.Minute) // DefCacheInfoAge is the default value for object info age DefCacheInfoAge = fs.Duration(6 * time.Hour) // DefCacheReadRetries is the default value for read retries DefCacheReadRetries = 10 // DefCacheTotalWorkers is how many workers run in parallel to download chunks DefCacheTotalWorkers = 4 // DefCacheChunkNoMemory will enable or disable in-memory storage for chunks DefCacheChunkNoMemory = false // DefCacheRps limits the number of requests per second to the source FS DefCacheRps = -1 // DefCacheWrites will cache file data on writes through the cache DefCacheWrites = false // DefCacheTmpWaitTime says how long should files be stored in local cache before being uploaded DefCacheTmpWaitTime = fs.Duration(15 * time.Second) // DefCacheDbWaitTime defines how long the cache backend should wait for the DB to be available DefCacheDbWaitTime = fs.Duration(1 * time.Second) )
const ( // BackgroundUploadStarted is a state for a temp file that has started upload BackgroundUploadStarted = iota // BackgroundUploadCompleted is a state for a temp file that has completed upload BackgroundUploadCompleted // BackgroundUploadError is a state for a temp file that has an error upload BackgroundUploadError )
const ( RootBucket = "root" RootTsBucket = "rootTs" DataTsBucket = "dataTs" )
Constants
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BackgroundUploadState ¶
BackgroundUploadState is an entity that maps to an existing file which is stored on the temp fs
type Directory ¶
type Directory struct { Directory fs.Directory `json:"-"` // can be nil CacheFs *Fs `json:"-"` // cache fs Name string `json:"name"` // name of the directory Dir string `json:"dir"` // abs path of the directory CacheModTime int64 `json:"modTime"` // modification or creation time - IsZero for unknown CacheSize int64 `json:"size"` // size of directory and contents or -1 if unknown CacheItems int64 `json:"items"` // number of objects or -1 for unknown CacheType string `json:"cacheType"` // object type CacheTs *time.Time `json:",omitempty"` }
Directory is a generic dir that stores basic information about it
func DirectoryFromOriginal ¶
DirectoryFromOriginal builds one from a generic fs.Directory
func NewDirectory ¶
NewDirectory builds an empty dir which will be used to unmarshal data in it
func ShallowDirectory ¶
ShallowDirectory builds an empty dir which will be used to unmarshal data in it
type Features ¶
type Features struct { PurgeDb bool // purge the db before starting DbWaitTime time.Duration // time to wait for DB to be available }
Features flags for this storage type
type Fs ¶
Fs represents a wrapped fs.Fs
func (*Fs) ChangeNotify ¶
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration)
ChangeNotify can subscribe multiple callers. It is coupled with the wrapped fs ChangeNotify (if the wrapped fs supports it) and also notifies other caches (e.g. VFS) to clear out whenever something changes.
func (*Fs) CleanUpCache ¶
CleanUpCache will cleanup only the cache data that is expired
func (*Fs) Command ¶ added in v1.52.0
func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[string]string) (interface{}, error)
Command the backend to run a named command
The command to run is name. arg may be used to read arguments from, and opt may be used to read optional arguments from.
The result should be capable of being JSON encoded. If it is a string or a []string it will be shown to the user; otherwise it will be JSON encoded and shown to the user like that.
func (*Fs) DirMove ¶
DirMove moves src, srcRemote to this remote at dstRemote using server side move operations.
func (*Fs) Disconnect ¶
Disconnect the current user
func (*Fs) GetBackgroundUploadChannel ¶
func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState
GetBackgroundUploadChannel returns a channel that can be listened to for remote activities that happen in the background
func (*Fs) ListR ¶
ListR lists the objects and directories of the Fs starting from dir recursively into out.
func (*Fs) MergeDirs ¶ added in v1.53.0
MergeDirs merges the contents of all the directories passed in into the first one and rmdirs the other directories.
func (*Fs) Put ¶
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
Put in to the remote path with the modTime given of the given size
func (*Fs) PutStream ¶
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
PutStream uploads the object
func (*Fs) PutUnchecked ¶
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
PutUnchecked uploads the object
func (*Fs) SetWrapper ¶
SetWrapper sets the Fs that is wrapping this Fs
func (*Fs) StopBackgroundRunners ¶
func (f *Fs) StopBackgroundRunners()
StopBackgroundRunners will signal all the runners to stop their work. It can be triggered from a terminate signal or from testing between runs.
func (*Fs) TempUploadWaitTime ¶
TempUploadWaitTime returns the configured temp file upload wait time
type Handle ¶
type Handle struct { UseMemory bool // contains filtered or unexported fields }
Handle is managing the read/write/seek operations on an open handle
func NewObjectHandle ¶
NewObjectHandle returns a new Handle for an existing Object
type Memory ¶
type Memory struct {
// contains filtered or unexported fields
}
Memory is a wrapper of transient storage for a go-cache store
func NewMemory ¶
NewMemory builds this cache storage. defaultExpiration will set the expiry time of chunks in this storage.
func (*Memory) AddChunkAhead ¶
AddChunkAhead adds a new chunk of a cached object
func (*Memory) CleanChunksByAge ¶
CleanChunksByAge will cleanup on a cron basis
func (*Memory) CleanChunksByNeed ¶
CleanChunksByNeed will cleanup chunks after the FS passes a specific chunk
func (*Memory) CleanChunksBySize ¶
CleanChunksBySize will cleanup chunks after the total size passes a certain point
type NotificationContainer ¶
type NotificationContainer struct { Type string `json:"type"` Size int `json:"size"` PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"` }
NotificationContainer is part of the API response of Plex
type Object ¶
type Object struct { fs.Object `json:"-"` ParentFs fs.Fs `json:"-"` // parent fs CacheFs *Fs `json:"-"` // cache fs Name string `json:"name"` // name of the object Dir string `json:"dir"` // abs path of the object CacheModTime int64 `json:"modTime"` // modification or creation time - IsZero for unknown CacheSize int64 `json:"size"` // size of the object or -1 if unknown CacheStorable bool `json:"storable"` // says whether this object can be stored CacheType string `json:"cacheType"` CacheTs time.Time `json:"cacheTs"` CacheHashes map[hash.Type]string // all supported hashes cached // contains filtered or unexported fields }
Object is a generic file like object that stores basic information about it
func ObjectFromOriginal ¶
ObjectFromOriginal builds one from a generic fs.Object
func (*Object) Hash ¶
Hash requests a hash of the object and stores it in the cache. Since it might or might not be called, this is lazy loaded.
func (*Object) Open ¶
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error)
Open is used to request a specific part of the file using fs.RangeOption
func (*Object) SetModTime ¶
SetModTime sets the ModTime of this object
type Options ¶
type Options struct { Remote string `config:"remote"` PlexURL string `config:"plex_url"` PlexUsername string `config:"plex_username"` PlexPassword string `config:"plex_password"` PlexToken string `config:"plex_token"` PlexInsecure bool `config:"plex_insecure"` ChunkSize fs.SizeSuffix `config:"chunk_size"` InfoAge fs.Duration `config:"info_age"` ChunkTotalSize fs.SizeSuffix `config:"chunk_total_size"` DbPath string `config:"db_path"` ChunkPath string `config:"chunk_path"` DbPurge bool `config:"db_purge"` ChunkCleanInterval fs.Duration `config:"chunk_clean_interval"` ReadRetries int `config:"read_retries"` TotalWorkers int `config:"workers"` ChunkNoMemory bool `config:"chunk_no_memory"` Rps int `config:"rps"` StoreWrites bool `config:"writes"` TempWritePath string `config:"tmp_upload_path"` TempWaitTime fs.Duration `config:"tmp_wait_time"` DbWaitTime fs.Duration `config:"db_wait_time"` }
Options defines the configuration for this backend
type Persistent ¶
type Persistent struct {
// contains filtered or unexported fields
}
Persistent is a wrapper of persistent storage for a bolt.DB file
func GetPersistent ¶
func GetPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error)
GetPersistent returns a single instance for the specific store
func (*Persistent) AddBatchDir ¶
func (b *Persistent) AddBatchDir(cachedDirs []*Directory) error
AddBatchDir will update a list of CachedDirectory metadata and all their entries
func (*Persistent) AddChunk ¶
func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error
AddChunk adds a new chunk of a cached object
func (*Persistent) AddDir ¶
func (b *Persistent) AddDir(cachedDir *Directory) error
AddDir will update a CachedDirectory metadata and all its entries
func (*Persistent) AddObject ¶
func (b *Persistent) AddObject(cachedObject *Object) error
AddObject will create a cached object in its parent directory
func (*Persistent) CleanChunksByAge ¶
func (b *Persistent) CleanChunksByAge(chunkAge time.Duration)
CleanChunksByAge will cleanup on a cron basis
func (*Persistent) CleanChunksByNeed ¶
func (b *Persistent) CleanChunksByNeed(offset int64)
CleanChunksByNeed is a noop for this implementation
func (*Persistent) CleanChunksBySize ¶
func (b *Persistent) CleanChunksBySize(maxSize int64)
CleanChunksBySize will cleanup chunks after the total size passes a certain point
func (*Persistent) Close ¶
func (b *Persistent) Close()
Close should be called when the program ends gracefully
func (*Persistent) ExpireDir ¶
func (b *Persistent) ExpireDir(cd *Directory) error
ExpireDir will flush a CachedDirectory and all its objects; the objects' chunks will remain as they are.
func (*Persistent) ExpireObject ¶
func (b *Persistent) ExpireObject(co *Object, withData bool) error
ExpireObject will flush an Object and all its data if desired
func (*Persistent) GetChunk ¶
func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error)
GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it
func (*Persistent) GetChunkTs ¶
GetChunkTs retrieves the current timestamp of this chunk
func (*Persistent) GetDir ¶
func (b *Persistent) GetDir(remote string) (*Directory, error)
GetDir will retrieve data of a cached directory
func (*Persistent) GetDirEntries ¶
func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error)
GetDirEntries will return a CachedDirectory, its list of dir entries and/or an error if it encountered issues
func (*Persistent) GetObject ¶
func (b *Persistent) GetObject(cachedObject *Object) (err error)
GetObject will return a CachedObject from its parent directory or an error if it doesn't find it
func (*Persistent) HasChunk ¶
func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool
HasChunk confirms the existence of a single chunk of an object
func (*Persistent) HasEntry ¶
func (b *Persistent) HasEntry(remote string) bool
HasEntry confirms the existence of a single entry (dir or object)
func (*Persistent) ReconcileTempUploads ¶
func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error
ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue
func (*Persistent) RemoveDir ¶
func (b *Persistent) RemoveDir(fp string) error
RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it
func (*Persistent) RemoveObject ¶
func (b *Persistent) RemoveObject(fp string) error
RemoveObject will delete a single cached object and all the chunks which belong to it
func (*Persistent) SearchPendingUpload ¶
func (b *Persistent) SearchPendingUpload(remote string) (started bool, err error)
SearchPendingUpload returns the file info from the pending queue of uploads
func (*Persistent) Stats ¶
func (b *Persistent) Stats() (map[string]map[string]interface{}, error)
Stats returns a go map with the stats key values
func (*Persistent) String ¶
func (b *Persistent) String() string
String will return a human friendly string for this DB (currently the dbPath)
type PlaySessionStateNotification ¶
type PlaySessionStateNotification struct { SessionKey string `json:"sessionKey"` GUID string `json:"guid"` Key string `json:"key"` ViewOffset int64 `json:"viewOffset"` State string `json:"state"` TranscodeSession string `json:"transcodeSession"` }
PlaySessionStateNotification is part of the API response of Plex
type PlexNotification ¶
type PlexNotification struct {
Container NotificationContainer `json:"NotificationContainer"`
}
PlexNotification is part of the API response of Plex