objectserver

package
v0.0.0-...-6bbe1e0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2017 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package objectserver provides an Object Server implementation.

Hummingbird Replication

The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.

Messages are 32-bit length-prefixed, JSON-serialized structures. The basic flow looks like:

replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
server responds with a BeginReplicationResponse{Hashes map[string]string}
for each object file in the partition where suffix hash doesn't match {
	replicator sends a SyncFileRequest{Path string, Xattrs string, Size int64}
	server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
	if response.GoAhead is true {
		replicator sends raw file body
		server responds with a FileUploadResponse{Success bool}
	}
}
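The framing described above can be sketched as follows. This is a minimal illustration, not the package's own code: sendMessage, recvMessage and roundTrip are hypothetical helpers, and the big-endian byte order of the length prefix is an assumption, since the documentation only says the prefix is 32 bits wide.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// BeginReplicationRequest mirrors the struct documented in this package.
type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

// sendMessage (hypothetical helper) writes v as JSON, preceded by the JSON
// length as a 32-bit unsigned integer. Big-endian is an assumption.
func sendMessage(w io.Writer, v interface{}) error {
	body, err := json.Marshal(v)
	if err != nil {
		return err
	}
	if err := binary.Write(w, binary.BigEndian, uint32(len(body))); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

// recvMessage (hypothetical helper) reads one length-prefixed JSON message into v.
func recvMessage(r io.Reader, v interface{}) error {
	var length uint32
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return err
	}
	body := make([]byte, length)
	if _, err := io.ReadFull(r, body); err != nil {
		return err
	}
	return json.Unmarshal(body, v)
}

// roundTrip encodes req into an in-memory buffer and decodes it back.
func roundTrip(req BeginReplicationRequest) (BeginReplicationRequest, error) {
	var buf bytes.Buffer
	var out BeginReplicationRequest
	if err := sendMessage(&buf, req); err != nil {
		return out, err
	}
	err := recvMessage(&buf, &out)
	return out, err
}

func main() {
	out, err := roundTrip(BeginReplicationRequest{Device: "sda1", Partition: "1234", NeedHashes: true})
	fmt.Println(out, err)
}
```

In the real protocol the reader and writer would be the hijacked HTTP connection rather than a bytes.Buffer.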

The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.

Unlike python-swift, the replicator will only read each filesystem once per pass.

Constants

const METADATA_CHUNK_SIZE = 65536

Variables

var (
	StatsReportInterval    = 10 * time.Minute
	TmpEmptyTime           = 24 * time.Hour
	ReplicateDeviceTimeout = 4 * time.Hour
	// GetRing is a local pointer to the hummingbird function, for overriding in tests
	GetRing = ring.GetRing
)

var AuditForeverInterval = 30 * time.Second

AuditForeverInterval represents how often an auditor check should be performed.

var DriveFullError = errors.New("Drive Full")

DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.

var LockPathError = errors.New("Error locking path")

var PathNotDirError = errors.New("Path is not a directory")

var RepUnmountedError = fmt.Errorf("Device unmounted")

Functions

func GetHashes

func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger srv.LowLevelLogger) (map[string]string, error)

func GetServer

func GetServer(serverconf conf.Config, flags *flag.FlagSet) (bindIP string, bindPort int, serv srv.Server, logger srv.LowLevelLogger, err error)

func HashCleanupListDir

func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)

func InvalidateHash

func InvalidateHash(hashDir string) error

InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.

func MoveParts

func MoveParts(args []string)

MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.

func NewAuditor

func NewAuditor(serverconf conf.Config, flags *flag.FlagSet) (srv.Daemon, srv.LowLevelLogger, error)

NewAuditor returns a new AuditorDaemon with the given conf.

func NewReplicator

func NewReplicator(serverconf conf.Config, flags *flag.FlagSet) (srv.Daemon, srv.LowLevelLogger, error)

func ObjHashDir

func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string

func ObjectFiles

func ObjectFiles(directory string) (string, string)

func ObjectMetadata

func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)

func OneTimeChan

func OneTimeChan() chan time.Time

OneTimeChan returns a channel that will yield the current time once, then is closed.
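The documented behaviour can be reproduced with a buffered channel that is filled once and then closed. The function below is a hypothetical re-implementation for illustration, not the package's source. A plausible use, which is an assumption here, is to stand in for a ticker channel so that a loop written as `for range c` runs exactly once.

```go
package main

import (
	"fmt"
	"time"
)

// oneTimeChan is a hypothetical re-implementation of the documented behaviour:
// the channel yields the current time once and is then closed.
func oneTimeChan() chan time.Time {
	c := make(chan time.Time, 1)
	c <- time.Now()
	close(c)
	return c
}

func main() {
	c := oneTimeChan()
	first, ok := <-c
	fmt.Println(!first.IsZero(), ok) // a real timestamp, channel still had data
	_, ok = <-c
	fmt.Println(ok) // false: the channel is closed and drained
}
```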

func OpenObjectMetadata

func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)

func PolicyDir

func PolicyDir(policy int) string

func QuarantineHash

func QuarantineHash(hashDir string) error

func RawReadMetadata

func RawReadMetadata(fileNameOrFd interface{}) ([]byte, error)

func RawWriteMetadata

func RawWriteMetadata(fd uintptr, buf []byte) error

func ReadMetadata

func ReadMetadata(fileNameOrFd interface{}) (map[string]string, error)

func RecalculateSuffixHash

func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)

func RegisterObjectEngine

func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)

RegisterObjectEngine lets you tell hummingbird about a new object engine.

func RescueParts

func RescueParts(args []string)

func RestoreDevice

func RestoreDevice(args []string)

RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restore its data from peers.

func TempDirPath

func TempDirPath(driveRoot string, device string) string

func UnPolicyDir

func UnPolicyDir(dir string) (int, error)

func WriteMetadata

func WriteMetadata(fd uintptr, v map[string]string) error

Types

type Auditor

type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}

Auditor keeps track of general audit data.

type AuditorDaemon

type AuditorDaemon struct {
	// contains filtered or unexported fields
}

AuditorDaemon keeps track of object specific audit data.

func (*AuditorDaemon) Run

func (d *AuditorDaemon) Run()

Run performs a single audit pass.

func (*AuditorDaemon) RunForever

func (d *AuditorDaemon) RunForever()

RunForever triggers an audit pass each time AuditForeverInterval has elapsed.

type BeginReplicationRequest

type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

type BeginReplicationResponse

type BeginReplicationResponse struct {
	Hashes map[string]string
}

type FileUploadResponse

type FileUploadResponse struct {
	Success bool
	Msg     string
}

type NoMoreNodes

type NoMoreNodes struct{}

func (*NoMoreNodes) Next

func (n *NoMoreNodes) Next() *ring.Device

type Object

type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata.  Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known).  It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}

type ObjectEngine

type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
}

ObjectEngine is the type you have to give hummingbird to create a new object engine.

func SwiftEngineConstructor

func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)

SwiftEngineConstructor creates a SwiftEngine given the object server configs.

type ObjectEngineConstructor

type ObjectEngineConstructor func(conf.Config, *conf.Policy, *flag.FlagSet) (ObjectEngine, error)

ObjectEngineConstructor is a function that, given configs and flags, returns an ObjectEngine.

func FindEngine

func FindEngine(name string) (ObjectEngineConstructor, error)

FindEngine returns the registered object engine with the given name.
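RegisterObjectEngine and FindEngine together form a name-to-constructor registry. A minimal sketch of that pattern follows; engine, engineConstructor, registerEngine and findEngine are simplified, hypothetical stand-ins (the real constructor also takes configuration, a policy and flags), and the mutex is an assumption about how one might make such a registry safe for concurrent use.

```go
package main

import (
	"fmt"
	"sync"
)

// engine and engineConstructor are simplified, hypothetical stand-ins for
// ObjectEngine and ObjectEngineConstructor.
type engine interface{ Name() string }
type engineConstructor func() (engine, error)

var (
	mu      sync.RWMutex
	engines = map[string]engineConstructor{}
)

// registerEngine stores a constructor under the given name.
func registerEngine(name string, c engineConstructor) {
	mu.Lock()
	defer mu.Unlock()
	engines[name] = c
}

// findEngine looks up a previously registered constructor by name.
func findEngine(name string) (engineConstructor, error) {
	mu.RLock()
	defer mu.RUnlock()
	if c, ok := engines[name]; ok {
		return c, nil
	}
	return nil, fmt.Errorf("no engine registered as %q", name)
}

// memEngine is a trivial engine used only for this demonstration.
type memEngine struct{}

func (memEngine) Name() string { return "memory" }

func main() {
	registerEngine("memory", func() (engine, error) { return memEngine{}, nil })
	construct, err := findEngine("memory")
	if err != nil {
		fmt.Println(err)
		return
	}
	e, _ := construct()
	fmt.Println(e.Name())
	_, err = findEngine("missing")
	fmt.Println(err)
}
```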

type ObjectServer

type ObjectServer struct {
	// contains filtered or unexported fields
}

func (*ObjectServer) AcquireDevice

func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler

func (*ObjectServer) DiskUsageHandler

func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Finalize

func (server *ObjectServer) Finalize()

func (*ObjectServer) GetHandler

func (server *ObjectServer) GetHandler(config conf.Config) http.Handler

func (*ObjectServer) HealthcheckHandler

func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) LogRequest

func (server *ObjectServer) LogRequest(next http.Handler) http.Handler

func (*ObjectServer) ObjDeleteHandler

func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjGetHandler

func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPutHandler

func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) OptionsHandler

func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ReconHandler

func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)

type PriorityRepJob

type PriorityRepJob struct {
	Partition  uint64         `json:"partition"`
	FromDevice *ring.Device   `json:"from_device"`
	ToDevices  []*ring.Device `json:"to_devices"`
	Policy     int            `json:"policy"`
}

type RepConn

type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}

func NewIncomingRepConn

func NewIncomingRepConn(rw *bufio.ReadWriter, c net.Conn) RepConn

func NewRepConn

func NewRepConn(dev *ring.Device, partition string, policy int) (RepConn, error)

type ReplicationDevice

type ReplicationDevice interface {
	Replicate()
	ReplicateLoop()
	Key() string
	Cancel()
	PriorityReplicate(pri PriorityRepJob, timeout time.Duration) bool
	Stats() *ReplicationDeviceStats
}

type ReplicationDeviceStats

type ReplicationDeviceStats struct {
	Stats            map[string]int64
	LastCheckin      time.Time
	RunStarted       time.Time
	DeviceStarted    time.Time
	LastPassDate     time.Time
	LastPassDuration time.Duration
	TotalPasses      int64
}

type ReplicationManager

type ReplicationManager struct {
	// contains filtered or unexported fields
}

ReplicationManager is used by the object server to limit replication concurrency.

func NewReplicationManager

func NewReplicationManager(limitPerDisk int64, limitOverall int64) *ReplicationManager

func (*ReplicationManager) Begin

func (r *ReplicationManager) Begin(device string, timeout time.Duration) bool

Begin gives or rejects permission for a new replication session on the given device.

func (*ReplicationManager) Done

func (r *ReplicationManager) Done(device string)

Done marks the session completed, removing it from any accounting.

type Replicator

type Replicator struct {
	Rings map[int]replicationRing
	// contains filtered or unexported fields
}

Replicator is the object replicator daemon.

func (*Replicator) GetHandler

func (r *Replicator) GetHandler() http.Handler

func (*Replicator) LogRequest

func (r *Replicator) LogRequest(next http.Handler) http.Handler

func (*Replicator) ProgressReportHandler

func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)

ProgressReportHandler handles HTTP requests for current replication progress.

func (*Replicator) Run

func (r *Replicator) Run()

Run performs a single replication pass. (NOTE: this will probably be removed because of priority replication.)

func (*Replicator) RunForever

func (r *Replicator) RunForever()

RunForever runs replication passes in a loop indefinitely.

type SwiftEngine

type SwiftEngine struct {
	// contains filtered or unexported fields
}

func (*SwiftEngine) New

func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)

New returns an instance of SwiftObject with the given parameters. Metadata is read in and, if needData is true, the file is opened. asyncWG is a WaitGroup used to track any async operations the object spawns.

type SwiftObject

type SwiftObject struct {
	// contains filtered or unexported fields
}

SwiftObject implements an Object that is compatible with Swift's object server.

func (*SwiftObject) Close

func (o *SwiftObject) Close() error

Close releases any resources used by the instance of SwiftObject.

func (*SwiftObject) Commit

func (o *SwiftObject) Commit(metadata map[string]string) error

Commit commits an open data file to disk, given the metadata.

func (*SwiftObject) ContentLength

func (o *SwiftObject) ContentLength() int64

ContentLength parses and returns the Content-Length for the object.

func (*SwiftObject) Copy

func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)

Copy copies all data from the underlying .data file to the given writers.

func (*SwiftObject) CopyRange

func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)

CopyRange copies data in the range of start to end from the underlying .data file to the writer.

func (*SwiftObject) Delete

func (o *SwiftObject) Delete(metadata map[string]string) error

Delete deletes the object.

func (*SwiftObject) Exists

func (o *SwiftObject) Exists() bool

Exists returns true if the object exists, that is if it has a .data file.

func (*SwiftObject) Metadata

func (o *SwiftObject) Metadata() map[string]string

Metadata returns the object's metadata.

func (*SwiftObject) Quarantine

func (o *SwiftObject) Quarantine() error

Quarantine moves the object's underlying files to the Quarantined directory on the device.

func (*SwiftObject) Repr

func (o *SwiftObject) Repr() string

Repr returns a string that identifies the object in some useful way, used for logging.

func (*SwiftObject) SetData

func (o *SwiftObject) SetData(size int64) (io.Writer, error)

SetData is called to set the object's data. It takes a size (if available, otherwise set to zero).

type SyncFileRequest

type SyncFileRequest struct {
	Path   string
	Xattrs string
	Size   int64
	Check  bool
	Ping   bool
	Done   bool
}

type SyncFileResponse

type SyncFileResponse struct {
	Exists      bool
	NewerExists bool
	GoAhead     bool
	Msg         string
}
