Documentation ¶
Overview ¶
Package objectserver provides an Object Server implementation.
Hummingbird Replication ¶
The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.
Messages are JSON-serialized structures, each prefixed with its 32-bit length. The basic flow looks like:
replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
server responds with a BeginReplicationResponse{Hashes map[string]string}
for each object file in the partition where suffix hash doesn't match {
    replicator sends a SyncFileRequest{Path string, Xattrs string, Size int}
    server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
    if response.GoAhead is true {
        replicator sends raw file body
        server responds with a FileUploadResponse{Success bool}
    }
}
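The message framing described above can be sketched as follows. This is a minimal illustration, not the package's actual wire code: the helper names sendMessage, recvMessage and roundTrip are hypothetical, and big-endian byte order for the 32-bit length is an assumption, since the text only says the prefix is 32 bits wide.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// BeginReplicationRequest mirrors the fields listed in the flow above.
type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

// sendMessage writes v as JSON preceded by a 32-bit length.
// Big-endian byte order is an assumption of this sketch.
func sendMessage(w io.Writer, v interface{}) error {
	body, err := json.Marshal(v)
	if err != nil {
		return err
	}
	if err := binary.Write(w, binary.BigEndian, uint32(len(body))); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

// recvMessage reads one length-prefixed JSON message into v.
func recvMessage(r io.Reader, v interface{}) error {
	var length uint32
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return err
	}
	body := make([]byte, length)
	if _, err := io.ReadFull(r, body); err != nil {
		return err
	}
	return json.Unmarshal(body, v)
}

// roundTrip encodes req into an in-memory buffer and decodes it again.
func roundTrip(req BeginReplicationRequest) (BeginReplicationRequest, error) {
	var buf bytes.Buffer
	var out BeginReplicationRequest
	if err := sendMessage(&buf, req); err != nil {
		return out, err
	}
	err := recvMessage(&buf, &out)
	return out, err
}

func main() {
	out, err := roundTrip(BeginReplicationRequest{Device: "sda1", Partition: "1234", NeedHashes: true})
	fmt.Println(out, err)
}
```

A real receiver would probably also cap the accepted length before allocating the body buffer; that check is left out here for brevity.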
The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.
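One common way to implement "wait up to N seconds for a slot" in Go is a buffered channel used as a counting semaphore. The sketch below illustrates that idea only; the slots type, its methods and the shortWait constant are hypothetical and are not taken from the package, and the timeout is shortened so the example runs quickly.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait stands in for the 60-second wait described above.
const shortWait = 10 * time.Millisecond

// slots is a counting semaphore built on a buffered channel.
type slots chan struct{}

// acquire waits up to timeout for a free slot and reports whether one was obtained.
func (s slots) acquire(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case s <- struct{}{}:
		return true
	case <-timer.C:
		return false
	}
}

// release frees a slot previously obtained with acquire.
func (s slots) release() { <-s }

func main() {
	s := make(slots, 1)
	fmt.Println(s.acquire(shortWait)) // slot is free
	fmt.Println(s.acquire(shortWait)) // slot is busy, so this times out
	s.release()
	fmt.Println(s.acquire(shortWait)) // slot is free again
}
```

A per-device limit and an overall limit could be modelled as two such semaphores acquired in sequence, which is again an assumption about structure rather than a description of the real code.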
Unlike python-swift, the replicator will only read each filesystem once per pass.
Index ¶
- Variables
- func AsyncDir(policy int) string
- func GetHashes(driveRoot string, device string, partition string, recalculate []string, ...) (map[string]string, error)
- func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)
- func InvalidateHash(hashDir string) error
- func MetadataHash(metadata map[string]string) string
- func MetadataMerge(a map[string]string, b map[string]string) map[string]string
- func MoveParts(args []string, cnf srv.ConfigLoader)
- func NewReplicator(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (ipPort *srv.IpPort, server srv.Server, logger srv.LowLevelLogger, err error)
- func NewServer(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*srv.IpPort, srv.Server, srv.LowLevelLogger, error)
- func ObjHash(vars map[string]string, hashPathPrefix string, hashPathSuffix string) string
- func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, ...) string
- func ObjectFiles(directory string) (string, string)
- func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)
- func OneTimeChan() chan time.Time
- func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)
- func PolicyDir(policy int) string
- func QuarantineHash(hashDir string) error
- func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)
- func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)
- func RestoreDevice(args []string, cnf srv.ConfigLoader)
- func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)
- func TempDirPath(driveRoot string, device string) string
- func UnPolicyDir(dir string) (int, error)
- type Auditor
- type AuditorDaemon
- type BeginReplicationRequest
- type BeginReplicationResponse
- type DeviceStats
- type ECAuditFunc
- type FileUploadResponse
- type IndexDB
- func (ot *IndexDB) Close()
- func (ot *IndexDB) Commit(f fs.AtomicFileWriter, hsh string, shard int, timestamp int64, method string, ...) error
- func (ot *IndexDB) List(startHash, stopHash, marker string, limit int) ([]*IndexDBItem, error)
- func (ot *IndexDB) ListObjectsToStabilize() ([]*IndexDBItem, error)
- func (ot *IndexDB) Lookup(hsh string, shard int, justStable bool) (*IndexDBItem, error)
- func (ot *IndexDB) Remove(hsh string, shard int, timestamp int64, nursery bool) error
- func (ot *IndexDB) RingPartRange(ringPart int) (string, string)
- func (ot *IndexDB) SetRestablized(hsh string, shard int, timestamp int64) error
- func (ot *IndexDB) TempFile(hsh string, shard int, timestamp int64, sizeHint int64, nursery bool) (fs.AtomicFileWriter, error)
- func (ot *IndexDB) WholeObjectPath(hsh string, shard int, timestamp int64, nursery bool) (string, error)
- type IndexDBItem
- type NoMoreNodes
- type NurseryObjectEngine
- type Object
- type ObjectEngine
- type ObjectEngineConstructor
- type ObjectServer
- func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler
- func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}
- func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) Finalize()
- func (server *ObjectServer) GetHandler(config conf.Config, metricsPrefix string) http.Handler
- func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) LogRequest(next http.Handler) http.Handler
- func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) Type() string
- type ObjectStabilizer
- type PolicyHandlerRegistrator
- type PriorityRepJob
- type PriorityReplicationResult
- type RepConn
- type ReplicationDevice
- type Replicator
- func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}
- func (server *Replicator) Finalize()
- func (r *Replicator) GetHandler(config conf.Config, metricsPrefix string) http.Handler
- func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
- func (r *Replicator) LogRequest(next http.Handler) http.Handler
- func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)
- func (r *Replicator) Run()
- func (r *Replicator) RunForever()
- func (server *Replicator) Type() string
- type SwiftEngine
- type SwiftObject
- func (o *SwiftObject) Close() error
- func (o *SwiftObject) Commit(metadata map[string]string) error
- func (o *SwiftObject) CommitMetadata(metadata map[string]string) error
- func (o *SwiftObject) ContentLength() int64
- func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)
- func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)
- func (o *SwiftObject) Delete(metadata map[string]string) error
- func (o *SwiftObject) Exists() bool
- func (o *SwiftObject) Metadata() map[string]string
- func (o *SwiftObject) Quarantine() error
- func (o *SwiftObject) Repr() string
- func (o *SwiftObject) SetData(size int64) (io.Writer, error)
- type SyncFileRequest
- type SyncFileResponse
Constants ¶
This section is empty.
Variables ¶
var AuditForeverInterval = 30 * time.Second
AuditForeverInterval represents how often an auditor check should be performed.
var DriveFullError = errors.New("Drive Full")
DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.
var LockPathError = errors.New("Error locking path")
var PathNotDirError = errors.New("Path is not a directory")
var RepUnmountedError = fmt.Errorf("Device unmounted")
Functions ¶
func HashCleanupListDir ¶
func InvalidateHash ¶
InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.
func MetadataHash ¶
MetadataHash returns a hash of the contents of the metadata.
func MetadataMerge ¶
MetadataMerge will return the result of merging the a and b metadata sets; neither a nor b should be used after calling this method.
func MoveParts ¶
func MoveParts(args []string, cnf srv.ConfigLoader)
MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.
func NewReplicator ¶
func ObjHashDir ¶
func ObjectFiles ¶
func ObjectMetadata ¶
func OneTimeChan ¶
OneTimeChan returns a channel that will yield the current time once, then is closed.
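The behaviour described above can be reproduced with a buffered channel that is filled once and then closed. This is a sketch of one possible implementation, not the package's source; oneTimeChan and countTicks are names made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

// oneTimeChan returns a channel that yields the current time once and is then closed.
func oneTimeChan() chan time.Time {
	c := make(chan time.Time, 1)
	c <- time.Now()
	close(c)
	return c
}

// countTicks drains c and reports how many values it delivered.
func countTicks(c chan time.Time) int {
	n := 0
	for range c {
		n++
	}
	return n
}

func main() {
	fmt.Println(countTicks(oneTimeChan()))
}
```

A channel like this has the same element type as a time.Ticker's channel, so a loop written as "for range ticks" can presumably serve both a run-once mode and a run-forever mode.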
func OpenObjectMetadata ¶
func QuarantineHash ¶
func RecalculateSuffixHash ¶
func RegisterObjectEngine ¶
func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)
RegisterObjectEngine lets you tell hummingbird about a new object engine.
func RestoreDevice ¶
func RestoreDevice(args []string, cnf srv.ConfigLoader)
RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restore its data from peers.
func SendPriRepJob ¶
func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)
func TempDirPath ¶
func UnPolicyDir ¶
Types ¶
type Auditor ¶
type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}
Auditor keeps track of general audit data.
type AuditorDaemon ¶
type AuditorDaemon struct {
// contains filtered or unexported fields
}
AuditorDaemon keeps track of object specific audit data.
func NewAuditorDaemon ¶
func NewAuditorDaemon(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*AuditorDaemon, error)
NewAuditorDaemon returns a new AuditorDaemon with the given conf.
func (*AuditorDaemon) RunForever ¶
func (d *AuditorDaemon) RunForever()
RunForever triggers an audit pass each time AuditForeverInterval has passed.
type BeginReplicationRequest ¶
type DeviceStats ¶
type DeviceStats struct {
	Stats              map[string]int64
	LastCheckin        time.Time
	PassStarted        time.Time
	DeviceStarted      time.Time
	LastPassFinishDate time.Time
	LastPassDuration   time.Duration
	CancelCount        int64
	FilesSent          int64
	BytesSent          int64
	PartitionsDone     int64
	PartitionsTotal    int64
	TotalPasses        int64
	PriorityRepsDone   int64
}
type ECAuditFunc ¶
type ECAuditFunc interface {
AuditEcObj(path string, item *IndexDBItem, md5BytesPerSec int64) (int64, error)
}
type FileUploadResponse ¶
type IndexDB ¶
type IndexDB struct {
	RingPartPower uint // GLH: Temp exported for fakelist
	// contains filtered or unexported fields
}
IndexDB will track a set of objects.
This is the "index.db" per disk. Right now it just handles whole objects, but eventually we'd like to add either slab support or direct database embedding for small objects. Those details should be transparent to users of an IndexDB.
This is different from the standard Swift full replica object tracking in that the directory structure is much shallower, there are a configurable number of databases per drive instead of a ton of hashes.pkl files, and the version tracking / consolidation is much simpler.
The IndexDB stores the newest object contents it knows about and discards any older ones, like the standard Swift's .data files. It does not have .meta files at all, and certainly not stacked to infinity .meta files. Instead the metadata is stored in the database as JSON.
A given IndexDB may not even store any metadata, such as in an EC system, with just "key" IndexDBs storing the metadata.
func NewIndexDB ¶
func NewIndexDB(dbpath, filepath, temppath string, ringPartPower, dbPartPower, subdirs int, reserve int64, logger *zap.Logger) (*IndexDB, error)
NewIndexDB creates an IndexDB to manage a set of objects.
The ringPartPower is defined by the ring in use, but should be greater than the dbPartPower. The dbPartPower will define how many databases are created (e.g. dbPartPower = 6 gives 64 databases). The subdirs value will define how many subdirectories are created where object content files are placed.
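The relationship between dbPartPower and the number of databases is a power of two, as the 6-gives-64 example shows. The sketch below spells that out and adds one hypothetical way a hex object hash could be mapped to a database, by taking the top dbPartPower bits of its first 32 bits. That mapping is an assumption made for illustration; the text above does not say how IndexDB actually selects a database, and dbCount and dbIndex are not package functions.

```go
package main

import (
	"fmt"
	"strconv"
)

// dbCount returns how many databases a given dbPartPower implies (2^dbPartPower).
func dbCount(dbPartPower uint) int {
	return 1 << dbPartPower
}

// dbIndex picks a database from the top dbPartPower bits of the first
// 32 bits of a hex hash. This mapping is hypothetical.
func dbIndex(hsh string, dbPartPower uint) (int, error) {
	if len(hsh) < 8 {
		return 0, fmt.Errorf("hash %q is too short", hsh)
	}
	top, err := strconv.ParseUint(hsh[:8], 16, 32)
	if err != nil {
		return 0, err
	}
	return int(top >> (32 - dbPartPower)), nil
}

func main() {
	fmt.Println(dbCount(6))
	idx, err := dbIndex("80000000000000000000000000000000", 6)
	fmt.Println(idx, err)
}
```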
func (*IndexDB) Close ¶
func (ot *IndexDB) Close()
Close closes all the underlying databases for the IndexDB; you should discard the IndexDB instance after this call.
func (*IndexDB) Commit ¶
func (ot *IndexDB) Commit(f fs.AtomicFileWriter, hsh string, shard int, timestamp int64, method string, metahash string, metadata []byte, nursery bool, shardhash string) error
Commit moves the temporary file (from TempFile) into place and records its information in the database. It may discard the file completely if newer object information is already in place for the hash:shard.
Shard is mostly for EC type policies; just use 0 if you're using a full replica policy.
Timestamp is the timestamp for the object contents, not necessarily the metadata.
func (*IndexDB) List ¶
func (ot *IndexDB) List(startHash, stopHash, marker string, limit int) ([]*IndexDBItem, error)
List returns the items for the ringPart given.
This is for replication, auditing, that sort of thing.
func (*IndexDB) ListObjectsToStabilize ¶
func (ot *IndexDB) ListObjectsToStabilize() ([]*IndexDBItem, error)
ListObjectsToStabilize lists all objects that are in the nursery or marked to be restabilized.
func (*IndexDB) Lookup ¶
Lookup returns the stored information for the hsh and shard. It returns (nil, error) if there is an error and (nil, nil) if the item is not found.
func (*IndexDB) SetRestablized ¶
func (*IndexDB) TempFile ¶
func (ot *IndexDB) TempFile(hsh string, shard int, timestamp int64, sizeHint int64, nursery bool) (fs.AtomicFileWriter, error)
TempFile returns a temporary file to write to for eventually adding the hash:shard to the IndexDB with Commit; may return (nil, nil) if there is already a newer or equal timestamp in place for the hash:shard.
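The "newest timestamp wins" rule that TempFile and Commit describe can be modelled with a small in-memory map. This is a toy stand-in, not IndexDB: toyIndex, shouldWrite and commit are hypothetical names, no files or databases are involved, and using the same newer-or-equal check in both steps is a simplifying assumption.

```go
package main

import "fmt"

// key identifies an object by hash and shard, like the hash:shard pairs above.
type key struct {
	hash  string
	shard int
}

// toyIndex remembers only the newest timestamp seen per hash:shard.
type toyIndex struct {
	newest map[key]int64
}

func newToyIndex() *toyIndex {
	return &toyIndex{newest: map[key]int64{}}
}

// shouldWrite mirrors the TempFile check: it is false when a newer or
// equal timestamp is already recorded for the hash:shard.
func (t *toyIndex) shouldWrite(hash string, shard int, timestamp int64) bool {
	cur, ok := t.newest[key{hash, shard}]
	return !ok || timestamp > cur
}

// commit records the timestamp if it is still the newest and reports
// whether the write was kept or discarded.
func (t *toyIndex) commit(hash string, shard int, timestamp int64) bool {
	if !t.shouldWrite(hash, shard, timestamp) {
		return false
	}
	t.newest[key{hash, shard}] = timestamp
	return true
}

func main() {
	idx := newToyIndex()
	fmt.Println(idx.commit("abc", 0, 10))
	fmt.Println(idx.commit("abc", 0, 5))
	fmt.Println(idx.shouldWrite("abc", 0, 10))
}
```

The check is repeated at commit time because another writer may have stored a newer version between the two calls, which is presumably why Commit is documented as possibly discarding the file.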
type IndexDBItem ¶
type IndexDBItem struct {
	Hash        string
	Shard       int
	Timestamp   int64
	Metahash    string `json:"-"`
	Nursery     bool
	Metabytes   []byte `json:"-"`
	Deletion    bool
	Path        string
	ShardHash   string
	Restabilize bool
}
IndexDBItem is a single item returned by List.
type NoMoreNodes ¶
type NoMoreNodes struct{}
func (*NoMoreNodes) Next ¶
func (n *NoMoreNodes) Next() *ring.Device
type NurseryObjectEngine ¶
type NurseryObjectEngine interface {
	ObjectEngine
	GetObjectsToStabilize(device string, c chan ObjectStabilizer, cancel chan struct{})
	GetObjectsToReplicate(prirep PriorityRepJob, c chan ObjectStabilizer, cancel chan struct{})
}
type Object ¶
type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata. Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known). It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves a new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// CommitMetadata updates the object's metadata.
	CommitMetadata(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}
type ObjectEngine ¶
type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
	GetReplicationDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator) (ReplicationDevice, error)
}
ObjectEngine is the type you have to give hummingbird to create a new object engine.
func SwiftEngineConstructor ¶
func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)
SwiftEngineConstructor creates a SwiftEngine given the object server configs.
type ObjectEngineConstructor ¶
ObjectEngineConstructor is a function that, given configs and flags, returns an ObjectEngine.
func FindEngine ¶
func FindEngine(name string) (ObjectEngineConstructor, error)
FindEngine returns the registered object engine with the given name.
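RegisterObjectEngine and FindEngine together form a name-to-constructor registry. The sketch below shows that general pattern with simplified stand-in types. The engine interface, the engineConstructor signature, the mutex and the error text are all assumptions made so the example is self-contained; the real ObjectEngineConstructor takes configuration, policy and flag arguments.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// engine and engineConstructor are simplified stand-ins for
// ObjectEngine and ObjectEngineConstructor.
type engine interface{ Name() string }
type engineConstructor func() (engine, error)

var (
	mu      sync.RWMutex
	engines = map[string]engineConstructor{}
)

// registerEngine stores a constructor under the given name.
func registerEngine(name string, c engineConstructor) {
	mu.Lock()
	defer mu.Unlock()
	engines[name] = c
}

// findEngine looks up a previously registered constructor by name.
func findEngine(name string) (engineConstructor, error) {
	mu.RLock()
	defer mu.RUnlock()
	if c, ok := engines[name]; ok {
		return c, nil
	}
	return nil, errors.New("no object engine registered as " + name)
}

// memEngine is a hypothetical engine used only for this example.
type memEngine struct{}

func (memEngine) Name() string { return "mem" }

// Registering from init makes the engine available before main runs.
func init() {
	registerEngine("mem", func() (engine, error) { return memEngine{}, nil })
}

func main() {
	c, err := findEngine("mem")
	if err != nil {
		fmt.Println(err)
		return
	}
	e, _ := c()
	fmt.Println(e.Name())
}
```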
type ObjectServer ¶
type ObjectServer struct {
// contains filtered or unexported fields
}
func (*ObjectServer) AcquireDevice ¶
func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler
func (*ObjectServer) Background ¶
func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}
func (*ObjectServer) DiskUsageHandler ¶
func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) Finalize ¶
func (server *ObjectServer) Finalize()
func (*ObjectServer) GetHandler ¶
func (*ObjectServer) HealthcheckHandler ¶
func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) LogRequest ¶
func (server *ObjectServer) LogRequest(next http.Handler) http.Handler
func (*ObjectServer) ObjDeleteHandler ¶
func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjGetHandler ¶
func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjPostHandler ¶
func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjPutHandler ¶
func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) OptionsHandler ¶
func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ReconHandler ¶
func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) Type ¶
func (server *ObjectServer) Type() string
type ObjectStabilizer ¶
type PolicyHandlerRegistrator ¶
type PolicyHandlerRegistrator interface {
RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}
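A type satisfies PolicyHandlerRegistrator by calling the supplied addRoute callback once per route it wants to expose. The sketch below restates the interface locally so it compiles on its own; pingEngine, its /ping route and the collectRoutes helper are hypothetical, and how the object server itself invokes RegisterHandlers is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net/http"
)

// PolicyHandlerRegistrator is restated here so the example is self-contained.
type PolicyHandlerRegistrator interface {
	RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}

// pingEngine is a hypothetical engine that adds one extra route.
type pingEngine struct{}

func (pingEngine) RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc)) {
	addRoute("GET", "/ping", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "pong")
	})
}

// collectRoutes plays the role of the caller: it hands the engine a
// callback and records each "METHOD path" pair the engine registers.
func collectRoutes(p PolicyHandlerRegistrator) []string {
	var routes []string
	p.RegisterHandlers(func(method, path string, _ http.HandlerFunc) {
		routes = append(routes, method+" "+path)
	})
	return routes
}

func main() {
	fmt.Println(collectRoutes(pingEngine{}))
}
```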
type PriorityRepJob ¶
type RepConn ¶
type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}
func NewIncomingRepConn ¶
type ReplicationDevice ¶
type ReplicationDevice interface {
	Replicate()
	ReplicateLoop()
	Key() string
	Cancel()
	PriorityReplicate(w http.ResponseWriter, pri PriorityRepJob)
	UpdateStat(string, int64)
}
func GetNurseryDevice ¶
func GetNurseryDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator, f NurseryObjectEngine) (ReplicationDevice, error)
type Replicator ¶
Replicator is the object replicator daemon.
func (*Replicator) Background ¶
func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}
func (*Replicator) Finalize ¶
func (server *Replicator) Finalize()
func (*Replicator) GetHandler ¶
func (*Replicator) HealthcheckHandler ¶
func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
func (*Replicator) LogRequest ¶
func (r *Replicator) LogRequest(next http.Handler) http.Handler
func (*Replicator) ProgressReportHandler ¶
func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)
ProgressReportHandler handles HTTP requests for current replication progress.
func (*Replicator) Run ¶
func (r *Replicator) Run()
Run performs a single replication pass. (NOTE: this will probably be removed because of priorityRepl.)
func (*Replicator) RunForever ¶
func (r *Replicator) RunForever()
RunForever runs replication passes in a loop indefinitely.
func (*Replicator) Type ¶
func (server *Replicator) Type() string
type SwiftEngine ¶
type SwiftEngine struct {
// contains filtered or unexported fields
}
func (*SwiftEngine) GetReplicationDevice ¶
func (f *SwiftEngine) GetReplicationDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator) (ReplicationDevice, error)
func (*SwiftEngine) New ¶
func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
New returns an instance of SwiftObject with the given parameters. Metadata is read in and, if needData is true, the file is opened. asyncWG is a WaitGroup used to track any async operations the object spawns.
type SwiftObject ¶
type SwiftObject struct {
// contains filtered or unexported fields
}
SwiftObject implements an Object that is compatible with Swift's object server.
func (*SwiftObject) Close ¶
func (o *SwiftObject) Close() error
Close releases any resources used by the instance of SwiftObject.
func (*SwiftObject) Commit ¶
func (o *SwiftObject) Commit(metadata map[string]string) error
Commit commits an open data file to disk, given the metadata.
func (*SwiftObject) CommitMetadata ¶
func (o *SwiftObject) CommitMetadata(metadata map[string]string) error
func (*SwiftObject) ContentLength ¶
func (o *SwiftObject) ContentLength() int64
ContentLength parses and returns the Content-Length for the object.
func (*SwiftObject) Copy ¶
func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)
Copy copies all data from the underlying .data file to the given writers.
func (*SwiftObject) CopyRange ¶
CopyRange copies data in the range of start to end from the underlying .data file to the writer.
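Range copying of this kind is commonly built on io.NewSectionReader from the standard library. The sketch below shows that technique against an in-memory reader rather than a .data file. The copyRange and rangeOf helpers are hypothetical, and treating end as exclusive is an assumption, since the description above does not say whether the end offset is inclusive.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// copyRange copies bytes [start, end) from src to w and returns the
// number of bytes written. An exclusive end offset is assumed.
func copyRange(w io.Writer, src io.ReaderAt, start, end int64) (int64, error) {
	if start < 0 || end < start {
		return 0, fmt.Errorf("invalid range %d-%d", start, end)
	}
	return io.Copy(w, io.NewSectionReader(src, start, end-start))
}

// rangeOf applies copyRange to a string and returns the copied slice.
func rangeOf(s string, start, end int64) (string, error) {
	var buf bytes.Buffer
	_, err := copyRange(&buf, strings.NewReader(s), start, end)
	return buf.String(), err
}

func main() {
	out, err := rangeOf("hello, world", 7, 12)
	fmt.Println(out, err)
}
```

Because *os.File also implements io.ReaderAt, the same helper would work on an open file without moving its read offset.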
func (*SwiftObject) Delete ¶
func (o *SwiftObject) Delete(metadata map[string]string) error
Delete deletes the object.
func (*SwiftObject) Exists ¶
func (o *SwiftObject) Exists() bool
Exists returns true if the object exists, that is if it has a .data file.
func (*SwiftObject) Metadata ¶
func (o *SwiftObject) Metadata() map[string]string
Metadata returns the object's metadata.
func (*SwiftObject) Quarantine ¶
func (o *SwiftObject) Quarantine() error
Quarantine moves the object's underlying files to the Quarantined directory on the device.
func (*SwiftObject) Repr ¶
func (o *SwiftObject) Repr() string
Repr returns a string that identifies the object in some useful way, used for logging.