blobcache

package
v0.0.0-...-80d61da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2025 License: MIT Imports: 56 Imported by: 2

Documentation

Index

Constants

View Source
const (
	SourceModeJuiceFS    string = "juicefs"
	SourceModeMountPoint string = "mountpoint"
)
View Source
const (
	BlobCacheHostPrefix   string = "blobcache-host"
	BlobCacheClientPrefix string = "blobcache-client"
	BlobCacheVersion      string = "dev"
)

Variables

View Source
var (
	ErrHostNotFound            = errors.New("host not found")
	ErrUnableToReachHost       = errors.New("unable to reach host")
	ErrInvalidHostVersion      = errors.New("invalid host version")
	ErrContentNotFound         = errors.New("content not found")
	ErrClientNotFound          = errors.New("client not found")
	ErrCacheLockHeld           = errors.New("cache lock held")
	ErrUnableToPopulateContent = errors.New("unable to populate content from original source")
	ErrBlobFsMountFailure      = errors.New("failed to mount blobfs")
	ErrTailscaleAuthentication = errors.New("tailscale authentication failed")
)
View Source
var (
	ErrChannelClosed    = errors.New("redis: channel closed")
	ErrConnectionIssue  = errors.New("redis: connection issue")
	ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
View Source
var (
	Logger *logger
)
View Source
var MetadataKeys = &metadataKeys{}

Functions

func AuthInterceptor

func AuthInterceptor(token string) grpc.UnaryClientInterceptor

func CopyStruct

func CopyStruct(src, dst interface{})

Attempts to copy field values of the same name from the src to the dst struct.

func DialWithTimeout

func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)

func GenerateFsID

func GenerateFsID(name string) string

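GenerateFsID generates a filesystem (directory) ID from the given name. Note that the function accepts only name; no separate parent ID parameter is taken.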

func GetConfigParser

func GetConfigParser(format ConfigFormat) (koanf.Parser, error)

func GetLogger

func GetLogger() *logger

func GetPrivateIpAddr

func GetPrivateIpAddr() (string, error)

func InitLogger

func InitLogger(debugMode bool)

func Mount

func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error)

func SHA1StringToUint64

func SHA1StringToUint64(hash string) (uint64, error)

SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64

func ToSlice

func ToSlice(v interface{}) []interface{}

ToSlice flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise, they will be ignored.

func ToStruct

func ToStruct(m map[string]string, out interface{}) error

ToStruct copies the result of HGetAll to a provided struct. If a field cannot be parsed, Go's zero value is used for it. Struct fields must have the redis tag on them; otherwise, they will be ignored.

func WithClientName

func WithClientName(name string) func(*redis.UniversalOptions)

Types

type BlobCacheClient

type BlobCacheClient struct {
	// contains filtered or unexported fields
}

func NewBlobCacheClient

func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)

func (*BlobCacheClient) Cleanup

func (c *BlobCacheClient) Cleanup() error

func (*BlobCacheClient) GetContent

func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([]byte, error)

func (*BlobCacheClient) GetContentStream

func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error)

func (*BlobCacheClient) GetState

func (c *BlobCacheClient) GetState() error

func (*BlobCacheClient) HostsAvailable

func (c *BlobCacheClient) HostsAvailable() bool

func (*BlobCacheClient) StoreContent

func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error)

func (*BlobCacheClient) StoreContentFromSource

func (c *BlobCacheClient) StoreContentFromSource(sourcePath string, sourceOffset int64) (string, error)

func (*BlobCacheClient) WaitForHosts

func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error

type BlobCacheConfig

type BlobCacheConfig struct {
	Token                           string          `key:"token" json:"token"`
	DebugMode                       bool            `key:"debugMode" json:"debug_mode"`
	TLSEnabled                      bool            `key:"tlsEnabled" json:"tls_enabled"`
	Port                            uint            `key:"port" json:"port"`
	HostStorageCapacityThresholdPct float64         `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"`
	ObjectTtlS                      int             `key:"objectTtlS" json:"object_ttl_s"`
	RoundTripThresholdMilliseconds  uint            `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"`
	MaxCachePct                     int64           `key:"maxCachePct" json:"max_cache_pct"`
	PageSizeBytes                   int64           `key:"pageSizeBytes" json:"page_size_bytes"`
	GRPCDialTimeoutS                int             `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"`
	GRPCMessageSizeBytes            int             `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"`
	Tailscale                       TailscaleConfig `key:"tailscale" json:"tailscale"`
	Metadata                        MetadataConfig  `key:"metadata" json:"metadata"`
	DiscoveryIntervalS              int             `key:"discoveryIntervalS" json:"discovery_interval_s"`
	DiscoveryMode                   string          `key:"discoveryMode" json:"discovery_mode"`
	BlobFs                          BlobFsConfig    `key:"blobfs" json:"blobfs"`
}

type BlobCacheEntry

type BlobCacheEntry struct {
	Hash         string `redis:"hash" json:"hash"`
	Size         int64  `redis:"size" json:"size"`
	SourcePath   string `redis:"source_path" json:"source_path"`
	SourceOffset int64  `redis:"source_offset" json:"source_offset"`
}

type BlobCacheHost

type BlobCacheHost struct {
	RTT              time.Duration `redis:"rtt" json:"rtt"`
	Addr             string        `redis:"addr" json:"addr"`
	PrivateAddr      string        `redis:"private_addr" json:"private_addr"`
	CapacityUsagePct float64       `redis:"capacity_usage_pct" json:"capacity_usage_pct"`
}

type BlobCacheMetadata

type BlobCacheMetadata struct {
	// contains filtered or unexported fields
}

func NewBlobCacheMetadata

func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)

func (*BlobCacheMetadata) AddEntry

func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error

func (*BlobCacheMetadata) AddFsNodeChild

func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*BlobCacheMetadata) AddHostToIndex

func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, host *BlobCacheHost) error

func (*BlobCacheMetadata) GetAvailableHosts

func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)

func (*BlobCacheMetadata) GetEntryLocations

func (m *BlobCacheMetadata) GetEntryLocations(ctx context.Context, hash string) (mapset.Set[string], error)

func (*BlobCacheMetadata) GetFsNode

func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)

func (*BlobCacheMetadata) GetFsNodeChildren

func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)

func (*BlobCacheMetadata) GetHostIndex

func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context) ([]*BlobCacheHost, error)

func (*BlobCacheMetadata) RemoveClientLock

func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error

func (*BlobCacheMetadata) RemoveEntryLocation

func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error

func (*BlobCacheMetadata) RemoveFsNodeChild

func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) error

func (*BlobCacheMetadata) RemoveHostFromIndex

func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, host *BlobCacheHost) error

func (*BlobCacheMetadata) RetrieveEntry

func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)

func (*BlobCacheMetadata) SetClientLock

func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error

func (*BlobCacheMetadata) SetFsNode

func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error

func (*BlobCacheMetadata) SetHostKeepAlive

func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, host *BlobCacheHost) error

func (*BlobCacheMetadata) StoreContentInBlobFs

func (m *BlobCacheMetadata) StoreContentInBlobFs(ctx context.Context, path string, hash string, size uint64) error

type BlobFs

type BlobFs struct {
	Metadata        *BlobCacheMetadata
	Client          *BlobCacheClient
	Config          BlobCacheConfig
	PrefetchManager *PrefetchManager
	// contains filtered or unexported fields
}

func NewFileSystem

func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)

NewFileSystem initializes a new BlobFs with root metadata.

func (*BlobFs) Root

func (bfs *BlobFs) Root() (fs.InodeEmbedder, error)

type BlobFsConfig

type BlobFsConfig struct {
	Enabled            bool                 `key:"enabled" json:"enabled"`
	Prefetch           BlobFsPrefetchConfig `key:"prefetch" json:"prefetch"`
	MountPoint         string               `key:"mountPoint" json:"mount_point"`
	Sources            []SourceConfig       `key:"sources" json:"sources"`
	MaxBackgroundTasks int                  `key:"maxBackgroundTasks" json:"max_background_tasks"`
	MaxWriteKB         int                  `key:"maxWriteKB" json:"max_write_kb"`
	MaxReadAheadKB     int                  `key:"maxReadAheadKB" json:"max_read_ahead_kb"`
	DirectMount        bool                 `key:"directMount" json:"direct_mount"`
	DirectIO           bool                 `key:"directIO" json:"direct_io"`
	Options            []string             `key:"options" json:"options"`
}

type BlobFsMetadata

type BlobFsMetadata struct {
	PID       string `redis:"pid" json:"pid"`
	ID        string `redis:"id" json:"id"`
	Name      string `redis:"name" json:"name"`
	Path      string `redis:"path" json:"path"`
	Hash      string `redis:"hash" json:"hash"`
	Ino       uint64 `redis:"ino" json:"ino"`
	Size      uint64 `redis:"size" json:"size"`
	Blocks    uint64 `redis:"blocks" json:"blocks"`
	Atime     uint64 `redis:"atime" json:"atime"`
	Mtime     uint64 `redis:"mtime" json:"mtime"`
	Ctime     uint64 `redis:"ctime" json:"ctime"`
	Atimensec uint32 `redis:"atimensec" json:"atimensec"`
	Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"`
	Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"`
	Mode      uint32 `redis:"mode" json:"mode"`
	Nlink     uint32 `redis:"nlink" json:"nlink"`
	Rdev      uint32 `redis:"rdev" json:"rdev"`
	Blksize   uint32 `redis:"blksize" json:"blksize"`
	Padding   uint32 `redis:"padding" json:"padding"`
	Uid       uint32 `redis:"uid" json:"uid"`
	Gid       uint32 `redis:"gid" json:"gid"`
	Gen       uint64 `redis:"gen" json:"gen"`
}

type BlobFsNode

type BlobFsNode struct {
	Path     string
	ID       string
	PID      string
	Name     string
	Target   string
	Hash     string
	Attr     fuse.Attr
	Prefetch *bool
}

type BlobFsPrefetchConfig

type BlobFsPrefetchConfig struct {
	Enabled           bool     `key:"enabled" json:"enabled"`
	MinFileSizeBytes  uint64   `key:"minFileSizeBytes" json:"min_file_size_bytes"`
	EvictionIntervalS int      `key:"evictionIntervalS" json:"eviction_interval_s"`
	IdleTtlS          int      `key:"idleTtlS" json:"idle_ttl_s"`
	WindowSizeBytes   uint64   `key:"windowSizeBytes" json:"window_size_bytes"`
	IgnoreFileExt     []string `key:"ignoreFileExt" json:"ignore_file_ext"`
	DataTimeoutS      int      `key:"dataTimeoutS" json:"data_timeout_s"`
}

type BlobFsSystemOpts

type BlobFsSystemOpts struct {
	Verbose  bool
	Metadata *BlobCacheMetadata
	Config   BlobCacheConfig
	Client   *BlobCacheClient
}

type CacheService

type CacheService struct {
	proto.UnimplementedBlobCacheServer
	// contains filtered or unexported fields
}

func NewCacheService

func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, error)

func (*CacheService) GetContent

func (*CacheService) GetContentStream

func (*CacheService) GetState

func (*CacheService) HostKeepAlive

func (cs *CacheService) HostKeepAlive()

func (*CacheService) StartServer

func (cs *CacheService) StartServer(port uint) error

func (*CacheService) StoreContent

func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error

type CacheServiceOpts

type CacheServiceOpts struct {
	Addr string
}

type ClientRequest

type ClientRequest struct {
	// contains filtered or unexported fields
}

type ClientRequestType

type ClientRequestType int
const (
	ClientRequestTypeStorage ClientRequestType = iota
	ClientRequestTypeRetrieval
)

type ConfigFormat

type ConfigFormat string
var (
	JSONConfigFormat ConfigFormat = ".json"
	YAMLConfigFormat ConfigFormat = ".yaml"
	YMLConfigFormat  ConfigFormat = ".yml"
)

type ConfigLoaderFunc

type ConfigLoaderFunc func(k *koanf.Koanf) error

ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.

type ConfigManager

type ConfigManager[T any] struct {
	// contains filtered or unexported fields
}

ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.

func NewConfigManager

func NewConfigManager[T any]() (*ConfigManager[T], error)

NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.

func (*ConfigManager[T]) GetConfig

func (cm *ConfigManager[T]) GetConfig() T

GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.

func (*ConfigManager[T]) LoadConfig

func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error

LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.

func (*ConfigManager[T]) Print

func (cm *ConfigManager[T]) Print() string

Print returns a string representation of the current configuration state.

type ContentAddressableStorage

type ContentAddressableStorage struct {
	// contains filtered or unexported fields
}

func NewContentAddressableStorage

func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, metadata *BlobCacheMetadata, config BlobCacheConfig) (*ContentAddressableStorage, error)

func (*ContentAddressableStorage) Add

func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, content []byte, sourcePath string, sourceOffset int64) error

func (*ContentAddressableStorage) Cleanup

func (cas *ContentAddressableStorage) Cleanup()

func (*ContentAddressableStorage) Get

func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error)

type DiscoveryClient

type DiscoveryClient struct {
	// contains filtered or unexported fields
}

func NewDiscoveryClient

func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap, metadata *BlobCacheMetadata) *DiscoveryClient

func (*DiscoveryClient) FindNearbyHosts

func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context) ([]*BlobCacheHost, error)

func (*DiscoveryClient) GetHostState

func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error)

GetHostState attempts to connect to the gRPC service and verifies its availability.

func (*DiscoveryClient) GetHostStateViaMetadata

func (d *DiscoveryClient) GetHostStateViaMetadata(ctx context.Context, addr, privateAddr string) (*BlobCacheHost, error)

func (*DiscoveryClient) StartInBackground

func (d *DiscoveryClient) StartInBackground(ctx context.Context) error

StartInBackground is used by blobcache servers to discover their closest peers.

type DiscoveryMode

type DiscoveryMode string
const (
	DiscoveryModeTailscale DiscoveryMode = "tailscale"
	DiscoveryModeMetadata  DiscoveryMode = "metadata"
)

type ErrEntryNotFound

type ErrEntryNotFound struct {
	Hash string
}

func (*ErrEntryNotFound) Error

func (e *ErrEntryNotFound) Error() string

type ErrNodeNotFound

type ErrNodeNotFound struct {
	Id string
}

func (*ErrNodeNotFound) Error

func (e *ErrNodeNotFound) Error() string

type FSNode

type FSNode struct {
	fs.Inode
	// contains filtered or unexported fields
}

func (*FSNode) Create

func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Getattr

func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno

func (*FSNode) Lookup

func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) Mkdir

func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) OnAdd

func (n *FSNode) OnAdd(ctx context.Context)

func (*FSNode) Open

func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Opendir

func (n *FSNode) Opendir(ctx context.Context) syscall.Errno

func (*FSNode) Read

func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)

func (*FSNode) Readdir

func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)

func (*FSNode) Rename

func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, newName string, flags uint32) syscall.Errno

func (*FSNode) Rmdir

func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno

type FileSystem

type FileSystem interface {
	Mount(opts FileSystemOpts) (func() error, <-chan error, error)
	Unmount() error
	Format() error
}

type FileSystemOpts

type FileSystemOpts struct {
	MountPoint string
	Verbose    bool
	Metadata   *BlobCacheMetadata
}

BlobFS types

type FileSystemStorage

type FileSystemStorage interface {
	Metadata()
	Get(string)
	ReadFile(interface{}, []byte, int64)
}

type HostMap

type HostMap struct {
	// contains filtered or unexported fields
}

func NewHostMap

func NewHostMap(cfg BlobCacheConfig, onHostAdded func(*BlobCacheHost) error) *HostMap

func (*HostMap) Closest

func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)

Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.

func (*HostMap) ClosestWithCapacity

func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)

ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout. If no hosts are found, it returns an error.

func (*HostMap) Get

func (hm *HostMap) Get(addr string) *BlobCacheHost

func (*HostMap) Members

func (hm *HostMap) Members() mapset.Set[string]

func (*HostMap) Remove

func (hm *HostMap) Remove(host *BlobCacheHost)

func (*HostMap) Set

func (hm *HostMap) Set(host *BlobCacheHost)

type JuiceFSConfig

type JuiceFSConfig struct {
	RedisURI   string `key:"redisURI" json:"redis_uri"`
	Bucket     string `key:"bucket" json:"bucket"`
	AccessKey  string `key:"accessKey" json:"access_key"`
	SecretKey  string `key:"secretKey" json:"secret_key"`
	CacheSize  int64  `key:"cacheSize" json:"cache_size"`
	BlockSize  int64  `key:"blockSize" json:"block_size"`
	Prefetch   int64  `key:"prefetch" json:"prefetch"`
	BufferSize int64  `key:"bufferSize" json:"buffer_size"`
}

type JuiceFsSource

type JuiceFsSource struct {
	// contains filtered or unexported fields
}

func (*JuiceFsSource) Format

func (s *JuiceFsSource) Format(fsName string) error

func (*JuiceFsSource) Mount

func (s *JuiceFsSource) Mount(localPath string) error

func (*JuiceFsSource) Unmount

func (s *JuiceFsSource) Unmount(localPath string) error

type MetadataConfig

type MetadataConfig struct {
	RedisAddr       string `key:"redisAddr" json:"redis_addr"`
	RedisPasswd     string `key:"redisPasswd" json:"redis_passwd"`
	RedisTLSEnabled bool   `key:"redisTLSEnabled" json:"redis_tls_enabled"`
}

type MountPointConfig

type MountPointConfig struct {
	BucketName  string `key:"bucketName" json:"bucket_name"`
	AccessKey   string `key:"accessKey" json:"access_key"`
	SecretKey   string `key:"secretKey" json:"secret_key"`
	Region      string `key:"region" json:"region"`
	EndpointURL string `key:"endpointUrl" json:"endpoint_url"`
}

type MountPointSource

type MountPointSource struct {
	// contains filtered or unexported fields
}

func (*MountPointSource) Format

func (s *MountPointSource) Format(fsName string) error

func (*MountPointSource) Mount

func (s *MountPointSource) Mount(localPath string) error

func (*MountPointSource) Unmount

func (s *MountPointSource) Unmount(localPath string) error

type ParserFunc

type ParserFunc func() (koanf.Parser, error)

type PrefetchBuffer

type PrefetchBuffer struct {
	// contains filtered or unexported fields
}

func NewPrefetchBuffer

func NewPrefetchBuffer(opts PrefetchOpts) *PrefetchBuffer

func (*PrefetchBuffer) Clear

func (pb *PrefetchBuffer) Clear()

func (*PrefetchBuffer) GetRange

func (pb *PrefetchBuffer) GetRange(offset uint64, dst []byte) error

func (*PrefetchBuffer) IsIdle

func (pb *PrefetchBuffer) IsIdle() bool

type PrefetchManager

type PrefetchManager struct {
	// contains filtered or unexported fields
}

func NewPrefetchManager

func NewPrefetchManager(ctx context.Context, config BlobCacheConfig, client *BlobCacheClient) *PrefetchManager

func (*PrefetchManager) GetPrefetchBuffer

func (pm *PrefetchManager) GetPrefetchBuffer(hash string, fileSize uint64) *PrefetchBuffer

GetPrefetchBuffer returns an existing prefetch buffer if it exists, or nil.

func (*PrefetchManager) Start

func (pm *PrefetchManager) Start()

type PrefetchOpts

type PrefetchOpts struct {
	Ctx         context.Context
	CancelFunc  context.CancelFunc
	Hash        string
	FileSize    uint64
	WindowSize  uint64
	Offset      uint64
	Client      *BlobCacheClient
	DataTimeout time.Duration
	Manager     *PrefetchManager
}

type RedisClient

type RedisClient struct {
	redis.UniversalClient
}

func NewRedisClient

func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)

func (*RedisClient) Keys

func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)

Keys gets all keys matching a pattern. It actually runs a scan, since the Redis KEYS command locks up the database.

func (*RedisClient) LRange

func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

func (*RedisClient) PSubscribe

func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())

func (*RedisClient) Publish

func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd

func (*RedisClient) Scan

func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)

func (*RedisClient) Subscribe

func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)

func (*RedisClient) ToSlice

func (r *RedisClient) ToSlice(v interface{}) []interface{}

func (*RedisClient) ToStruct

func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error

type RedisConfig

type RedisConfig struct {
	Addrs              []string      `key:"addrs" json:"addrs"`
	Mode               RedisMode     `key:"mode" json:"mode"`
	ClientName         string        `key:"clientName" json:"client_name"`
	EnableTLS          bool          `key:"enableTLS" json:"enable_tls"`
	InsecureSkipVerify bool          `key:"insecureSkipVerify" json:"insecure_skip_verify"`
	MinIdleConns       int           `key:"minIdleConns" json:"min_idle_conns"`
	MaxIdleConns       int           `key:"maxIdleConns" json:"max_idle_conns"`
	ConnMaxIdleTime    time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"`
	ConnMaxLifetime    time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"`
	DialTimeout        time.Duration `key:"dialTimeout" json:"dial_timeout"`
	ReadTimeout        time.Duration `key:"readTimeout" json:"read_timeout"`
	WriteTimeout       time.Duration `key:"writeTimeout" json:"write_timeout"`
	MaxRedirects       int           `key:"maxRedirects" json:"max_redirects"`
	MaxRetries         int           `key:"maxRetries" json:"max_retries"`
	PoolSize           int           `key:"poolSize" json:"pool_size"`
	Username           string        `key:"username" json:"username"`
	Password           string        `key:"password" json:"password"`
	RouteByLatency     bool          `key:"routeByLatency" json:"route_by_latency"`
}

type RedisLock

type RedisLock struct {
	// contains filtered or unexported fields
}

func NewRedisLock

func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(ctx context.Context, key string, opts RedisLockOptions) error

func (*RedisLock) Release

func (l *RedisLock) Release(key string) error

type RedisLockOption

type RedisLockOption func(*RedisLock)

type RedisLockOptions

type RedisLockOptions struct {
	TtlS    int
	Retries int
}

type RedisMode

type RedisMode string
var (
	RedisModeSingle  RedisMode = "single"
	RedisModeCluster RedisMode = "cluster"
)

type Source

type Source interface {
	Mount(localPath string) error
	Format(fsName string) error
	Unmount(localPath string) error
}

func NewJuiceFsSource

func NewJuiceFsSource(config JuiceFSConfig) (Source, error)

func NewMountPointSource

func NewMountPointSource(config MountPointConfig) (Source, error)

func NewSource

func NewSource(config SourceConfig) (Source, error)

type SourceConfig

type SourceConfig struct {
	Mode           string           `key:"mode" json:"mode"`
	FilesystemName string           `key:"fsName" json:"filesystem_name"`
	FilesystemPath string           `key:"fsPath" json:"filesystem_path"`
	JuiceFS        JuiceFSConfig    `key:"juicefs" json:"juicefs"`
	MountPoint     MountPointConfig `key:"mountpoint" json:"mountpoint"`
}

type StorageLayer

type StorageLayer interface {
}

type Tailscale

type Tailscale struct {
	// contains filtered or unexported fields
}

func NewTailscale

func NewTailscale(ctx context.Context, hostname string, cfg BlobCacheConfig) *Tailscale

NewTailscale creates a new Tailscale instance using tsnet

func (*Tailscale) DialWithTimeout

func (t *Tailscale) DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)

DialWithTimeout returns a TCP connection to a Tailscale service, but times out after GRPCDialTimeoutS.

func (*Tailscale) GetOrCreateServer

func (t *Tailscale) GetOrCreateServer() (*tsnet.Server, error)

func (*Tailscale) WaitForAuth

func (t *Tailscale) WaitForAuth(ctx context.Context, timeout time.Duration) error

type TailscaleConfig

type TailscaleConfig struct {
	WaitForAuth  bool   `key:"waitForAuth" json:"wait_for_auth"`
	ControlURL   string `key:"controlUrl" json:"control_url"`
	User         string `key:"user" json:"user"`
	AuthKey      string `key:"authKey" json:"auth_key"`
	HostName     string `key:"hostName" json:"host_name"`
	Debug        bool   `key:"debug" json:"debug"`
	Ephemeral    bool   `key:"ephemeral" json:"ephemeral"`
	StateDir     string `key:"stateDir" json:"state_dir"`
	DialTimeoutS int    `key:"dialTimeout" json:"dial_timeout"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL