blobcache

package
v0.0.0-...-61de953 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2024 License: MIT Imports: 44 Imported by: 2

Documentation

Index

Constants

View Source
const (
	BlobCacheHostPrefix   string = "blobcache-host"
	BlobCacheClientPrefix string = "blobcache-client"
	BlobCacheVersion      string = "v0.1.0"
)

Variables

View Source
var (
	ErrChannelClosed    = errors.New("redis: channel closed")
	ErrConnectionIssue  = errors.New("redis: connection issue")
	ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
View Source
var (
	Logger *logger
)
View Source
var MetadataKeys = &metadataKeys{}

Functions

func AuthInterceptor

func AuthInterceptor(token string) grpc.UnaryClientInterceptor

func CopyStruct

func CopyStruct(src, dst interface{})

Attempts to copy field values of the same name from the src to the dst struct.

func GetConfigParser

func GetConfigParser(format ConfigFormat) (koanf.Parser, error)

func GetLogger

func GetLogger() *logger

func InitLogger

func InitLogger(debugMode bool)

func ToSlice

func ToSlice(v interface{}) []interface{}

Flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise, they will be ignored.

func ToStruct

func ToStruct(m map[string]string, out interface{}) error

Copies the result of HGetAll to a provided struct. If a field cannot be parsed, Go's zero value is used for it. Struct fields must have the redis tag on them; otherwise, they will be ignored.

func WithClientName

func WithClientName(name string) func(*redis.UniversalOptions)

Types

type BlobCacheClient

type BlobCacheClient struct {
	// contains filtered or unexported fields
}

func NewBlobCacheClient

func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)

func (*BlobCacheClient) GetContent

func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([]byte, error)

func (*BlobCacheClient) GetState

func (c *BlobCacheClient) GetState() error

func (*BlobCacheClient) StoreContent

func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error)

type BlobCacheConfig

type BlobCacheConfig struct {
	Token                          string          `key:"token" json:"token"`
	DebugMode                      bool            `key:"debugMode" json:"debug_mode"`
	TLSEnabled                     bool            `key:"tlsEnabled" json:"tls_enabled"`
	Port                           uint            `key:"port" json:"port"`
	RoundTripThresholdMilliseconds uint            `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"`
	MaxCacheSizeMb                 int64           `key:"maxCacheSizeMb" json:"max_cache_size_mb"`
	PageSizeBytes                  int64           `key:"pageSizeBytes" json:"page_size_bytes"`
	GRPCDialTimeoutS               int             `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"`
	GRPCMessageSizeBytes           int             `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"`
	Tailscale                      TailscaleConfig `key:"tailscale" json:"tailscale"`
	Metadata                       MetadataConfig  `key:"metadata" json:"metadata"`
	DiscoveryIntervalS             int             `key:"discoveryIntervalS" json:"discovery_interval_s"`
}

type BlobCacheEntry

type BlobCacheEntry struct {
	Hash    string `redis:"hash" json:"hash"`
	Size    int64  `redis:"size" json:"size"`
	Content []byte `redis:"content" json:"content"`
	Source  string `redis:"source" json:"source"`
}

type BlobCacheHost

type BlobCacheHost struct {
	RTT  time.Duration
	Addr string
}

type BlobCacheMetadata

type BlobCacheMetadata struct {
	// contains filtered or unexported fields
}

func NewBlobCacheMetadata

func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)

func (*BlobCacheMetadata) AddEntry

func (m *BlobCacheMetadata) AddEntry(ctx context.Context, entry *BlobCacheEntry, host *BlobCacheHost) error

func (*BlobCacheMetadata) GetEntryLocations

func (m *BlobCacheMetadata) GetEntryLocations(ctx context.Context, hash string) (mapset.Set[string], error)

func (*BlobCacheMetadata) RemoveEntryLocation

func (m *BlobCacheMetadata) RemoveEntryLocation(ctx context.Context, hash string, host *BlobCacheHost) error

