Documentation
¶
Index ¶
- Constants
- Variables
- func AuthInterceptor(token string) grpc.UnaryClientInterceptor
- func CopyStruct(src, dst interface{})
- func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
- func GetLogger() *logger
- func InitLogger(debugMode bool)
- func ToSlice(v interface{}) []interface{}
- func ToStruct(m map[string]string, out interface{}) error
- func WithClientName(name string) func(*redis.UniversalOptions)
- type BlobCacheClient
- type BlobCacheConfig
- type BlobCacheEntry
- type BlobCacheHost
- type BlobCacheMetadata
- func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) GetEntryLocations(ctx context.Context, hash string) (mapset.Set[string], error)
- func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)
- type CacheService
- func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
- func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
- func (cs *CacheService) StartServer(port uint) error
- func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
- type CacheServiceOpts
- type ClientRequest
- type ClientRequestType
- type ConfigFormat
- type ConfigLoaderFunc
- type ConfigManager
- type ContentAddressableStorage
- type ContentSource
- type DiscoveryClient
- type ErrEntryNotFound
- type HostMap
- type MetadataConfig
- type ParserFunc
- type RedisClient
- func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
- func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
- func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
- func (r *RedisClient) ToSlice(v interface{}) []interface{}
- func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error
- type RedisConfig
- type RedisLock
- type RedisLockOption
- type RedisLockOptions
- type RedisMode
- type Tailscale
- type TailscaleConfig
Constants ¶
const ( BlobCacheHostPrefix string = "blobcache-host" BlobCacheClientPrefix string = "blobcache-client" BlobCacheVersion string = "v0.1.0" )
Variables ¶
var ( ErrChannelClosed = errors.New("redis: channel closed") ErrConnectionIssue = errors.New("redis: connection issue") ErrUnknownRedisMode = errors.New("redis: unknown mode") )
var (
Logger *logger
)
var MetadataKeys = &metadataKeys{}
Functions ¶
func AuthInterceptor ¶
func AuthInterceptor(token string) grpc.UnaryClientInterceptor
func CopyStruct ¶
func CopyStruct(src, dst interface{})
Attempts to copy field values of the same name from the src to the dst struct.
func GetConfigParser ¶
func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
func InitLogger ¶
func InitLogger(debugMode bool)
func ToSlice ¶
func ToSlice(v interface{}) []interface{}
Flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them otherwise they will be ignored.
func ToStruct ¶
Copies the result of HGetAll to a provided struct. If a field cannot be parsed, we use Go's default value. Struct fields must have the redis tag on them otherwise they will be ignored.
func WithClientName ¶
func WithClientName(name string) func(*redis.UniversalOptions)
Types ¶
type BlobCacheClient ¶
type BlobCacheClient struct {
// contains filtered or unexported fields
}
func NewBlobCacheClient ¶
func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)
func (*BlobCacheClient) GetContent ¶
func (*BlobCacheClient) GetState ¶
func (c *BlobCacheClient) GetState() error
func (*BlobCacheClient) StoreContent ¶
func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error)
type BlobCacheConfig ¶
type BlobCacheConfig struct { Token string `key:"token" json:"token"` DebugMode bool `key:"debugMode" json:"debug_mode"` TLSEnabled bool `key:"tlsEnabled" json:"tls_enabled"` Port uint `key:"port" json:"port"` RoundTripThresholdMilliseconds uint `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"` MaxCacheSizeMb int64 `key:"maxCacheSizeMb" json:"max_cache_size_mb"` PageSizeBytes int64 `key:"pageSizeBytes" json:"page_size_bytes"` GRPCDialTimeoutS int `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"` GRPCMessageSizeBytes int `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"` Tailscale TailscaleConfig `key:"tailscale" json:"tailscale"` Metadata MetadataConfig `key:"metadata" json:"metadata"` DiscoveryIntervalS int `key:"discoveryIntervalS" json:"discovery_interval_s"` }
type BlobCacheEntry ¶
type BlobCacheHost ¶
type BlobCacheMetadata ¶
type BlobCacheMetadata struct {
// contains filtered or unexported fields
}
func NewBlobCacheMetadata ¶
func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)
func (*BlobCacheMetadata) AddEntry ¶
func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error
func (*BlobCacheMetadata) GetEntryLocations ¶
func (*BlobCacheMetadata) RemoveEntryLocation ¶
func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error
func (*BlobCacheMetadata) RetrieveEntry ¶
func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)
type CacheService ¶
type CacheService struct { proto.UnimplementedBlobCacheServer // contains filtered or unexported fields }
func NewCacheService ¶
func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, error)
func (*CacheService) GetContent ¶
func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
func (*CacheService) GetState ¶
func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
func (*CacheService) StartServer ¶
func (cs *CacheService) StartServer(port uint) error
func (*CacheService) StoreContent ¶
func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
type CacheServiceOpts ¶
type CacheServiceOpts struct {
Addr string
}
type ClientRequest ¶
type ClientRequest struct {
// contains filtered or unexported fields
}
type ClientRequestType ¶
type ClientRequestType int
const ( ClientRequestTypeStorage ClientRequestType = iota ClientRequestTypeRetrieval )
type ConfigFormat ¶
type ConfigFormat string
var ( JSONConfigFormat ConfigFormat = ".json" YAMLConfigFormat ConfigFormat = ".yaml" YMLConfigFormat ConfigFormat = ".yml" )
type ConfigLoaderFunc ¶
type ConfigLoaderFunc func(k *koanf.Koanf) error
ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.
type ConfigManager ¶
type ConfigManager[T any] struct { // contains filtered or unexported fields }
ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.
func NewConfigManager ¶
func NewConfigManager[T any]() (*ConfigManager[T], error)
NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.
func (*ConfigManager[T]) GetConfig ¶
func (cm *ConfigManager[T]) GetConfig() T
GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.
func (*ConfigManager[T]) LoadConfig ¶
func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error
LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.
func (*ConfigManager[T]) Print ¶
func (cm *ConfigManager[T]) Print() string
Print returns a string representation of the current configuration state.
type ContentAddressableStorage ¶
type ContentAddressableStorage struct {
// contains filtered or unexported fields
}
func NewContentAddressableStorage ¶
func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, metadata *BlobCacheMetadata, config BlobCacheConfig) (*ContentAddressableStorage, error)
func (*ContentAddressableStorage) Cleanup ¶
func (cas *ContentAddressableStorage) Cleanup()
type ContentSource ¶
type ContentSource string
const ( ContentSourceS3 ContentSource = "s3://" ContentSourceJuiceFS ContentSource = "jfs://" )
type DiscoveryClient ¶
type DiscoveryClient struct {
// contains filtered or unexported fields
}
func NewDiscoveryClient ¶
func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap) *DiscoveryClient
func (*DiscoveryClient) FindNearbyHosts ¶
func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale.LocalClient) ([]*BlobCacheHost, error)
func (*DiscoveryClient) GetHostState ¶
func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error)
checkService attempts to connect to the gRPC service and verifies its availability
func (*DiscoveryClient) StartInBackground ¶
func (d *DiscoveryClient) StartInBackground(ctx context.Context) error
Used by blobcache servers to discover their closest peers
type ErrEntryNotFound ¶
type ErrEntryNotFound struct {
Hash string
}
func (*ErrEntryNotFound) Error ¶
func (e *ErrEntryNotFound) Error() string
type HostMap ¶
type HostMap struct {
// contains filtered or unexported fields
}
func NewHostMap ¶
func NewHostMap(onHostAdded func(*BlobCacheHost) error) *HostMap
func (*HostMap) Closest ¶
func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
Closest finds the nearest host within a given timeout If no hosts are found, it will error out
func (*HostMap) Get ¶
func (hm *HostMap) Get(addr string) *BlobCacheHost
func (*HostMap) Remove ¶
func (hm *HostMap) Remove(host *BlobCacheHost)
func (*HostMap) Set ¶
func (hm *HostMap) Set(host *BlobCacheHost)
type MetadataConfig ¶
type ParserFunc ¶
type ParserFunc func() (koanf.Parser, error)
type RedisClient ¶
type RedisClient struct {
redis.UniversalClient
}
func NewRedisClient ¶
func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)
func (*RedisClient) Keys ¶
Gets all keys using a pattern Actually runs a scan since keys locks up the database.
func (*RedisClient) PSubscribe ¶
func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
func (*RedisClient) Publish ¶
func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
func (*RedisClient) Subscribe ¶
func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
func (*RedisClient) ToSlice ¶
func (r *RedisClient) ToSlice(v interface{}) []interface{}
type RedisConfig ¶
type RedisConfig struct { Addrs []string `key:"addrs" json:"addrs"` Mode RedisMode `key:"mode" json:"mode"` ClientName string `key:"clientName" json:"client_name"` EnableTLS bool `key:"enableTLS" json:"enable_tls"` InsecureSkipVerify bool `key:"insecureSkipVerify" json:"insecure_skip_verify"` MinIdleConns int `key:"minIdleConns" json:"min_idle_conns"` MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"` ConnMaxIdleTime time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"` ConnMaxLifetime time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"` DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"` ReadTimeout time.Duration `key:"readTimeout" json:"read_timeout"` WriteTimeout time.Duration `key:"writeTimeout" json:"write_timeout"` MaxRedirects int `key:"maxRedirects" json:"max_redirects"` MaxRetries int `key:"maxRetries" json:"max_retries"` PoolSize int `key:"poolSize" json:"pool_size"` Username string `key:"username" json:"username"` Password string `key:"password" json:"password"` RouteByLatency bool `key:"routeByLatency" json:"route_by_latency"` }
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
func NewRedisLock ¶
func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock
type RedisLockOption ¶
type RedisLockOption func(*RedisLock)
type RedisLockOptions ¶
type Tailscale ¶
type Tailscale struct {
// contains filtered or unexported fields
}
func NewTailscale ¶
func NewTailscale(hostname string, cfg BlobCacheConfig) *Tailscale
NewTailscale creates a new Tailscale instance using tsnet
func (*Tailscale) DialWithTimeout ¶
DialWithTimeout returns a TCP connection to a tailscale service but times out after GRPCDialTimeoutS
func (*Tailscale) GetOrCreateServer ¶
type TailscaleConfig ¶
type TailscaleConfig struct { ControlURL string `key:"controlUrl" json:"control_url"` User string `key:"user" json:"user"` AuthKey string `key:"authKey" json:"auth_key"` HostName string `key:"hostName" json:"host_name"` Debug bool `key:"debug" json:"debug"` Ephemeral bool `key:"ephemeral" json:"ephemeral"` StateDir string `key:"stateDir" json:"state_dir"` DialTimeoutS int `key:"dialTimeout" json:"dial_timeout"` }