Documentation ¶
Overview ¶
Package objectserver provides an Object Server implementation.
Hummingbird Replication ¶
The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.
Messages are 32-bit length-prefixed, JSON-serialized structures. The basic flow looks like:
    replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
    server responds with a BeginReplicationResponse{Hashes map[string]string}
    for each object file in the partition where suffix hash doesn't match {
        replicator sends a SyncFileRequest{Path string, Xattrs string, Size int}
        server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
        if response.GoAhead is true {
            replicator sends raw file body
            server responds with a FileUploadResponse{Success bool}
        }
    }
The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.
Unlike python-swift, the replicator will only read each filesystem once per pass.
Index ¶
- Constants
- Variables
- func AsyncDir(policy int) string
- func GetHashes(driveRoot string, device string, partition string, recalculate []string, ...) (map[string]string, error)
- func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)
- func InvalidateHash(hashDir string) error
- func MoveParts(args []string, cnf srv.ConfigLoader)
- func NewReplicator(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (ipPort *srv.IpPort, server srv.Server, logger srv.LowLevelLogger, err error)
- func NewServer(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*srv.IpPort, srv.Server, srv.LowLevelLogger, error)
- func ObjHash(vars map[string]string, hashPathPrefix string, hashPathSuffix string) string
- func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, ...) string
- func ObjectFiles(directory string) (string, string)
- func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)
- func OneTimeChan() chan time.Time
- func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)
- func PolicyDir(policy int) string
- func QuarantineHash(hashDir string) error
- func RawReadMetadata(fileNameOrFd interface{}) ([]byte, error)
- func RawWriteMetadata(fd uintptr, buf []byte) error
- func ReadMetadata(fileNameOrFd interface{}) (map[string]string, error)
- func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)
- func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)
- func RescueParts(args []string, cnf srv.ConfigLoader)
- func RestoreDevice(args []string, cnf srv.ConfigLoader)
- func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)
- func TempDirPath(driveRoot string, device string) string
- func UnPolicyDir(dir string) (int, error)
- func WriteMetadata(fd uintptr, v map[string]string) error
- type Auditor
- type AuditorDaemon
- type BeginReplicationRequest
- type BeginReplicationResponse
- type DeviceStats
- type FileUploadResponse
- type NoMoreNodes
- type NurseryObjectEngine
- type Object
- type ObjectEngine
- type ObjectEngineConstructor
- type ObjectServer
- func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler
- func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}
- func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) Finalize()
- func (server *ObjectServer) GetHandler(config conf.Config, metricsPrefix string) http.Handler
- func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) LogRequest(next http.Handler) http.Handler
- func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)
- func (server *ObjectServer) Type() string
- type ObjectStabilizer
- type PolicyHandlerRegistrator
- type PriorityRepJob
- type RepConn
- type ReplicationDevice
- type Replicator
- func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}
- func (server *Replicator) Finalize()
- func (r *Replicator) GetHandler(config conf.Config, metricsPrefix string) http.Handler
- func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
- func (r *Replicator) LogRequest(next http.Handler) http.Handler
- func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)
- func (r *Replicator) Run()
- func (r *Replicator) RunForever()
- func (server *Replicator) Type() string
- type SwiftEngine
- type SwiftObject
- func (o *SwiftObject) Close() error
- func (o *SwiftObject) Commit(metadata map[string]string) error
- func (o *SwiftObject) CommitMetadata(metadata map[string]string) error
- func (o *SwiftObject) ContentLength() int64
- func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)
- func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)
- func (o *SwiftObject) Delete(metadata map[string]string) error
- func (o *SwiftObject) Exists() bool
- func (o *SwiftObject) Metadata() map[string]string
- func (o *SwiftObject) Quarantine() error
- func (o *SwiftObject) Repr() string
- func (o *SwiftObject) SetData(size int64) (io.Writer, error)
- type SyncFileRequest
- type SyncFileResponse
Constants ¶
const METADATA_CHUNK_SIZE = 65536
Variables ¶
var AuditForeverInterval = 30 * time.Second
AuditForeverInterval represents how often an auditor check should be performed.
var DriveFullError = errors.New("Drive Full")
DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.
var LockPathError = errors.New("Error locking path")
var PathNotDirError = errors.New("Path is not a directory")
var RepUnmountedError = fmt.Errorf("Device unmounted")
Functions ¶
func HashCleanupListDir ¶
func InvalidateHash ¶
InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.
func MoveParts ¶
func MoveParts(args []string, cnf srv.ConfigLoader)
MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.
func NewReplicator ¶
func ObjHashDir ¶
func ObjectFiles ¶
func ObjectMetadata ¶
func OneTimeChan ¶
OneTimeChan returns a channel that will yield the current time once, then is closed.
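A channel with this behavior can be built from a buffered channel that is filled and closed before it is returned. The function oneTimeChan below is a hypothetical stand-in written for illustration; it may differ from how the package implements OneTimeChan.

```go
package main

import (
	"fmt"
	"time"
)

// oneTimeChan is a hypothetical stand-in with the documented behavior:
// it yields the current time once, then the channel is closed.
func oneTimeChan() chan time.Time {
	c := make(chan time.Time, 1)
	c <- time.Now()
	close(c)
	return c
}

func main() {
	passes := 0
	// Ranging over the channel runs the loop body exactly once.
	for range oneTimeChan() {
		passes++
	}
	fmt.Println("passes:", passes)
}
```

Such a channel is handy as a drop-in replacement for a ticker channel when a loop should run a single pass instead of repeating.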
func OpenObjectMetadata ¶
func QuarantineHash ¶
func RawReadMetadata ¶
func RawWriteMetadata ¶
func ReadMetadata ¶
func RecalculateSuffixHash ¶
func RegisterObjectEngine ¶
func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)
RegisterObjectEngine lets you tell hummingbird about a new object engine.
func RescueParts ¶
func RescueParts(args []string, cnf srv.ConfigLoader)
func RestoreDevice ¶
func RestoreDevice(args []string, cnf srv.ConfigLoader)
RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restore its data from peers.
func SendPriRepJob ¶ added in v0.0.2
func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)
func TempDirPath ¶
func UnPolicyDir ¶
Types ¶
type Auditor ¶
type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}
Auditor keeps track of general audit data.
type AuditorDaemon ¶
type AuditorDaemon struct {
// contains filtered or unexported fields
}
AuditorDaemon keeps track of object specific audit data.
func NewAuditorDaemon ¶ added in v0.0.2
func NewAuditorDaemon(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*AuditorDaemon, error)
NewAuditorDaemon returns a new AuditorDaemon with the given conf.
func (*AuditorDaemon) RunForever ¶
func (d *AuditorDaemon) RunForever()
RunForever triggers an audit pass every time AuditForeverInterval has elapsed.
type BeginReplicationRequest ¶
type DeviceStats ¶ added in v0.0.2
type DeviceStats struct {
	Stats              map[string]int64
	LastCheckin        time.Time
	PassStarted        time.Time
	DeviceStarted      time.Time
	LastPassFinishDate time.Time
	LastPassDuration   time.Duration
	CancelCount        int64
	FilesSent          int64
	BytesSent          int64
	PartitionsDone     int64
	PartitionsTotal    int64
	TotalPasses        int64
	PriorityRepsDone   int64
}
type FileUploadResponse ¶
type NoMoreNodes ¶
type NoMoreNodes struct{}
func (*NoMoreNodes) Next ¶
func (n *NoMoreNodes) Next() *ring.Device
type NurseryObjectEngine ¶ added in v0.0.2
type NurseryObjectEngine interface {
	ObjectEngine
	GetNurseryObjects(device string, c chan ObjectStabilizer, cancel chan struct{})
}
type Object ¶
type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata. Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known). It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves a new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// CommitMetadata updates the object's metadata.
	CommitMetadata(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}
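To make the write path concrete, here is a rough in-memory sketch of the SetData, Commit, and Delete contract described by the interface comments. The memObject type is hypothetical and covers only a subset of the interface; it keeps data in memory and leaves no tombstone, so it should be read as an illustration of the call order, not as a usable engine.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// memObject is a hypothetical in-memory object covering part of the
// Object interface: Exists, Metadata, ContentLength, SetData, Commit, Delete.
type memObject struct {
	pending  *bytes.Buffer     // data being written, not yet visible
	data     []byte            // committed data
	metadata map[string]string // nil while the object does not exist
}

func (o *memObject) Exists() bool                { return o.metadata != nil }
func (o *memObject) Metadata() map[string]string { return o.metadata }
func (o *memObject) ContentLength() int64        { return int64(len(o.data)) }

// SetData starts a new write; nothing is visible until Commit.
func (o *memObject) SetData(size int64) (io.Writer, error) {
	if size < 0 {
		size = 0 // size unknown: start with an empty buffer
	}
	o.pending = bytes.NewBuffer(make([]byte, 0, size))
	return o.pending, nil
}

// Commit publishes the data written since SetData, along with its metadata.
func (o *memObject) Commit(metadata map[string]string) error {
	if o.pending == nil {
		return errors.New("Commit called without SetData")
	}
	o.data, o.metadata, o.pending = o.pending.Bytes(), metadata, nil
	return nil
}

// Delete drops the data. A real engine would typically record a tombstone.
func (o *memObject) Delete(metadata map[string]string) error {
	o.data, o.metadata = nil, nil
	return nil
}

func main() {
	o := &memObject{}
	w, err := o.SetData(5)
	if err != nil {
		panic(err)
	}
	if _, err := io.WriteString(w, "hello"); err != nil {
		panic(err)
	}
	fmt.Println("exists before commit:", o.Exists())
	if err := o.Commit(map[string]string{"Content-Length": "5"}); err != nil {
		panic(err)
	}
	fmt.Println("exists after commit:", o.Exists(), "length:", o.ContentLength())
}
```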
type ObjectEngine ¶
type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
}
ObjectEngine is the type you have to give hummingbird to create a new object engine.
func SwiftEngineConstructor ¶
func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)
SwiftEngineConstructor creates a SwiftEngine given the object server configs.
type ObjectEngineConstructor ¶
ObjectEngineConstructor is a function that, given configs and flags, returns an ObjectEngine.
func FindEngine ¶
func FindEngine(name string) (ObjectEngineConstructor, error)
FindEngine returns the registered object engine with the given name.
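The register-then-look-up pattern behind RegisterObjectEngine and FindEngine can be illustrated with a name-to-constructor map. Everything in this sketch is a simplified stand-in: Engine and EngineConstructor replace ObjectEngine and ObjectEngineConstructor (whose real signature takes configs and flags), and the mutex-guarded map is an assumption about how such a registry might be stored.

```go
package main

import (
	"fmt"
	"sync"
)

// Engine and EngineConstructor are simplified stand-ins for
// ObjectEngine and ObjectEngineConstructor.
type Engine interface {
	Name() string
}

type EngineConstructor func() (Engine, error)

var (
	enginesMu sync.RWMutex
	engines   = map[string]EngineConstructor{}
)

// registerEngine records a constructor under the given name.
func registerEngine(name string, newEngine EngineConstructor) {
	enginesMu.Lock()
	defer enginesMu.Unlock()
	engines[name] = newEngine
}

// findEngine returns the constructor registered under name, or an error.
func findEngine(name string) (EngineConstructor, error) {
	enginesMu.RLock()
	defer enginesMu.RUnlock()
	if c, ok := engines[name]; ok {
		return c, nil
	}
	return nil, fmt.Errorf("no object engine registered as %q", name)
}

// memEngine is a hypothetical engine used only for this example.
type memEngine struct{}

func (memEngine) Name() string { return "mem" }

func main() {
	registerEngine("mem", func() (Engine, error) { return memEngine{}, nil })

	newEngine, err := findEngine("mem")
	if err != nil {
		panic(err)
	}
	e, err := newEngine()
	if err != nil {
		panic(err)
	}
	fmt.Println("found engine:", e.Name())

	if _, err := findEngine("missing"); err != nil {
		fmt.Println("lookup failed:", err)
	}
}
```

Registration of this kind is usually done from an init function so that an engine becomes available simply by importing its package.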
type ObjectServer ¶
type ObjectServer struct {
// contains filtered or unexported fields
}
func (*ObjectServer) AcquireDevice ¶
func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler
func (*ObjectServer) Background ¶ added in v0.0.2
func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}
func (*ObjectServer) DiskUsageHandler ¶
func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) Finalize ¶
func (server *ObjectServer) Finalize()
func (*ObjectServer) GetHandler ¶
func (*ObjectServer) HealthcheckHandler ¶
func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) LogRequest ¶
func (server *ObjectServer) LogRequest(next http.Handler) http.Handler
func (*ObjectServer) ObjDeleteHandler ¶
func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjGetHandler ¶
func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjPostHandler ¶
func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ObjPutHandler ¶
func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) OptionsHandler ¶
func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) ReconHandler ¶
func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)
func (*ObjectServer) Type ¶ added in v0.0.2
func (server *ObjectServer) Type() string
type ObjectStabilizer ¶ added in v0.0.2
type PolicyHandlerRegistrator ¶ added in v0.0.2
type PolicyHandlerRegistrator interface {
RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}
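As a rough illustration of how a type might satisfy PolicyHandlerRegistrator, the sketch below redeclares the interface locally and implements it on a hypothetical pingEngine. The "/ping" route and the collectRoutes helper are invented for the example; how the object server actually supplies addRoute is not described here.

```go
package main

import (
	"fmt"
	"net/http"
)

// PolicyHandlerRegistrator is copied from the declaration above.
type PolicyHandlerRegistrator interface {
	RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}

// pingEngine is a hypothetical engine that exposes one extra route.
type pingEngine struct{}

func (pingEngine) RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc)) {
	addRoute("GET", "/ping", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "pong")
	})
}

// collectRoutes gathers the routes a registrator adds, keyed by "METHOD path".
func collectRoutes(p PolicyHandlerRegistrator) map[string]http.HandlerFunc {
	routes := map[string]http.HandlerFunc{}
	p.RegisterHandlers(func(method, path string, handler http.HandlerFunc) {
		routes[method+" "+path] = handler
	})
	return routes
}

func main() {
	for key := range collectRoutes(pingEngine{}) {
		fmt.Println("registered:", key)
	}
}
```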
type PriorityRepJob ¶
type RepConn ¶
type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}
func NewIncomingRepConn ¶
type ReplicationDevice ¶
type ReplicationDevice interface {
	Replicate()
	ReplicateLoop()
	Key() string
	Cancel()
	PriorityReplicate(pri PriorityRepJob, timeout time.Duration) bool
}
type Replicator ¶
Replicator is the object replicator daemon.
func (*Replicator) Background ¶ added in v0.0.2
func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}
func (*Replicator) Finalize ¶ added in v0.0.2
func (server *Replicator) Finalize()
func (*Replicator) GetHandler ¶
func (*Replicator) HealthcheckHandler ¶ added in v0.0.2
func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)
func (*Replicator) LogRequest ¶
func (r *Replicator) LogRequest(next http.Handler) http.Handler
func (*Replicator) ProgressReportHandler ¶
func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)
ProgressReportHandler handles HTTP requests for current replication progress
func (*Replicator) Run ¶
func (r *Replicator) Run()
Run performs a single replication pass. (NOTE: this will probably be removed because of priorityRepl.)
func (*Replicator) RunForever ¶
func (r *Replicator) RunForever()
RunForever runs replication passes in a loop indefinitely.
func (*Replicator) Type ¶ added in v0.0.2
func (server *Replicator) Type() string
type SwiftEngine ¶
type SwiftEngine struct {
// contains filtered or unexported fields
}
func (*SwiftEngine) New ¶
func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
New returns an instance of SwiftObject with the given parameters. Metadata is read in, and if needData is true, the file is opened. asyncWG is a wait group for any async operations the object spawns.
type SwiftObject ¶
type SwiftObject struct {
// contains filtered or unexported fields
}
SwiftObject implements an Object that is compatible with Swift's object server.
func (*SwiftObject) Close ¶
func (o *SwiftObject) Close() error
Close releases any resources used by the instance of SwiftObject
func (*SwiftObject) Commit ¶
func (o *SwiftObject) Commit(metadata map[string]string) error
Commit commits an open data file to disk, given the metadata.
func (*SwiftObject) CommitMetadata ¶ added in v0.0.2
func (o *SwiftObject) CommitMetadata(metadata map[string]string) error
func (*SwiftObject) ContentLength ¶
func (o *SwiftObject) ContentLength() int64
ContentLength parses and returns the Content-Length for the object.
func (*SwiftObject) Copy ¶
func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)
Copy copies all data from the underlying .data file to the given writers.
func (*SwiftObject) CopyRange ¶
CopyRange copies data in the range of start to end from the underlying .data file to the writer.
func (*SwiftObject) Delete ¶
func (o *SwiftObject) Delete(metadata map[string]string) error
Delete deletes the object.
func (*SwiftObject) Exists ¶
func (o *SwiftObject) Exists() bool
Exists returns true if the object exists, that is if it has a .data file.
func (*SwiftObject) Metadata ¶
func (o *SwiftObject) Metadata() map[string]string
Metadata returns the object's metadata.
func (*SwiftObject) Quarantine ¶
func (o *SwiftObject) Quarantine() error
Quarantine moves the object's underlying files to the Quarantined directory on the device.
func (*SwiftObject) Repr ¶
func (o *SwiftObject) Repr() string
Repr returns a string that identifies the object in some useful way, used for logging.