Documentation
¶
Index ¶
- Constants
- Variables
- func AuthInterceptor(token string) grpc.UnaryClientInterceptor
- func CopyStruct(src, dst interface{})
- func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)
- func GenerateFsID(name string) string
- func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
- func GetLogger() *logger
- func GetPrivateIpAddr() (string, error)
- func InitLogger(debugMode bool)
- func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error)
- func SHA1StringToUint64(hash string) (uint64, error)
- func ToSlice(v interface{}) []interface{}
- func ToStruct(m map[string]string, out interface{}) error
- func WithClientName(name string) func(*redis.UniversalOptions)
- type BlobCacheClient
- func (c *BlobCacheClient) Cleanup() error
- func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([]byte, error)
- func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error)
- func (c *BlobCacheClient) GetState() error
- func (c *BlobCacheClient) HostsAvailable() bool
- func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error)
- func (c *BlobCacheClient) StoreContentFromSource(sourcePath string, sourceOffset int64) (string, error)
- func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
- type BlobCacheConfig
- type BlobCacheEntry
- type BlobCacheHost
- type BlobCacheMetadata
- func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) GetEntryLocations(ctx context.Context, hash string) (mapset.Set[string], error)
- func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) error
- func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)
- func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) StoreContentInBlobFs(ctx context.Context, path string, hash string, size uint64) error
- type BlobFs
- type BlobFsConfig
- type BlobFsMetadata
- type BlobFsNode
- type BlobFsPrefetchConfig
- type BlobFsSystemOpts
- type CacheService
- func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
- func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
- func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
- func (cs *CacheService) HostKeepAlive()
- func (cs *CacheService) StartServer(port uint) error
- func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
- func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
- type CacheServiceOpts
- type ClientRequest
- type ClientRequestType
- type ConfigFormat
- type ConfigLoaderFunc
- type ConfigManager
- type ContentAddressableStorage
- type DiscoveryClient
- func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context) ([]*BlobCacheHost, error)
- func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error)
- func (d *DiscoveryClient) GetHostStateViaMetadata(ctx context.Context, addr, privateAddr string) (*BlobCacheHost, error)
- func (d *DiscoveryClient) StartInBackground(ctx context.Context) error
- type DiscoveryMode
- type ErrEntryNotFound
- type ErrNodeNotFound
- type FSNode
- func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, ...) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno
- func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) OnAdd(ctx context.Context)
- func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Opendir(ctx context.Context) syscall.Errno
- func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)
- func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
- func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)
- func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, ...) syscall.Errno
- func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
- func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno
- type FileSystem
- type FileSystemOpts
- type FileSystemStorage
- type HostMap
- func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) Get(addr string) *BlobCacheHost
- func (hm *HostMap) Members() mapset.Set[string]
- func (hm *HostMap) Remove(host *BlobCacheHost)
- func (hm *HostMap) Set(host *BlobCacheHost)
- type JuiceFSConfig
- type JuiceFsSource
- type MetadataConfig
- type MountPointConfig
- type MountPointSource
- type ParserFunc
- type PrefetchBuffer
- type PrefetchManager
- type PrefetchOpts
- type RedisClient
- func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
- func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
- func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
- func (r *RedisClient) ToSlice(v interface{}) []interface{}
- func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error
- type RedisConfig
- type RedisLock
- type RedisLockOption
- type RedisLockOptions
- type RedisMode
- type Source
- type SourceConfig
- type StorageLayer
- type Tailscale
- type TailscaleConfig
Constants ¶
const ( SourceModeJuiceFS string = "juicefs" SourceModeMountPoint string = "mountpoint" )
const ( BlobCacheHostPrefix string = "blobcache-host" BlobCacheClientPrefix string = "blobcache-client" BlobCacheVersion string = "dev" )
Variables ¶
var ( ErrHostNotFound = errors.New("host not found") ErrUnableToReachHost = errors.New("unable to reach host") ErrInvalidHostVersion = errors.New("invalid host version") ErrContentNotFound = errors.New("content not found") ErrClientNotFound = errors.New("client not found") ErrCacheLockHeld = errors.New("cache lock held") ErrUnableToPopulateContent = errors.New("unable to populate content from original source") ErrBlobFsMountFailure = errors.New("failed to mount blobfs") ErrTailscaleAuthentication = errors.New("tailscale authentication failed") )
var ( ErrChannelClosed = errors.New("redis: channel closed") ErrConnectionIssue = errors.New("redis: connection issue") ErrUnknownRedisMode = errors.New("redis: unknown mode") )
var (
Logger *logger
)
var MetadataKeys = &metadataKeys{}
Functions ¶
func AuthInterceptor ¶
func AuthInterceptor(token string) grpc.UnaryClientInterceptor
func CopyStruct ¶
func CopyStruct(src, dst interface{})
Attempts to copy field values of the same name from the src to the dst struct.
func GenerateFsID ¶
Generates a directory ID based on parent ID and name.
func GetConfigParser ¶
func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
func GetPrivateIpAddr ¶
func InitLogger ¶
func InitLogger(debugMode bool)
func SHA1StringToUint64 ¶
SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64
func ToSlice ¶
func ToSlice(v interface{}) []interface{}
ToSlice flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise they will be ignored.
func ToStruct ¶
ToStruct copies the result of HGetAll to a provided struct. If a field cannot be parsed, Go's zero value is used for it. Struct fields must have the redis tag on them; otherwise they will be ignored.
func WithClientName ¶
func WithClientName(name string) func(*redis.UniversalOptions)
Types ¶
type BlobCacheClient ¶
type BlobCacheClient struct {
// contains filtered or unexported fields
}
func NewBlobCacheClient ¶
func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)
func (*BlobCacheClient) Cleanup ¶
func (c *BlobCacheClient) Cleanup() error
func (*BlobCacheClient) GetContent ¶
func (*BlobCacheClient) GetContentStream ¶
func (*BlobCacheClient) GetState ¶
func (c *BlobCacheClient) GetState() error
func (*BlobCacheClient) HostsAvailable ¶
func (c *BlobCacheClient) HostsAvailable() bool
func (*BlobCacheClient) StoreContent ¶
func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error)
func (*BlobCacheClient) StoreContentFromSource ¶
func (c *BlobCacheClient) StoreContentFromSource(sourcePath string, sourceOffset int64) (string, error)
func (*BlobCacheClient) WaitForHosts ¶
func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
type BlobCacheConfig ¶
type BlobCacheConfig struct { Token string `key:"token" json:"token"` DebugMode bool `key:"debugMode" json:"debug_mode"` TLSEnabled bool `key:"tlsEnabled" json:"tls_enabled"` Port uint `key:"port" json:"port"` HostStorageCapacityThresholdPct float64 `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"` ObjectTtlS int `key:"objectTtlS" json:"object_ttl_s"` RoundTripThresholdMilliseconds uint `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"` MaxCachePct int64 `key:"maxCachePct" json:"max_cache_pct"` PageSizeBytes int64 `key:"pageSizeBytes" json:"page_size_bytes"` GRPCDialTimeoutS int `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"` GRPCMessageSizeBytes int `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"` Tailscale TailscaleConfig `key:"tailscale" json:"tailscale"` Metadata MetadataConfig `key:"metadata" json:"metadata"` DiscoveryIntervalS int `key:"discoveryIntervalS" json:"discovery_interval_s"` DiscoveryMode string `key:"discoveryMode" json:"discovery_mode"` BlobFs BlobFsConfig `key:"blobfs" json:"blobfs"` }
type BlobCacheEntry ¶
type BlobCacheHost ¶
type BlobCacheMetadata ¶
type BlobCacheMetadata struct {
// contains filtered or unexported fields
}
func NewBlobCacheMetadata ¶
func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)
func (*BlobCacheMetadata) AddEntry ¶
func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error
func (*BlobCacheMetadata) AddFsNodeChild ¶
func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*BlobCacheMetadata) AddHostToIndex ¶
func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, host *BlobCacheHost) error
func (*BlobCacheMetadata) GetAvailableHosts ¶
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) GetEntryLocations ¶
func (*BlobCacheMetadata) GetFsNode ¶
func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetFsNodeChildren ¶
func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetHostIndex ¶
func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) RemoveClientLock ¶
func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) RemoveEntryLocation ¶
func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error
func (*BlobCacheMetadata) RemoveFsNodeChild ¶
func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) error
func (*BlobCacheMetadata) RemoveHostFromIndex ¶
func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, host *BlobCacheHost) error
func (*BlobCacheMetadata) RetrieveEntry ¶
func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)
func (*BlobCacheMetadata) SetClientLock ¶
func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) SetFsNode ¶
func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*BlobCacheMetadata) SetHostKeepAlive ¶
func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, host *BlobCacheHost) error
func (*BlobCacheMetadata) StoreContentInBlobFs ¶
type BlobFs ¶
type BlobFs struct { Metadata *BlobCacheMetadata Client *BlobCacheClient Config BlobCacheConfig PrefetchManager *PrefetchManager // contains filtered or unexported fields }
func NewFileSystem ¶
func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)
NewFileSystem initializes a new BlobFs with root metadata.
type BlobFsConfig ¶
type BlobFsConfig struct { Enabled bool `key:"enabled" json:"enabled"` Prefetch BlobFsPrefetchConfig `key:"prefetch" json:"prefetch"` MountPoint string `key:"mountPoint" json:"mount_point"` Sources []SourceConfig `key:"sources" json:"sources"` MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"` MaxWriteKB int `key:"maxWriteKB" json:"max_write_kb"` MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"` DirectMount bool `key:"directMount" json:"direct_mount"` DirectIO bool `key:"directIO" json:"direct_io"` Options []string `key:"options" json:"options"` }
type BlobFsMetadata ¶
type BlobFsMetadata struct { PID string `redis:"pid" json:"pid"` ID string `redis:"id" json:"id"` Name string `redis:"name" json:"name"` Path string `redis:"path" json:"path"` Hash string `redis:"hash" json:"hash"` Ino uint64 `redis:"ino" json:"ino"` Size uint64 `redis:"size" json:"size"` Blocks uint64 `redis:"blocks" json:"blocks"` Atime uint64 `redis:"atime" json:"atime"` Mtime uint64 `redis:"mtime" json:"mtime"` Ctime uint64 `redis:"ctime" json:"ctime"` Atimensec uint32 `redis:"atimensec" json:"atimensec"` Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"` Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"` Mode uint32 `redis:"mode" json:"mode"` Nlink uint32 `redis:"nlink" json:"nlink"` Rdev uint32 `redis:"rdev" json:"rdev"` Blksize uint32 `redis:"blksize" json:"blksize"` Padding uint32 `redis:"padding" json:"padding"` Uid uint32 `redis:"uid" json:"uid"` Gid uint32 `redis:"gid" json:"gid"` Gen uint64 `redis:"gen" json:"gen"` }
type BlobFsNode ¶
type BlobFsPrefetchConfig ¶
type BlobFsPrefetchConfig struct { Enabled bool `key:"enabled" json:"enabled"` MinFileSizeBytes uint64 `key:"minFileSizeBytes" json:"min_file_size_bytes"` EvictionIntervalS int `key:"evictionIntervalS" json:"eviction_interval_s"` IdleTtlS int `key:"idleTtlS" json:"idle_ttl_s"` WindowSizeBytes uint64 `key:"windowSizeBytes" json:"window_size_bytes"` IgnoreFileExt []string `key:"ignoreFileExt" json:"ignore_file_ext"` DataTimeoutS int `key:"dataTimeoutS" json:"data_timeout_s"` }
type BlobFsSystemOpts ¶
type BlobFsSystemOpts struct { Verbose bool Metadata *BlobCacheMetadata Config BlobCacheConfig Client *BlobCacheClient }
type CacheService ¶
type CacheService struct { proto.UnimplementedBlobCacheServer // contains filtered or unexported fields }
func NewCacheService ¶
func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, error)
func (*CacheService) GetContent ¶
func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
func (*CacheService) GetContentStream ¶
func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
func (*CacheService) GetState ¶
func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
func (*CacheService) HostKeepAlive ¶
func (cs *CacheService) HostKeepAlive()
func (*CacheService) StartServer ¶
func (cs *CacheService) StartServer(port uint) error
func (*CacheService) StoreContent ¶
func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
func (*CacheService) StoreContentFromSource ¶
func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
type CacheServiceOpts ¶
type CacheServiceOpts struct {
Addr string
}
type ClientRequest ¶
type ClientRequest struct {
// contains filtered or unexported fields
}
type ClientRequestType ¶
type ClientRequestType int
const ( ClientRequestTypeStorage ClientRequestType = iota ClientRequestTypeRetrieval )
type ConfigFormat ¶
type ConfigFormat string
var ( JSONConfigFormat ConfigFormat = ".json" YAMLConfigFormat ConfigFormat = ".yaml" YMLConfigFormat ConfigFormat = ".yml" )
type ConfigLoaderFunc ¶
type ConfigLoaderFunc func(k *koanf.Koanf) error
ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.
type ConfigManager ¶
type ConfigManager[T any] struct { // contains filtered or unexported fields }
ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.
func NewConfigManager ¶
func NewConfigManager[T any]() (*ConfigManager[T], error)
NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.
func (*ConfigManager[T]) GetConfig ¶
func (cm *ConfigManager[T]) GetConfig() T
GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.
func (*ConfigManager[T]) LoadConfig ¶
func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error
LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.
func (*ConfigManager[T]) Print ¶
func (cm *ConfigManager[T]) Print() string
Print returns a string representation of the current configuration state.
type ContentAddressableStorage ¶
type ContentAddressableStorage struct {
// contains filtered or unexported fields
}
func NewContentAddressableStorage ¶
func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, metadata *BlobCacheMetadata, config BlobCacheConfig) (*ContentAddressableStorage, error)
func (*ContentAddressableStorage) Cleanup ¶
func (cas *ContentAddressableStorage) Cleanup()
type DiscoveryClient ¶
type DiscoveryClient struct {
// contains filtered or unexported fields
}
func NewDiscoveryClient ¶
func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap, metadata *BlobCacheMetadata) *DiscoveryClient
func (*DiscoveryClient) FindNearbyHosts ¶
func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context) ([]*BlobCacheHost, error)
func (*DiscoveryClient) GetHostState ¶
func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error)
GetHostState attempts to connect to the gRPC service and verifies its availability.
func (*DiscoveryClient) GetHostStateViaMetadata ¶
func (d *DiscoveryClient) GetHostStateViaMetadata(ctx context.Context, addr, privateAddr string) (*BlobCacheHost, error)
func (*DiscoveryClient) StartInBackground ¶
func (d *DiscoveryClient) StartInBackground(ctx context.Context) error
Used by blobcache servers to discover their closest peers
type DiscoveryMode ¶
type DiscoveryMode string
const ( DiscoveryModeTailscale DiscoveryMode = "tailscale" DiscoveryModeMetadata DiscoveryMode = "metadata" )
type ErrEntryNotFound ¶
type ErrEntryNotFound struct {
Hash string
}
func (*ErrEntryNotFound) Error ¶
func (e *ErrEntryNotFound) Error() string
type ErrNodeNotFound ¶
type ErrNodeNotFound struct {
Id string
}
func (*ErrNodeNotFound) Error ¶
func (e *ErrNodeNotFound) Error() string
type FSNode ¶
type FileSystem ¶
type FileSystemOpts ¶
type FileSystemOpts struct { MountPoint string Verbose bool Metadata *BlobCacheMetadata }
BlobFS types
type FileSystemStorage ¶
type HostMap ¶
type HostMap struct {
// contains filtered or unexported fields
}
func NewHostMap ¶
func NewHostMap(cfg BlobCacheConfig, onHostAdded func(*BlobCacheHost) error) *HostMap
func (*HostMap) Closest ¶
func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) ClosestWithCapacity ¶
func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) Get ¶
func (hm *HostMap) Get(addr string) *BlobCacheHost
func (*HostMap) Remove ¶
func (hm *HostMap) Remove(host *BlobCacheHost)
func (*HostMap) Set ¶
func (hm *HostMap) Set(host *BlobCacheHost)
type JuiceFSConfig ¶
type JuiceFSConfig struct { RedisURI string `key:"redisURI" json:"redis_uri"` Bucket string `key:"bucket" json:"bucket"` AccessKey string `key:"accessKey" json:"access_key"` SecretKey string `key:"secretKey" json:"secret_key"` CacheSize int64 `key:"cacheSize" json:"cache_size"` BlockSize int64 `key:"blockSize" json:"block_size"` Prefetch int64 `key:"prefetch" json:"prefetch"` BufferSize int64 `key:"bufferSize" json:"buffer_size"` }
type JuiceFsSource ¶
type JuiceFsSource struct {
// contains filtered or unexported fields
}
func (*JuiceFsSource) Format ¶
func (s *JuiceFsSource) Format(fsName string) error
func (*JuiceFsSource) Mount ¶
func (s *JuiceFsSource) Mount(localPath string) error
func (*JuiceFsSource) Unmount ¶
func (s *JuiceFsSource) Unmount(localPath string) error
type MetadataConfig ¶
type MountPointConfig ¶
type MountPointSource ¶
type MountPointSource struct {
// contains filtered or unexported fields
}
func (*MountPointSource) Format ¶
func (s *MountPointSource) Format(fsName string) error
func (*MountPointSource) Mount ¶
func (s *MountPointSource) Mount(localPath string) error
func (*MountPointSource) Unmount ¶
func (s *MountPointSource) Unmount(localPath string) error
type ParserFunc ¶
type ParserFunc func() (koanf.Parser, error)
type PrefetchBuffer ¶
type PrefetchBuffer struct {
// contains filtered or unexported fields
}
func NewPrefetchBuffer ¶
func NewPrefetchBuffer(opts PrefetchOpts) *PrefetchBuffer
func (*PrefetchBuffer) Clear ¶
func (pb *PrefetchBuffer) Clear()
func (*PrefetchBuffer) GetRange ¶
func (pb *PrefetchBuffer) GetRange(offset uint64, dst []byte) error
func (*PrefetchBuffer) IsIdle ¶
func (pb *PrefetchBuffer) IsIdle() bool
type PrefetchManager ¶
type PrefetchManager struct {
// contains filtered or unexported fields
}
func NewPrefetchManager ¶
func NewPrefetchManager(ctx context.Context, config BlobCacheConfig, client *BlobCacheClient) *PrefetchManager
func (*PrefetchManager) GetPrefetchBuffer ¶
func (pm *PrefetchManager) GetPrefetchBuffer(hash string, fileSize uint64) *PrefetchBuffer
GetPrefetchBuffer returns an existing prefetch buffer if it exists, or nil
func (*PrefetchManager) Start ¶
func (pm *PrefetchManager) Start()
type PrefetchOpts ¶
type PrefetchOpts struct { Ctx context.Context CancelFunc context.CancelFunc Hash string FileSize uint64 WindowSize uint64 Offset uint64 Client *BlobCacheClient DataTimeout time.Duration Manager *PrefetchManager }
type RedisClient ¶
type RedisClient struct {
redis.UniversalClient
}
func NewRedisClient ¶
func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)
func (*RedisClient) Keys ¶
Keys gets all keys matching a pattern. It actually runs a SCAN, since KEYS locks up the database.
func (*RedisClient) PSubscribe ¶
func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
func (*RedisClient) Publish ¶
func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
func (*RedisClient) Subscribe ¶
func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
func (*RedisClient) ToSlice ¶
func (r *RedisClient) ToSlice(v interface{}) []interface{}
type RedisConfig ¶
type RedisConfig struct { Addrs []string `key:"addrs" json:"addrs"` Mode RedisMode `key:"mode" json:"mode"` ClientName string `key:"clientName" json:"client_name"` EnableTLS bool `key:"enableTLS" json:"enable_tls"` InsecureSkipVerify bool `key:"insecureSkipVerify" json:"insecure_skip_verify"` MinIdleConns int `key:"minIdleConns" json:"min_idle_conns"` MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"` ConnMaxIdleTime time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"` ConnMaxLifetime time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"` DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"` ReadTimeout time.Duration `key:"readTimeout" json:"read_timeout"` WriteTimeout time.Duration `key:"writeTimeout" json:"write_timeout"` MaxRedirects int `key:"maxRedirects" json:"max_redirects"` MaxRetries int `key:"maxRetries" json:"max_retries"` PoolSize int `key:"poolSize" json:"pool_size"` Username string `key:"username" json:"username"` Password string `key:"password" json:"password"` RouteByLatency bool `key:"routeByLatency" json:"route_by_latency"` }
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
func NewRedisLock ¶
func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock
type RedisLockOption ¶
type RedisLockOption func(*RedisLock)
type RedisLockOptions ¶
type Source ¶
type Source interface { Mount(localPath string) error Format(fsName string) error Unmount(localPath string) error }
func NewJuiceFsSource ¶
func NewJuiceFsSource(config JuiceFSConfig) (Source, error)
func NewMountPointSource ¶
func NewMountPointSource(config MountPointConfig) (Source, error)
func NewSource ¶
func NewSource(config SourceConfig) (Source, error)
type SourceConfig ¶
type SourceConfig struct { Mode string `key:"mode" json:"mode"` FilesystemName string `key:"fsName" json:"filesystem_name"` FilesystemPath string `key:"fsPath" json:"filesystem_path"` JuiceFS JuiceFSConfig `key:"juicefs" json:"juicefs"` MountPoint MountPointConfig `key:"mountpoint" json:"mountpoint"` }
type StorageLayer ¶
type StorageLayer interface { }
type Tailscale ¶
type Tailscale struct {
// contains filtered or unexported fields
}
func NewTailscale ¶
func NewTailscale(ctx context.Context, hostname string, cfg BlobCacheConfig) *Tailscale
NewTailscale creates a new Tailscale instance using tsnet
func (*Tailscale) DialWithTimeout ¶
DialWithTimeout returns a TCP connection to a Tailscale service, but times out after GRPCDialTimeoutS.
func (*Tailscale) GetOrCreateServer ¶
type TailscaleConfig ¶
type TailscaleConfig struct { WaitForAuth bool `key:"waitForAuth" json:"wait_for_auth"` ControlURL string `key:"controlUrl" json:"control_url"` User string `key:"user" json:"user"` AuthKey string `key:"authKey" json:"auth_key"` HostName string `key:"hostName" json:"host_name"` Debug bool `key:"debug" json:"debug"` Ephemeral bool `key:"ephemeral" json:"ephemeral"` StateDir string `key:"stateDir" json:"state_dir"` DialTimeoutS int `key:"dialTimeout" json:"dial_timeout"` }