func (*BlobCacheMetadata) RetrieveEntry

func (m *BlobCacheMetadata) RetrieveEntry(ctx context.Context, hash string) (*BlobCacheEntry, error)

type CacheService

type CacheService struct {
	proto.UnimplementedBlobCacheServer
	// contains filtered or unexported fields
}

func NewCacheService

func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, error)

func (*CacheService) GetContent

func (*CacheService) GetState

func (*CacheService) StartServer

func (cs *CacheService) StartServer(port uint) error

func (*CacheService) StoreContent

func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error

type CacheServiceOpts

type CacheServiceOpts struct {
	Addr string
}

type ClientRequest

type ClientRequest struct {
	// contains filtered or unexported fields
}

type ClientRequestType

type ClientRequestType int
const (
	ClientRequestTypeStorage ClientRequestType = iota
	ClientRequestTypeRetrieval
)

type ConfigFormat

type ConfigFormat string
var (
	JSONConfigFormat ConfigFormat = ".json"
	YAMLConfigFormat ConfigFormat = ".yaml"
	YMLConfigFormat  ConfigFormat = ".yml"
)

type ConfigLoaderFunc

type ConfigLoaderFunc func(k *koanf.Koanf) error

ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.

type ConfigManager

type ConfigManager[T any] struct {
	// contains filtered or unexported fields
}

ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.

func NewConfigManager

func NewConfigManager[T any]() (*ConfigManager[T], error)

NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.

func (*ConfigManager[T]) GetConfig

func (cm *ConfigManager[T]) GetConfig() T

GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.

func (*ConfigManager[T]) LoadConfig

func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error

LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.

func (*ConfigManager[T]) Print

func (cm *ConfigManager[T]) Print() string

Print returns a string representation of the current configuration state.

type ContentAddressableStorage

type ContentAddressableStorage struct {
	// contains filtered or unexported fields
}

func NewContentAddressableStorage

func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, metadata *BlobCacheMetadata, config BlobCacheConfig) (*ContentAddressableStorage, error)

func (*ContentAddressableStorage) Add

func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, content []byte, source string) error

func (*ContentAddressableStorage) Cleanup

func (cas *ContentAddressableStorage) Cleanup()

func (*ContentAddressableStorage) Get

func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]byte, error)

type ContentSource

type ContentSource string
const (
	ContentSourceS3      ContentSource = "s3://"
	ContentSourceJuiceFS ContentSource = "jfs://"
)

type DiscoveryClient

type DiscoveryClient struct {
	// contains filtered or unexported fields
}

func NewDiscoveryClient

func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap) *DiscoveryClient

func (*DiscoveryClient) FindNearbyHosts

func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale.LocalClient) ([]*BlobCacheHost, error)

func (*DiscoveryClient) GetHostState

func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error)

GetHostState attempts to connect to the gRPC service and verifies its availability.

func (*DiscoveryClient) StartInBackground

func (d *DiscoveryClient) StartInBackground(ctx context.Context) error

Used by blobcache servers to discover their closest peers.

type ErrEntryNotFound

type ErrEntryNotFound struct {
	Hash string
}

func (*ErrEntryNotFound) Error

func (e *ErrEntryNotFound) Error() string

type HostMap

type HostMap struct {
	// contains filtered or unexported fields
}

func NewHostMap

func NewHostMap(onHostAdded func(*BlobCacheHost) error) *HostMap

func (*HostMap) Closest

func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)

Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.

func (*HostMap) Get

func (hm *HostMap) Get(addr string) *BlobCacheHost

func (*HostMap) Members

func (hm *HostMap) Members() mapset.Set[string]

func (*HostMap) Remove

func (hm *HostMap) Remove(host *BlobCacheHost)

func (*HostMap) Set

func (hm *HostMap) Set(host *BlobCacheHost)

type MetadataConfig

type MetadataConfig struct {
	RedisAddr       string `key:"redisAddr" json:"redis_addr"`
	RedisPasswd     string `key:"redisPasswd" json:"redis_passwd"`
	RedisTLSEnabled bool   `key:"redisTLSEnabled" json:"redis_tls_enabled"`
}

type ParserFunc

type ParserFunc func() (koanf.Parser, error)

type RedisClient

type RedisClient struct {
	redis.UniversalClient
}

func NewRedisClient

func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)

func (*RedisClient) Keys

func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)

Gets all keys matching a pattern. This actually runs a SCAN, since the KEYS command locks up the database.

func (*RedisClient) LRange

func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

func (*RedisClient) PSubscribe

func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())

func (*RedisClient) Publish

func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd

func (*RedisClient) Scan

func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)

func (*RedisClient) Subscribe

func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)

func (*RedisClient) ToSlice

func (r *RedisClient) ToSlice(v interface{}) []interface{}

func (*RedisClient) ToStruct

func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error

type RedisConfig

type RedisConfig struct {
	Addrs              []string      `key:"addrs" json:"addrs"`
	Mode               RedisMode     `key:"mode" json:"mode"`
	ClientName         string        `key:"clientName" json:"client_name"`
	EnableTLS          bool          `key:"enableTLS" json:"enable_tls"`
	InsecureSkipVerify bool          `key:"insecureSkipVerify" json:"insecure_skip_verify"`
	MinIdleConns       int           `key:"minIdleConns" json:"min_idle_conns"`
	MaxIdleConns       int           `key:"maxIdleConns" json:"max_idle_conns"`
	ConnMaxIdleTime    time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"`
	ConnMaxLifetime    time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"`
	DialTimeout        time.Duration `key:"dialTimeout" json:"dial_timeout"`
	ReadTimeout        time.Duration `key:"readTimeout" json:"read_timeout"`
	WriteTimeout       time.Duration `key:"writeTimeout" json:"write_timeout"`
	MaxRedirects       int           `key:"maxRedirects" json:"max_redirects"`
	MaxRetries         int           `key:"maxRetries" json:"max_retries"`
	PoolSize           int           `key:"poolSize" json:"pool_size"`
	Username           string        `key:"username" json:"username"`
	Password           string        `key:"password" json:"password"`
	RouteByLatency     bool          `key:"routeByLatency" json:"route_by_latency"`
}

type RedisLock

type RedisLock struct {
	// contains filtered or unexported fields
}

func NewRedisLock

func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(ctx context.Context, key string, opts RedisLockOptions) error

func (*RedisLock) Release

func (l *RedisLock) Release(key string) error

type RedisLockOption

type RedisLockOption func(*RedisLock)

type RedisLockOptions

type RedisLockOptions struct {
	TtlS    int
	Retries int
}

type RedisMode

type RedisMode string
var (
	RedisModeSingle  RedisMode = "single"
	RedisModeCluster RedisMode = "cluster"
)

type Tailscale

type Tailscale struct {
	// contains filtered or unexported fields
}

func NewTailscale

func NewTailscale(hostname string, cfg BlobCacheConfig) *Tailscale

NewTailscale creates a new Tailscale instance using tsnet

func (*Tailscale) DialWithTimeout

func (t *Tailscale) DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)

DialWithTimeout returns a TCP connection to a tailscale service but times out after GRPCDialTimeoutS

func (*Tailscale) GetOrCreateServer

func (t *Tailscale) GetOrCreateServer() *tsnet.Server

type TailscaleConfig

type TailscaleConfig struct {
	ControlURL   string `key:"controlUrl" json:"control_url"`
	User         string `key:"user" json:"user"`
	AuthKey      string `key:"authKey" json:"auth_key"`
	HostName     string `key:"hostName" json:"host_name"`
	Debug        bool   `key:"debug" json:"debug"`
	Ephemeral    bool   `key:"ephemeral" json:"ephemeral"`
	StateDir     string `key:"stateDir" json:"state_dir"`
	DialTimeoutS int    `key:"dialTimeout" json:"dial_timeout"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL