objectserver

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2018 License: Apache-2.0 Imports: 45 Imported by: 0

Documentation

Overview

Package objectserver provides a Object Server implementation.

Hummingbird Replication

The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.

Messages are 32-bit length prefixed JSON serialized structures. The basic flow looks like:

replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
server responds with a BeginReplicationResponse{Hashes map[string]string}
for each object file in the partition where suffix hash doesn't match {
	replicator sends a SyncFileRequest{Path string, Xattrs string, Size int}
	server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
	if response.GoAhead is true {
		replicator sends raw file body
		server responds with a FileUploadResponse{Success bool}
	}
}

The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.

Unlike python-swift, the replicator will only read each filesystem once per pass.

Index

Constants

View Source
const METADATA_CHUNK_SIZE = 65536

Variables

View Source
var AuditForeverInterval = 30 * time.Second

AuditForeverInterval represents how often a auditor check should be performed.

View Source
var DriveFullError = errors.New("Drive Full")

DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.

View Source
var LockPathError = errors.New("Error locking path")
View Source
var PathNotDirError = errors.New("Path is not a directory")
View Source
var RepUnmountedError = fmt.Errorf("Device unmounted")

Functions

func AsyncDir added in v0.0.2

func AsyncDir(policy int) string

func GetHashes

func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger srv.LowLevelLogger) (map[string]string, error)

func HashCleanupListDir

func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)

func InvalidateHash

func InvalidateHash(hashDir string) error

InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.

func MoveParts

func MoveParts(args []string, cnf srv.ConfigLoader)

MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.

func NewReplicator

func NewReplicator(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (ipPort *srv.IpPort, server srv.Server, logger srv.LowLevelLogger, err error)

func NewServer added in v0.0.2

func NewServer(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*srv.IpPort, srv.Server, srv.LowLevelLogger, error)

func ObjHash added in v0.0.2

func ObjHash(vars map[string]string, hashPathPrefix string, hashPathSuffix string) string

func ObjHashDir

func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string

func ObjectFiles

func ObjectFiles(directory string) (string, string)

func ObjectMetadata

func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)

func OneTimeChan

func OneTimeChan() chan time.Time

OneTimeChan returns a channel that will yield the current time once, then is closed.

func OpenObjectMetadata

func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)

func PolicyDir

func PolicyDir(policy int) string

func QuarantineHash

func QuarantineHash(hashDir string) error

func RawReadMetadata

func RawReadMetadata(fileNameOrFd interface{}) ([]byte, error)

func RawWriteMetadata

func RawWriteMetadata(fd uintptr, buf []byte) error

func ReadMetadata

func ReadMetadata(fileNameOrFd interface{}) (map[string]string, error)

func RecalculateSuffixHash

func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)

func RegisterObjectEngine

func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)

RegisterObjectEngine lets you tell hummingbird about a new object engine.

func RescueParts

func RescueParts(args []string, cnf srv.ConfigLoader)

func RestoreDevice

func RestoreDevice(args []string, cnf srv.ConfigLoader)

RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restores its data from peers.

func SendPriRepJob added in v0.0.2

func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)

func TempDirPath

func TempDirPath(driveRoot string, device string) string

func UnPolicyDir

func UnPolicyDir(dir string) (int, error)

func WriteMetadata

func WriteMetadata(fd uintptr, v map[string]string) error

Types

type Auditor

type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}

Auditor keeps track of general audit data.

type AuditorDaemon

type AuditorDaemon struct {
	// contains filtered or unexported fields
}

AuditorDaemon keeps track of object specific audit data.

func NewAuditorDaemon added in v0.0.2

func NewAuditorDaemon(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*AuditorDaemon, error)

NewAuditor returns a new AuditorDaemon with the given conf.

func (*AuditorDaemon) Run

func (d *AuditorDaemon) Run()

Run a single audit pass.

func (*AuditorDaemon) RunForever

func (d *AuditorDaemon) RunForever()

RunForever triggering audit passes every time AuditForeverInterval has passed.

type BeginReplicationRequest

type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

type BeginReplicationResponse

type BeginReplicationResponse struct {
	Hashes map[string]string
}

type DeviceStats added in v0.0.2

type DeviceStats struct {
	Stats              map[string]int64
	LastCheckin        time.Time
	PassStarted        time.Time
	DeviceStarted      time.Time
	LastPassFinishDate time.Time
	LastPassDuration   time.Duration
	CancelCount        int64
	FilesSent          int64
	BytesSent          int64
	PartitionsDone     int64
	PartitionsTotal    int64
	TotalPasses        int64
	PriorityRepsDone   int64
}

type FileUploadResponse

type FileUploadResponse struct {
	Success bool
	Msg     string
}

type NoMoreNodes

type NoMoreNodes struct{}

func (*NoMoreNodes) Next

func (n *NoMoreNodes) Next() *ring.Device

type NurseryObjectEngine added in v0.0.2

type NurseryObjectEngine interface {
	ObjectEngine
	GetNurseryObjects(device string, c chan ObjectStabilizer, cancel chan struct{})
}

type Object

type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata.  Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known).  It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves a new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// CommitMetadata updates the object's metadata.
	CommitMetadata(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}

type ObjectEngine

type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
}

ObjectEngine is the type you have to give hummingbird to create a new object engine.

func SwiftEngineConstructor

func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)

SwiftEngineConstructor creates a SwiftEngine given the object server configs.

type ObjectEngineConstructor

type ObjectEngineConstructor func(conf.Config, *conf.Policy, *flag.FlagSet) (ObjectEngine, error)

