cache

package
v0.0.0-...-b4aeae5
Published: Mar 7, 2024 License: MIT Imports: 40 Imported by: 0

Documentation

Overview

Package cache implements a virtual provider to cache existing remotes.

Constants

const (
	// DefCacheChunkSize is the default value for chunk size
	DefCacheChunkSize = fs.SizeSuffix(5 * 1024 * 1024)
	// DefCacheTotalChunkSize is the default value for the maximum size of stored chunks
	DefCacheTotalChunkSize = fs.SizeSuffix(10 * 1024 * 1024 * 1024)
	// DefCacheChunkCleanInterval is the interval at which chunks are cleaned
	DefCacheChunkCleanInterval = fs.Duration(time.Minute)
	// DefCacheInfoAge is the default value for object info age
	DefCacheInfoAge = fs.Duration(6 * time.Hour)
	// DefCacheReadRetries is the default value for read retries
	DefCacheReadRetries = 10
	// DefCacheTotalWorkers is how many workers run in parallel to download chunks
	DefCacheTotalWorkers = 4
	// DefCacheChunkNoMemory will enable or disable in-memory storage for chunks
	DefCacheChunkNoMemory = false
	// DefCacheRps limits the number of requests per second to the source FS
	DefCacheRps = -1
	// DefCacheWrites will cache file data on writes through the cache
	DefCacheWrites = false
	// DefCacheTmpWaitTime says how long should files be stored in local cache before being uploaded
	DefCacheTmpWaitTime = fs.Duration(15 * time.Second)
	// DefCacheDbWaitTime defines how long the cache backend should wait for the DB to be available
	DefCacheDbWaitTime = fs.Duration(1 * time.Second)
)
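To give a feel for how these defaults relate to each other, here is a minimal sketch that mirrors three of them with plain Go types. The names `defChunkSize`, `defTotalChunkSize`, `defInfoAge` and `maxChunks` are hypothetical and exist only in this sketch; the package itself declares the values as fs.SizeSuffix and fs.Duration.

```go
package main

import (
	"fmt"
	"time"
)

// Plain-typed mirrors of the documented defaults. These names are
// illustrative only and are not part of the package.
const (
	defChunkSize      int64 = 5 * 1024 * 1024         // value of DefCacheChunkSize
	defTotalChunkSize int64 = 10 * 1024 * 1024 * 1024 // value of DefCacheTotalChunkSize
	defInfoAge              = 6 * time.Hour           // value of DefCacheInfoAge
)

// maxChunks reports how many whole chunks of chunkSize fit in totalSize.
func maxChunks(totalSize, chunkSize int64) int64 {
	if chunkSize <= 0 {
		return 0
	}
	return totalSize / chunkSize
}

func main() {
	fmt.Println(maxChunks(defTotalChunkSize, defChunkSize)) // 2048
	fmt.Println(defInfoAge)                                 // 6h0m0s
}
```

With the default values, roughly 2048 full-size chunks fit under the total chunk size limit.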
const (
	// BackgroundUploadStarted is a state for a temp file that has started upload
	BackgroundUploadStarted = iota
	// BackgroundUploadCompleted is a state for a temp file that has completed upload
	BackgroundUploadCompleted
	// BackgroundUploadError is a state for a temp file that has an error upload
	BackgroundUploadError
)
const (
	RootBucket   = "root"
	RootTsBucket = "rootTs"
	DataTsBucket = "dataTs"
)

Variables

This section is empty.

Functions

func NewFs

func NewFs(ctx context.Context, name, rootPath string, m configmap.Mapper) (fs.Fs, error)

NewFs constructs an Fs from the path, container:path

Types

type BackgroundUploadState

type BackgroundUploadState struct {
	Remote string
	Status int
	Error  error
}

BackgroundUploadState is an entity that maps to an existing file which is stored on the temp fs

type Directory

type Directory struct {
	Directory fs.Directory `json:"-"` // can be nil

	CacheFs      *Fs    `json:"-"`       // cache fs
	Name         string `json:"name"`    // name of the directory
	Dir          string `json:"dir"`     // abs path of the directory
	CacheModTime int64  `json:"modTime"` // modification or creation time - IsZero for unknown
	CacheSize    int64  `json:"size"`    // size of directory and contents or -1 if unknown

	CacheItems int64      `json:"items"`     // number of objects or -1 for unknown
	CacheType  string     `json:"cacheType"` // object type
	CacheTs    *time.Time `json:",omitempty"`
}

Directory is a generic dir that stores basic information about it
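The struct tags above determine what a Directory looks like when encoded as JSON. The following sketch uses a local mirror type, `cachedDir` (a hypothetical name), with an `interface{}` placeholder in place of the fs.Directory and *Fs fields, to show which keys appear with encoding/json. It does not claim to reproduce how the package itself stores directories.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// cachedDir mirrors the documented field tags of Directory. The Original
// field stands in for the fields tagged `json:"-"`.
type cachedDir struct {
	Original interface{} `json:"-"` // never encoded

	Name         string     `json:"name"`
	Dir          string     `json:"dir"`
	CacheModTime int64      `json:"modTime"`
	CacheSize    int64      `json:"size"`
	CacheItems   int64      `json:"items"`
	CacheType    string     `json:"cacheType"`
	CacheTs      *time.Time `json:",omitempty"` // dropped when nil
}

// encode returns the JSON form of d as a string.
func encode(d cachedDir) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

func main() {
	out, err := encode(cachedDir{
		Name:       "photos",
		Dir:        "backup",
		CacheSize:  -1, // unknown
		CacheItems: -1, // unknown
		CacheType:  "Directory",
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(out)
}
```

Fields tagged `json:"-"` never reach the output, and the nil CacheTs pointer is omitted because of `omitempty`.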

func DirectoryFromOriginal

func DirectoryFromOriginal(ctx context.Context, f *Fs, d fs.Directory) *Directory

DirectoryFromOriginal builds one from a generic fs.Directory

func NewDirectory

func NewDirectory(f *Fs, remote string) *Directory

NewDirectory builds an empty dir which will be used to unmarshal data in it

func ShallowDirectory

func ShallowDirectory(f *Fs, remote string) *Directory

ShallowDirectory builds an empty dir which will be used to unmarshal data in it

func (*Directory) Fs

func (d *Directory) Fs() fs.Info

Fs returns its FS info

func (*Directory) ID

func (d *Directory) ID() string

ID returns the ID of the cached directory if known

func (*Directory) Items

func (d *Directory) Items() int64

Items returns the cached Items

func (*Directory) ModTime

func (d *Directory) ModTime(ctx context.Context) time.Time

ModTime returns the cached ModTime

func (*Directory) Remote

func (d *Directory) Remote() string

Remote returns the remote path

func (*Directory) Size

func (d *Directory) Size() int64

Size returns the cached Size

func (*Directory) String

func (d *Directory) String() string

String returns a human friendly name for this object

type Features

type Features struct {
	PurgeDb    bool          // purge the db before starting
	DbWaitTime time.Duration // time to wait for DB to be available
}

Features flags for this storage type

type Fs

type Fs struct {
	fs.Fs
	// contains filtered or unexported fields
}

Fs represents a wrapped fs.Fs

func (*Fs) About

func (f *Fs) About(ctx context.Context) (*fs.Usage, error)

About gets quota information from the Fs

func (*Fs) ChangeNotify

func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration)

ChangeNotify can subscribe multiple callers. It is coupled with the wrapped fs ChangeNotify (if it supports it) and also notifies other caches (e.g. VFS) to clear out whenever something changes.

func (*Fs) ChunkSize

func (f *Fs) ChunkSize() int64

ChunkSize returns the configured chunk size
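One common use of a chunk size is to map an arbitrary read offset to the start of the chunk that contains it. The sketch below assumes chunks are laid out back to back from offset 0; `chunkStart` is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

// chunkStart returns the offset of the chunk that contains offset, assuming
// fixed-size chunks laid out back to back from offset 0.
func chunkStart(offset, chunkSize int64) int64 {
	return offset - offset%chunkSize
}

func main() {
	const chunkSize = 5 * 1024 * 1024 // value of DefCacheChunkSize

	// An offset of 12 MiB falls inside the third chunk, which starts at 10 MiB.
	fmt.Println(chunkStart(12*1024*1024, chunkSize)) // 10485760
}
```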

func (*Fs) CleanUp

func (f *Fs) CleanUp(ctx context.Context) error

CleanUp the trash in the Fs

func (*Fs) CleanUpCache

func (f *Fs) CleanUpCache(ignoreLastTs bool)

CleanUpCache will cleanup only the cache data that is expired

func (*Fs) Command

func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[string]string) (interface{}, error)

Command the backend to run a named command

The command run is name. arg may be used to read arguments from, and opt may be used to read optional arguments from.

The result should be capable of being JSON encoded. If it is a string or a []string it will be shown to the user; otherwise it will be JSON encoded and shown to the user like that.

func (*Fs) Copy

func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error)

Copy src to this remote using server-side copy operations.

func (*Fs) DirCacheFlush

func (f *Fs) DirCacheFlush()

DirCacheFlush flushes the dir cache

func (*Fs) DirMove

func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error

DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations.

func (*Fs) Disconnect

func (f *Fs) Disconnect(ctx context.Context) error

Disconnect the current user

func (*Fs) Features

func (f *Fs) Features() *fs.Features

Features returns the optional features of this Fs

func (*Fs) GetBackgroundUploadChannel

func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState

GetBackgroundUploadChannel returns a channel that can be listened to for remote activities that happen in the background
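A caller would typically range over such a channel and branch on the Status field. The sketch below redeclares the documented constants and struct locally so it compiles on its own, and feeds a stand-in channel by hand. `describe` is a hypothetical helper, and closing the channel is done here only so that the loop ends; whether the real channel is ever closed is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented constants and struct.
const (
	BackgroundUploadStarted = iota
	BackgroundUploadCompleted
	BackgroundUploadError
)

type BackgroundUploadState struct {
	Remote string
	Status int
	Error  error
}

// describe turns one upload state into a log-friendly line.
func describe(s BackgroundUploadState) string {
	switch s.Status {
	case BackgroundUploadStarted:
		return s.Remote + ": upload started"
	case BackgroundUploadCompleted:
		return s.Remote + ": upload completed"
	case BackgroundUploadError:
		return fmt.Sprintf("%s: upload failed: %v", s.Remote, s.Error)
	default:
		return fmt.Sprintf("%s: unknown status %d", s.Remote, s.Status)
	}
}

func main() {
	// Stand-in for the channel returned by GetBackgroundUploadChannel.
	ch := make(chan BackgroundUploadState, 3)
	ch <- BackgroundUploadState{Remote: "a.txt", Status: BackgroundUploadStarted}
	ch <- BackgroundUploadState{Remote: "a.txt", Status: BackgroundUploadCompleted}
	ch <- BackgroundUploadState{Remote: "b.txt", Status: BackgroundUploadError, Error: errors.New("timeout")}
	close(ch)

	for s := range ch {
		fmt.Println(describe(s))
	}
}
```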

func (*Fs) Hashes

func (f *Fs) Hashes() hash.Set

Hashes returns the supported hash sets.

func (*Fs) InfoAge

func (f *Fs) InfoAge() time.Duration

InfoAge returns the configured file age

func (*Fs) List

func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error)

List the objects and directories in dir into entries

func (*Fs) ListR

func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error)

ListR lists the objects and directories of the Fs starting from dir recursively into out.

func (*Fs) MergeDirs

func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error

MergeDirs merges the contents of all the directories passed in into the first one and rmdirs the other directories.

func (*Fs) Mkdir

func (f *Fs) Mkdir(ctx context.Context, dir string) error

Mkdir makes the directory (container, bucket)

func (*Fs) Move

func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error)

Move src to this remote using server-side move operations.

func (*Fs) Name

func (f *Fs) Name() string

Name of the remote (as passed into NewFs)

func (*Fs) NewObject

func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error)

NewObject finds the Object at remote.

func (*Fs) Purge

func (f *Fs) Purge(ctx context.Context, dir string) error

Purge all files in the directory

func (*Fs) Put

func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)

Put in to the remote path with the modTime given of the given size

func (*Fs) PutStream

func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)

PutStream uploads the object

func (*Fs) PutUnchecked

func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)

PutUnchecked uploads the object

func (*Fs) Rmdir

func (f *Fs) Rmdir(ctx context.Context, dir string) error

Rmdir removes the directory (container, bucket) if empty

func (*Fs) Root

func (f *Fs) Root() string

Root of the remote (as passed into NewFs)

func (*Fs) SetWrapper

func (f *Fs) SetWrapper(wrapper fs.Fs)

SetWrapper sets the Fs that is wrapping this Fs

func (*Fs) Shutdown

func (f *Fs) Shutdown(ctx context.Context) error

Shutdown the backend, closing any background tasks and any cached connections.

func (*Fs) Stats

func (f *Fs) Stats() (map[string]map[string]interface{}, error)

Stats returns stats about the cache storage

func (*Fs) StopBackgroundRunners

func (f *Fs) StopBackgroundRunners()

StopBackgroundRunners will signal all the runners to stop their work. It can be triggered from a terminate signal or from testing between runs.

func (*Fs) String

func (f *Fs) String() string

String returns a description of the FS

func (*Fs) TempUploadWaitTime

func (f *Fs) TempUploadWaitTime() time.Duration

TempUploadWaitTime returns the configured temp file upload wait time

func (*Fs) UnWrap

func (f *Fs) UnWrap() fs.Fs

UnWrap returns the Fs that this Fs is wrapping

func (*Fs) UserInfo

func (f *Fs) UserInfo(ctx context.Context) (map[string]string, error)

UserInfo returns info about the connected user

func (*Fs) WrapFs

func (f *Fs) WrapFs() fs.Fs

WrapFs returns the Fs that is wrapping this Fs

type Handle

type Handle struct {
	UseMemory bool
	// contains filtered or unexported fields
}

Handle is managing the read/write/seek operations on an open handle

func NewObjectHandle

func NewObjectHandle(ctx context.Context, o *Object, cfs *Fs) *Handle

NewObjectHandle returns a new Handle for an existing Object

func (*Handle) Close

func (r *Handle) Close() error

Close will tell the workers to stop

func (*Handle) Read

func (r *Handle) Read(p []byte) (n int, err error)

Read a chunk from storage or len(p)

func (*Handle) Seek

func (r *Handle) Seek(offset int64, whence int) (int64, error)

Seek will move the current offset based on whence and instruct the workers to move there too
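The Seek signature matches the standard io.Seeker interface, where whence selects the reference point for offset. As an illustration of that convention only, the sketch below resolves an offset and whence pair into an absolute position. `seekOffset` is a hypothetical helper, and how Handle itself treats out-of-range positions is not described here.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// seekOffset resolves offset and whence into an absolute position for a
// stream of the given size, following the io.Seeker convention. cur is the
// current position and is returned unchanged on error.
func seekOffset(cur, size, offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = cur + offset
	case io.SeekEnd:
		abs = size + offset
	default:
		return cur, errors.New("invalid whence")
	}
	if abs < 0 {
		return cur, errors.New("negative position")
	}
	return abs, nil
}

func main() {
	// Ten bytes before the end of a 1000-byte stream.
	pos, err := seekOffset(100, 1000, -10, io.SeekEnd)
	fmt.Println(pos, err) // 990 <nil>
}
```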

func (*Handle) String

func (r *Handle) String() string

String representation of this reader

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory is a wrapper of transient storage for a go-cache store

func NewMemory

func NewMemory(defaultExpiration time.Duration) *Memory

NewMemory builds this cache storage. defaultExpiration will set the expiry time of chunks in this storage.

func (*Memory) AddChunk

func (m *Memory) AddChunk(fp string, data []byte, offset int64) error

AddChunk adds a new chunk of a cached object

func (*Memory) AddChunkAhead

func (m *Memory) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error

AddChunkAhead adds a new chunk of a cached object

func (*Memory) CleanChunksByAge

func (m *Memory) CleanChunksByAge(chunkAge time.Duration)

CleanChunksByAge will cleanup on a cron basis

func (*Memory) CleanChunksByNeed

func (m *Memory) CleanChunksByNeed(offset int64)

CleanChunksByNeed will cleanup chunks after the FS passes a specific chunk

func (*Memory) CleanChunksBySize

func (m *Memory) CleanChunksBySize(maxSize int64)

CleanChunksBySize will cleanup chunks after the total size passes a certain point

func (*Memory) Connect

func (m *Memory) Connect(defaultExpiration time.Duration) error

Connect will create a connection for the storage

func (*Memory) GetChunk

func (m *Memory) GetChunk(cachedObject *Object, offset int64) ([]byte, error)

GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it

func (*Memory) HasChunk

func (m *Memory) HasChunk(cachedObject *Object, offset int64) bool

HasChunk confirms the existence of a single chunk of an object

type NotificationContainer

type NotificationContainer struct {
	Type             string                         `json:"type"`
	Size             int                            `json:"size"`
	PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"`
}

NotificationContainer is part of the API response of Plex

type Object

type Object struct {
	fs.Object `json:"-"`

	ParentFs      fs.Fs     `json:"-"`        // parent fs
	CacheFs       *Fs       `json:"-"`        // cache fs
	Name          string    `json:"name"`     // name of the object
	Dir           string    `json:"dir"`      // abs path of the object
	CacheModTime  int64     `json:"modTime"`  // modification or creation time - IsZero for unknown
	CacheSize     int64     `json:"size"`     // size of the object or -1 if unknown
	CacheStorable bool      `json:"storable"` // says whether this object can be stored
	CacheType     string    `json:"cacheType"`
	CacheTs       time.Time `json:"cacheTs"`

	CacheHashes map[hash.Type]string // all supported hashes cached
	// contains filtered or unexported fields
}

Object is a generic file like object that stores basic information about it

func NewObject

func NewObject(f *Fs, remote string) *Object

NewObject builds an Object from a remote path

func ObjectFromOriginal

func ObjectFromOriginal(ctx context.Context, f *Fs, o fs.Object) *Object

ObjectFromOriginal builds one from a generic fs.Object

func (*Object) Fs

func (o *Object) Fs() fs.Info

Fs returns its FS info

func (*Object) Hash

func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error)

Hash requests a hash of the object and stores it in the cache. Since it might or might not be called, the hash is lazily loaded.

func (*Object) ModTime

func (o *Object) ModTime(ctx context.Context) time.Time

ModTime returns the cached ModTime

func (*Object) Open

func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error)

Open is used to request a specific part of the file using fs.RangeOption

func (*Object) Remote

func (o *Object) Remote() string

Remote returns the remote path

func (*Object) Remove

func (o *Object) Remove(ctx context.Context) error

Remove deletes the object from both the cache and the source

func (*Object) SetModTime

func (o *Object) SetModTime(ctx context.Context, t time.Time) error

SetModTime sets the ModTime of this object

func (*Object) Size

func (o *Object) Size() int64

Size returns the cached Size

func (*Object) Storable

func (o *Object) Storable() bool

Storable returns the cached Storable

func (*Object) String

func (o *Object) String() string

String returns a human friendly name for this object

func (*Object) UnWrap

func (o *Object) UnWrap() fs.Object

UnWrap returns the Object that this Object is wrapping or nil if it isn't wrapping anything

func (*Object) Update

func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error

Update will change the object data

type Options

type Options struct {
	Remote             string        `config:"remote"`
	PlexURL            string        `config:"plex_url"`
	PlexUsername       string        `config:"plex_username"`
	PlexPassword       string        `config:"plex_password"`
	PlexToken          string        `config:"plex_token"`
	PlexInsecure       bool          `config:"plex_insecure"`
	ChunkSize          fs.SizeSuffix `config:"chunk_size"`
	InfoAge            fs.Duration   `config:"info_age"`
	ChunkTotalSize     fs.SizeSuffix `config:"chunk_total_size"`
	DbPath             string        `config:"db_path"`
	ChunkPath          string        `config:"chunk_path"`
	DbPurge            bool          `config:"db_purge"`
	ChunkCleanInterval fs.Duration   `config:"chunk_clean_interval"`
	ReadRetries        int           `config:"read_retries"`
	TotalWorkers       int           `config:"workers"`
	ChunkNoMemory      bool          `config:"chunk_no_memory"`
	Rps                int           `config:"rps"`
	StoreWrites        bool          `config:"writes"`
	TempWritePath      string        `config:"tmp_upload_path"`
	TempWaitTime       fs.Duration   `config:"tmp_wait_time"`
	DbWaitTime         fs.Duration   `config:"db_wait_time"`
}

Options defines the configuration for this backend
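Note that several config keys differ from their Go field names (for example TotalWorkers maps to `workers`, StoreWrites to `writes`, and TempWritePath to `tmp_upload_path`). As a small illustration of how such tags can be read, the sketch below mirrors a few fields with plain types and lists their `config` tags using reflect. The `options` type and `configKeys` function are hypothetical and say nothing about how the package itself loads its configuration.

```go
package main

import (
	"fmt"
	"reflect"
)

// options mirrors a few fields of the documented Options struct, with plain
// types in place of fs.SizeSuffix.
type options struct {
	Remote       string `config:"remote"`
	ChunkSize    int64  `config:"chunk_size"`
	TotalWorkers int    `config:"workers"`
	StoreWrites  bool   `config:"writes"`
}

// configKeys returns the config tag of every field of the struct value v,
// in declaration order.
func configKeys(v interface{}) []string {
	t := reflect.TypeOf(v)
	keys := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		keys = append(keys, t.Field(i).Tag.Get("config"))
	}
	return keys
}

func main() {
	fmt.Println(configKeys(options{})) // [remote chunk_size workers writes]
}
```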

type Persistent

type Persistent struct {
	// contains filtered or unexported fields
}

Persistent is a wrapper of persistent storage for a bolt.DB file

func GetPersistent

func GetPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error)

GetPersistent returns a single instance for the specific store

func (*Persistent) AddBatchDir

func (b *Persistent) AddBatchDir(cachedDirs []*Directory) error

AddBatchDir will update a list of CachedDirectory metadata and all their entries

func (*Persistent) AddChunk

func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error

AddChunk adds a new chunk of a cached object

func (*Persistent) AddDir

func (b *Persistent) AddDir(cachedDir *Directory) error

AddDir will update a CachedDirectory metadata and all its entries

func (*Persistent) AddObject

func (b *Persistent) AddObject(cachedObject *Object) error

AddObject will create a cached object in its parent directory

func (*Persistent) CleanChunksByAge

func (b *Persistent) CleanChunksByAge(chunkAge time.Duration)

CleanChunksByAge will cleanup on a cron basis

func (*Persistent) CleanChunksByNeed

func (b *Persistent) CleanChunksByNeed(offset int64)

CleanChunksByNeed is a noop for this implementation

func (*Persistent) CleanChunksBySize

func (b *Persistent) CleanChunksBySize(maxSize int64)

CleanChunksBySize will cleanup chunks after the total size passes a certain point
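One plausible way to implement size-based cleanup is to drop the oldest chunks first until the total fits under the limit. The sketch below shows that policy in isolation; the `chunk` type, the `evictBySize` function and the oldest-first ordering are all assumptions of this sketch, since the documentation above only states that cleanup happens once the total size passes a certain point.

```go
package main

import (
	"fmt"
	"sort"
)

// chunk describes one stored chunk for the purpose of this sketch.
type chunk struct {
	key  string
	size int64
	ts   int64 // timestamp; smaller means older
}

// evictBySize returns the keys to delete, oldest first, so that the
// remaining chunks total at most maxSize bytes.
func evictBySize(chunks []chunk, maxSize int64) []string {
	// Work on a copy so the caller's slice order is left untouched.
	sorted := append([]chunk(nil), chunks...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ts < sorted[j].ts })

	var total int64
	for _, c := range sorted {
		total += c.size
	}

	var evicted []string
	for _, c := range sorted {
		if total <= maxSize {
			break
		}
		total -= c.size
		evicted = append(evicted, c.key)
	}
	return evicted
}

func main() {
	chunks := []chunk{
		{key: "a", size: 5, ts: 1},
		{key: "b", size: 5, ts: 3},
		{key: "c", size: 5, ts: 2},
	}
	fmt.Println(evictBySize(chunks, 10)) // [a]
	fmt.Println(evictBySize(chunks, 4))  // [a c b]
}
```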

func (*Persistent) Close

func (b *Persistent) Close()

Close should be called when the program ends gracefully

func (*Persistent) ExpireDir

func (b *Persistent) ExpireDir(cd *Directory) error

ExpireDir will flush a CachedDirectory and all its objects. Chunks will remain as they are.

func (*Persistent) ExpireObject

func (b *Persistent) ExpireObject(co *Object, withData bool) error

ExpireObject will flush an Object and all its data if desired

func (*Persistent) GetChunk

func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error)

GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it

func (*Persistent) GetChunkTs

func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error)

GetChunkTs retrieves the current timestamp of this chunk

func (*Persistent) GetDir

func (b *Persistent) GetDir(remote string) (*Directory, error)

GetDir will retrieve data of a cached directory

func (*Persistent) GetDirEntries

func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error)

GetDirEntries will return the list of dir entries of a CachedDirectory, or an error if it encountered issues

func (*Persistent) GetObject

func (b *Persistent) GetObject(cachedObject *Object) (err error)

GetObject will return a CachedObject from its parent directory or an error if it doesn't find it

func (*Persistent) HasChunk

func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool

HasChunk confirms the existence of a single chunk of an object

func (*Persistent) HasEntry

func (b *Persistent) HasEntry(remote string) bool

HasEntry confirms the existence of a single entry (dir or object)

func (*Persistent) Purge

func (b *Persistent) Purge()

Purge will flush the entire cache

func (*Persistent) ReconcileTempUploads

func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error

ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue

func (*Persistent) RemoveDir

func (b *Persistent) RemoveDir(fp string) error

RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it

func (*Persistent) RemoveObject

func (b *Persistent) RemoveObject(fp string) error

RemoveObject will delete a single cached object and all the chunks which belong to it

func (*Persistent) SearchPendingUpload

func (b *Persistent) SearchPendingUpload(remote string) (started bool, err error)

SearchPendingUpload looks up remote in the pending queue of uploads and reports whether its upload has started

func (*Persistent) Stats

func (b *Persistent) Stats() (map[string]map[string]interface{}, error)

Stats returns a go map with the stats key values

func (*Persistent) String

func (b *Persistent) String() string

String will return a human friendly string for this DB (currently the dbPath)

type PlaySessionStateNotification

type PlaySessionStateNotification struct {
	SessionKey       string `json:"sessionKey"`
	GUID             string `json:"guid"`
	Key              string `json:"key"`
	ViewOffset       int64  `json:"viewOffset"`
	State            string `json:"state"`
	TranscodeSession string `json:"transcodeSession"`
}

PlaySessionStateNotification is part of the API response of Plex

type PlexNotification

type PlexNotification struct {
	Container NotificationContainer `json:"NotificationContainer"`
}

PlexNotification is part of the API response of Plex
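The three Plex types nest as PlexNotification, then NotificationContainer, then a slice of PlaySessionStateNotification. The sketch below redeclares them locally and decodes a payload with encoding/json. The `decode` helper and the sample payload are made up for illustration; the payload is not a captured Plex response.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, with the same JSON tags.
type PlaySessionStateNotification struct {
	SessionKey       string `json:"sessionKey"`
	GUID             string `json:"guid"`
	Key              string `json:"key"`
	ViewOffset       int64  `json:"viewOffset"`
	State            string `json:"state"`
	TranscodeSession string `json:"transcodeSession"`
}

type NotificationContainer struct {
	Type             string                         `json:"type"`
	Size             int                            `json:"size"`
	PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"`
}

type PlexNotification struct {
	Container NotificationContainer `json:"NotificationContainer"`
}

// decode parses one notification payload.
func decode(payload []byte) (PlexNotification, error) {
	var n PlexNotification
	err := json.Unmarshal(payload, &n)
	return n, err
}

// samplePayload is invented for this sketch.
const samplePayload = `{"NotificationContainer":{"type":"playing","size":1,` +
	`"PlaySessionStateNotification":[{"sessionKey":"1","guid":"",` +
	`"key":"/library/metadata/42","viewOffset":15000,"state":"playing",` +
	`"transcodeSession":""}]}}`

func main() {
	n, err := decode([]byte(samplePayload))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, s := range n.Container.PlaySessionState {
		fmt.Println(s.Key, s.State, s.ViewOffset)
	}
}
```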
