Documentation ¶
Index ¶
- type ErrInvalidID
- type ErrNotFound
- type ErrNotPossible
- type ErrReplicaNotFound
- type ErrWrongReplicaState
- type InsertData
- type ReadConfig
- type Remote
- type RemoteReplicateConfig
- type Settings
- type Storage
- func (s *Storage) BlastPathRead(fid string, start uint64, end uint64) (io.ReadCloser, error)
- func (s *Storage) BlastPathStatus(out io.Writer)
- func (s *Storage) DebugID(out io.Writer, id string)
- func (s *Storage) GetMetrics() (m metrics.Metrics)
- func (s *Storage) Health() (bool, string)
- func (s *Storage) Insert(ctx context.Context, data *InsertData) (id string, err error)
- func (s *Storage) Read(ctx context.Context, rc ReadConfig) (io.ReadCloser, error)
- func (s *Storage) ReplicaHeartBeat(ctx context.Context, fn string) error
- func (s *Storage) ReplicaInitialize(ctx context.Context, fn string) error
- func (s *Storage) ReplicaQueueDelete(ctx context.Context, fn string) error
- func (s *Storage) ReplicaReplicate(ctx context.Context, fn string, rc RemoteReplicateConfig) error
- func (s *Storage) Start(ctx context.Context) error
- func (s *Storage) Status(out io.Writer)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrInvalidID ¶
type ErrInvalidID struct{}
func (ErrInvalidID) Error ¶
func (e ErrInvalidID) Error() string
type ErrNotFound ¶
type ErrNotFound string
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type ErrNotPossible ¶
type ErrNotPossible struct{}
func (ErrNotPossible) Error ¶
func (ErrNotPossible) Error() string
type ErrReplicaNotFound ¶
type ErrReplicaNotFound string
func (ErrReplicaNotFound) Error ¶
func (e ErrReplicaNotFound) Error() string
type ErrWrongReplicaState ¶
type ErrWrongReplicaState struct{}
func (ErrWrongReplicaState) Error ¶
func (e ErrWrongReplicaState) Error() string
type InsertData ¶
type InsertData struct {
	// The actual data written to disk will be read from this Reader.
	Source io.Reader

	// If provided then this will ensure that the data received from the
	// client is at least this long. If this is zero then it is assumed
	// that the expected length of the data is unknown and therefore should
	// be read until EOF.
	Length int64

	// If this is defined then tracing will be used at various points during
	// the insertion process. If this is nil then no tracing will be performed.
	Tracer *tracing.Trace
}
When calling Insert, many different values can be provided; they are all bundled up here. This makes it a little cleaner to pass the data between server -> storage -> primary.
type ReadConfig ¶
type ReadConfig interface {
	// The NameSpace and ID being requested from this storage instance.
	NameSpace() string
	ID() string

	// Returns some FID specific fields as a helper.
	FID() fid.FID
	FIDString() string
	Machine() uint32
	Start() uint64
	Length() uint32

	// If this returns true then the read request will not attempt to make
	// any remote calls. It will purely attempt to fetch the file from the
	// local cache and return 404 if it's not found locally.
	LocalOnly() bool

	// Returns the Logger that is associated with this Read operation. If
	// this returns nil then a logger will be created from the BaseLogger
	// in the Storage object.
	Logger() *slog.Logger

	// Since the request may need to be forwarded on to a Remote we need to
	// allow a way for request contexts to be proxied. Since the Storage
	// implementation does not know or care what those contexts are they
	// are simply provided as an interface.
	Context() interface{}
}
When performing a read, the arguments can vary because of the way that requests are made. Authentication may need to be proxied through to a remote, which is not done for normal Inserts. This interface therefore lets the upstream caller provide Remote-specific details that the storage implementation is simply unaware of.
type Remote ¶
type Remote interface {
	Delete(namespace, fn string) error
	HeartBeat(namespace, fn string) (bool, error)
	Initialize(namespace, fn string) error
	Read(rc ReadConfig) (io.ReadCloser, error)
	Replicate(rc RemoteReplicateConfig) (bool, error)
	String() string
}
An interface that the Storage object will use when interfacing with remote instances. This abstraction allows remotes to use any number of protocols or access methods rather than having them statically defined in the storage module.
type RemoteReplicateConfig ¶
type RemoteReplicateConfig interface {
	FileName() string
	GetBody() io.ReadCloser
	Hash() string
	NameSpace() string
	Offset() uint64
	Size() uint64
}
All of the arguments to Replicate() are bundled up here in order to make it easier to pass objects in and around. This vastly simplifies the function signature.
type Settings ¶
type Settings struct {
	// Used to perform uploads from this namespace.
	AWSUploader *s3manager.Uploader

	// A function that will return a pool of Blobby remotes that
	// should be used for a new Replica.
	AssignRemotes func(int) ([]Remote, error)

	// The base directory that files will be stored in for this namespace.
	BaseDirectory string

	// If this is set to something other than nil then logging will be
	// written to this output.
	BaseLogger *slog.Logger

	// When set to true then the file will be compressed before it's uploaded
	// to S3. This will break the ability to fetch identifiers not found in
	// the local file system cache, so use accordingly.
	Compress      bool
	CompressLevel int

	// A work queue for Compression related activities.
	CompressWorkQueue *workqueue.WorkQueue

	// If configured to do so then blobby will keep the primary file around
	// after it has been uploaded. This allows Read() operations to use the
	// local file rather than fetching from S3.
	DelayDelete time.Duration

	// The DelayQueue that will be used to schedule events like heart beat
	// timers, replica timeouts, etc.
	DelayQueue *delayqueue.DelayQueue

	// A WorkQueue for processing local file delete requests.
	DeleteLocalWorkQueue *workqueue.WorkQueue

	// A WorkQueue for processing remote replica delete requests.
	DeleteRemotesWorkQueue *workqueue.WorkQueue

	// After this amount of time a replica will be considered "orphaned" and
	// will trigger an upload of the data. This ensures that a primary being
	// lost won't cause data loss.
	HeartBeatTime time.Duration

	// The machine ID that is serving this name space. This must be unique
	// within all of the instances in the list of remotes.
	MachineID uint32

	// The name of the namespace that this Storage implementation will
	// be serving.
	NameSpace string

	// The prefix for the namespace= tag; a value of blobby_ for this field
	// would give blobby_namespace as the tag key in the rendered Prometheus
	// metrics.
	NamespaceTagKeyPrefix string

	// The minimum and maximum number of master files that are allowed
	// to be open.
	OpenFilesMaximum int32
	OpenFilesMinimum int32

	// A function that fetches data from a remote.
	Read func(ReadConfig) (io.ReadCloser, error)

	// The number of replicas that each master file should be assigned.
	Replicas int

	// S3 client used for downloading objects from S3.
	S3Client *s3.S3

	// The S3 bucket and base path used for uploads as well as an optional
	// formatter for the file name as it will be written to S3. If not
	// provided it will default to the FID string.
	S3Bucket    string
	S3BasePath  string
	S3KeyFormat *fid.Formatter

	// If a file grows beyond this size then it will be moved into an
	// uploading state.
	UploadLargerThan uint64

	// Upload files after this much time regardless of size.
	UploadOlder time.Duration

	// A WorkQueue for processing Upload requests.
	UploadWorkQueue *workqueue.WorkQueue
}
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Handles on disk storage for a Blobby instance.
func (*Storage) BlastPathRead ¶
"Blast Path" read function that can fetch a ranged portion of a primary file. This works just like Read except that it will only read from local files and works on byte ranges rather than Blobby IDs.
func (*Storage) BlastPathStatus ¶
"Blast Path" status output for this Storage Name Space. This will output the current status of the primaries hosted by this instance.
func (*Storage) GetMetrics ¶
Returns a copy of the metrics associated with this Storage object.
func (*Storage) Health ¶
Returns true if this Storage is healthy, along with a string giving the reason for the reported health status.
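One way a caller might expose this result is through an HTTP health endpoint. The sketch below depends only on the documented `Health() (bool, string)` signature. The `healthChecker` interface, the `fixed` stand-in, the `/health` path, and the choice of status codes are all assumptions for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// healthChecker captures the documented Health method. A *Storage would
// satisfy it, as does the stand-in below.
type healthChecker interface {
	Health() (bool, string)
}

// fixed is a hypothetical stand-in that reports a canned status.
type fixed struct {
	ok     bool
	reason string
}

func (f fixed) Health() (bool, string) { return f.ok, f.reason }

// healthHandler replies 200 with the reason when healthy and 503 with the
// reason otherwise.
func healthHandler(h healthChecker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ok, reason := h.Health()
		if !ok {
			http.Error(w, reason, http.StatusServiceUnavailable)
			return
		}
		fmt.Fprintln(w, reason)
	}
}

// probe runs the handler once in memory and returns the status code.
func probe(h healthChecker) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/health", nil)
	healthHandler(h)(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probe(fixed{ok: true, reason: "ok"}))
	fmt.Println(probe(fixed{ok: false, reason: "disk full"}))
}
```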
func (*Storage) Insert ¶
Inserts new data into one of the open primary files. This will return the ID of the newly created object or an error if something went wrong. If an error is returned then it is not safe to assume that the data was successfully written.
func (*Storage) Read ¶
func (s *Storage) Read(ctx context.Context, rc ReadConfig) (io.ReadCloser, error)
Reads an individual ID (provided via rc). This may involve directly talking to S3, or talking to the remote machine that is serving the given ID.
func (*Storage) ReplicaHeartBeat ¶
Performs a Heart Beat on a replica. The only error condition here is that the replica does not exist.
func (*Storage) ReplicaInitialize ¶
Initializes a new replica in this Storage instance.
func (*Storage) ReplicaQueueDelete ¶
Queues a replica file for deletion.
func (*Storage) ReplicaReplicate ¶
func (s *Storage) ReplicaReplicate(ctx context.Context, fn string, rc RemoteReplicateConfig) error
Performs a replica replication call.
func (*Storage) Start ¶
Starts all of the supporting routines for this Storage implementation. This will also scan the storage directory looking for files created by a previous run of Blobby. Each such file is automatically configured as a replica in the Uploading state so that its data is written to S3 as quickly as possible, since it may be from a failed instance.