ObjectEngineConstructor> is a function that, given configs and flags, returns an ObjectEngine

func FindEngine

func FindEngine(name string) (ObjectEngineConstructor, error)

FindEngine returns the registered object engine with the given name.

type ObjectServer

type ObjectServer struct {
	// contains filtered or unexported fields
}

func (*ObjectServer) AcquireDevice

func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler

func (*ObjectServer) Background added in v0.0.2

func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}

func (*ObjectServer) DiskUsageHandler

func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Finalize

func (server *ObjectServer) Finalize()

func (*ObjectServer) GetHandler

func (server *ObjectServer) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*ObjectServer) HealthcheckHandler

func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) LogRequest

func (server *ObjectServer) LogRequest(next http.Handler) http.Handler

func (*ObjectServer) ObjDeleteHandler

func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjGetHandler

func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPostHandler

func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPutHandler

func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) OptionsHandler

func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ReconHandler

func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Type added in v0.0.2

func (server *ObjectServer) Type() string

type ObjectStabilizer added in v0.0.2

type ObjectStabilizer interface {
	Object
	// Stabilize object- move to stable location / erasure code / do nothing / etc
	Stabilize(ring.Ring, *ring.Device, int) error
}

type PolicyHandlerRegistrator added in v0.0.2

type PolicyHandlerRegistrator interface {
	RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}

type PriorityRepJob

type PriorityRepJob struct {
	Partition  uint64         `json:"partition"`
	FromDevice *ring.Device   `json:"from_device"`
	ToDevices  []*ring.Device `json:"to_devices"`
	Policy     int            `json:"policy"`
}

type RepConn

type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}

func NewIncomingRepConn

func NewIncomingRepConn(rw *bufio.ReadWriter, c net.Conn, rcTimeout time.Duration) RepConn

func NewRepConn

func NewRepConn(dev *ring.Device, partition string, policy int, headers map[string]string, certFile, keyFile string, rcTimeout time.Duration) (RepConn, error)

type ReplicationDevice

type ReplicationDevice interface {
	Replicate()
	ReplicateLoop()
	Key() string
	Cancel()
	PriorityReplicate(pri PriorityRepJob, timeout time.Duration) bool
}

type Replicator

type Replicator struct {
	CertFile string
	KeyFile  string
	// contains filtered or unexported fields
}

Object replicator daemon object

func (*Replicator) Background added in v0.0.2

func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}

func (*Replicator) Finalize added in v0.0.2

func (server *Replicator) Finalize()

func (*Replicator) GetHandler

func (r *Replicator) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*Replicator) HealthcheckHandler added in v0.0.2

func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*Replicator) LogRequest

func (r *Replicator) LogRequest(next http.Handler) http.Handler

func (*Replicator) ProgressReportHandler

func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)

ProgressReportHandler handles HTTP requests for current replication progress

func (*Replicator) Run

func (r *Replicator) Run()

Run a single replication pass. (NOTE: we will prob get rid of this because of priorityRepl)

func (*Replicator) RunForever

func (r *Replicator) RunForever()

Run replication passes in a loop until forever.

func (*Replicator) Type added in v0.0.2

func (server *Replicator) Type() string

type SwiftEngine

type SwiftEngine struct {
	// contains filtered or unexported fields
}

func (*SwiftEngine) New

func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)

New returns an instance of SwiftObject with the given parameters. Metadata is read in and if needData is true, the file is opened. AsyncWG is a waitgroup if the object spawns any async operations

type SwiftObject

type SwiftObject struct {
	// contains filtered or unexported fields
}

SwiftObject implements an Object that is compatible with Swift's object server.

func (*SwiftObject) Close

func (o *SwiftObject) Close() error

Close releases any resources used by the instance of SwiftObject

func (*SwiftObject) Commit

func (o *SwiftObject) Commit(metadata map[string]string) error

Commit commits an open data file to disk, given the metadata.

func (*SwiftObject) CommitMetadata added in v0.0.2

func (o *SwiftObject) CommitMetadata(metadata map[string]string) error

func (*SwiftObject) ContentLength

func (o *SwiftObject) ContentLength() int64

ContentLength parses and returns the Content-Length for the object.

func (*SwiftObject) Copy

func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)

Copy copies all data from the underlying .data file to the given writers.

func (*SwiftObject) CopyRange

func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)

CopyRange copies data in the range of start to end from the underlying .data file to the writer.

func (*SwiftObject) Delete

func (o *SwiftObject) Delete(metadata map[string]string) error

Delete deletes the object.

func (*SwiftObject) Exists

func (o *SwiftObject) Exists() bool

Exists returns true if the object exists, that is if it has a .data file.

func (*SwiftObject) Metadata

func (o *SwiftObject) Metadata() map[string]string

Metadata returns the object's metadata.

func (*SwiftObject) Quarantine

func (o *SwiftObject) Quarantine() error

Quarantine removes the object's underlying files to the Quarantined directory on the device.

func (*SwiftObject) Repr

func (o *SwiftObject) Repr() string

Repr returns a string that identifies the object in some useful way, used for logging.

func (*SwiftObject) SetData

func (o *SwiftObject) SetData(size int64) (io.Writer, error)

SetData is called to set the object's data. It takes a size (if available, otherwise set to zero).

type SyncFileRequest

type SyncFileRequest struct {
	Path   string
	Xattrs string
	Size   int64
	Check  bool
	Ping   bool
	Done   bool
}

type SyncFileResponse

type SyncFileResponse struct {
	Exists      bool
	NewerExists bool
	GoAhead     bool
	Msg         string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL