objectserver

package
v0.0.0-...-32f2820
This package is not in the latest version of its module.
Published: Aug 15, 2018 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Overview

Package objectserver provides an Object Server implementation.

Hummingbird Replication

The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.

Messages are JSON-serialized structures, each preceded by a 32-bit length prefix. The basic flow looks like:

replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
server responds with a BeginReplicationResponse{Hashes map[string]string}
for each object file in the partition where suffix hash doesn't match {
	replicator sends a SyncFileRequest{Path string, Xattrs string, Size int64}
	server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
	if response.GoAhead is true {
		replicator sends raw file body
		server responds with a FileUploadResponse{Success bool}
	}
}

The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.

Unlike python-swift, the replicator will only read each filesystem once per pass.

Constants

This section is empty.

Variables

var AuditForeverInterval = 30 * time.Second

AuditForeverInterval represents how often an auditor check should be performed.

var DriveFullError = errors.New("Drive Full")

DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.

var LockPathError = errors.New("Error locking path")

var PathNotDirError = errors.New("Path is not a directory")

var RepUnmountedError = fmt.Errorf("Device unmounted")

Functions

func AsyncDir

func AsyncDir(policy int) string

func Expired

func Expired(metadata map[string]string) bool

func GetHashes

func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger srv.LowLevelLogger) (map[string]string, error)

func HashCleanupListDir

func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)

func InvalidateHash

func InvalidateHash(hashDir string) error

InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.

func MetadataHash

func MetadataHash(metadata map[string]string) string

MetadataHash returns a hash of the contents of the metadata.

func MetadataMerge

func MetadataMerge(a map[string]string, b map[string]string) map[string]string

MetadataMerge returns the result of merging the a and b metadata sets; neither a nor b should be used after calling this function.

func MoveParts

func MoveParts(args []string, cnf srv.ConfigLoader)

MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.

func NewReplicator

func NewReplicator(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (ipPort *srv.IpPort, server srv.Server, logger srv.LowLevelLogger, err error)

func NewServer

func NewServer(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*srv.IpPort, srv.Server, srv.LowLevelLogger, error)

func ObjHash

func ObjHash(vars map[string]string, hashPathPrefix string, hashPathSuffix string) string

func ObjHashDir

func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string

func ObjectFiles

func ObjectFiles(directory string) (string, string)

func ObjectMetadata

func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)

func OneTimeChan

func OneTimeChan() chan time.Time

OneTimeChan returns a channel that will yield the current time once, then is closed.
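
A channel with that behavior can be built from a one-element buffered channel that is filled and then closed. The function below is a hypothetical stand-in written for illustration, not the package's source.

```go
package main

import (
	"fmt"
	"time"
)

// oneTimeChan is a hypothetical stand-in for OneTimeChan: it returns a
// channel that yields the current time once and is then closed.
func oneTimeChan() chan time.Time {
	c := make(chan time.Time, 1)
	c <- time.Now()
	close(c)
	return c
}

func main() {
	passes := 0
	// Ranging over the channel runs the body once, then exits on close.
	for range oneTimeChan() {
		passes++
	}
	fmt.Println("passes:", passes)
}
```

Because the channel has the same element type as a time.Ticker channel, a loop written to range over ticks can be driven for a single pass by swapping in this channel.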

func OpenObjectMetadata

func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)

func PolicyDir

func PolicyDir(policy int) string

func QuarantineHash

func QuarantineHash(hashDir string) error

func QuarantineItem

func QuarantineItem(db *IndexDB, item *IndexDBItem) error

func RecalculateSuffixHash

func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)

func RegisterObjectEngine

func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)

RegisterObjectEngine lets you tell hummingbird about a new object engine.

func RestoreDevice

func RestoreDevice(args []string, cnf srv.ConfigLoader)

RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restore its data from peers.

func SendPriRepJob

func SendPriRepJob(job *PriorityRepJob, client common.HTTPClient, userAgent string) (string, bool)

func TempDirPath

func TempDirPath(driveRoot string, device string) string

func UnPolicyDir

func UnPolicyDir(dir string) (int, error)

func ValidateHash

func ValidateHash(hsh string, ringPartPower, dbPartPower uint, subdirs int) (hshOut string, ringPart, dbPart, dirNm int, err error)

Types

type Auditor

type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}

Auditor keeps track of general audit data.

type AuditorDaemon

type AuditorDaemon struct {
	// contains filtered or unexported fields
}

AuditorDaemon keeps track of object specific audit data.

func NewAuditorDaemon

func NewAuditorDaemon(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*AuditorDaemon, error)

NewAuditorDaemon returns a new AuditorDaemon with the given conf.

func (*AuditorDaemon) Run

func (d *AuditorDaemon) Run()

Run runs a single audit pass.

func (*AuditorDaemon) RunForever

func (d *AuditorDaemon) RunForever()

RunForever triggers an audit pass each time AuditForeverInterval elapses.

type BeginReplicationRequest

type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

type BeginReplicationResponse

type BeginReplicationResponse struct {
	Hashes map[string]string
}

type DeviceStats

type DeviceStats struct {
	Stats              map[string]int64
	LastCheckin        time.Time
	PassStarted        time.Time
	DeviceStarted      time.Time
	LastPassFinishDate time.Time
	LastPassDuration   time.Duration

	// We can probably drop these at some point. But right now, recon uses
	// these results. We'd need to update the recon tools to use the /metrics
	// endpoint instead and make sure no other tools are using the /recon
	// endpoint. Probably a deprecation phase or something. For now, double the
	// stats work.
	CancelCount      int64
	FilesSent        int64
	BytesSent        int64
	PartitionsDone   int64
	PartitionsTotal  int64
	TotalPasses      int64
	PriorityRepsDone int64
	// contains filtered or unexported fields
}

type FileUploadResponse

type FileUploadResponse struct {
	Success bool
	Msg     string
}

type IndexDB

type IndexDB struct {
	RingPartPower uint // GLH: Temp exported for fakelist
	// contains filtered or unexported fields
}

IndexDB will track a set of objects.

This is the "index.db" per disk. Right now it just handles whole objects, but eventually we'd like to add either slab support or direct database embedding for small objects. Those details should be transparent to users of an IndexDB.

This is different from the standard Swift full replica object tracking in that the directory structure is much shallower, there are a configurable number of databases per drive instead of a ton of hashes.pkl files, and the version tracking / consolidation is much simpler.

The IndexDB stores the newest object contents it knows about and discards any older ones, like the standard Swift's .data files. It does not have .meta files at all, and certainly not stacked to infinity .meta files. Instead the metadata is stored in the database as JSON.

A given IndexDB may not even store any metadata, such as in an EC system, with just "key" IndexDBs storing the metadata.

func NewIndexDB

func NewIndexDB(dbpath, filepath, temppath string, ringPartPower, dbPartPower, subdirs int, reserve int64, logger srv.LowLevelLogger, auditor IndexDBAuditor) (*IndexDB, error)

NewIndexDB creates an IndexDB to manage a set of objects.

The ringPartPower is defined by the ring in use, but should be greater than the dbPartPower. The dbPartPower will define how many databases are created (e.g. dbPartPower = 6 gives 64 databases). The subdirs value will define how many subdirectories are created where object content files are placed.
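
The relationship between dbPartPower and the database count is a power of two, as the "6 gives 64" example implies. A tiny sketch of the arithmetic, with databaseCount as a hypothetical helper name:

```go
package main

import "fmt"

// databaseCount (hypothetical helper) returns the number of databases
// implied by a dbPartPower, that is 2 raised to that power.
func databaseCount(dbPartPower uint) int {
	return 1 << dbPartPower
}

func main() {
	for _, p := range []uint{4, 6, 8} {
		fmt.Printf("dbPartPower=%d -> %d databases\n", p, databaseCount(p))
	}
}
```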

func (*IndexDB) Close

func (ot *IndexDB) Close()

Close closes all the underlying databases for the IndexDB; you should discard the IndexDB instance after this call.

func (*IndexDB) Commit

func (ot *IndexDB) Commit(f fs.AtomicFileWriter, hsh string, shard int, timestamp int64, method string, metadata map[string]string, nursery bool, shardhash string) error

Commit moves the temporary file (from TempFile) into place and records its information in the database. It may discard the file completely if newer object information is already in place for the hash:shard.

Shard is mostly for EC type policies; just use 0 if you're using a full replica policy.

Timestamp is the timestamp for the object contents, not necessarily the metadata.

func (*IndexDB) ExpireObjects

func (ot *IndexDB) ExpireObjects() error

func (*IndexDB) List

func (ot *IndexDB) List(startHash, stopHash, marker string, limit int) ([]*IndexDBItem, error)

List returns the items for the ringPart given.

This is for replication, auditing, that sort of thing. NOTE: List does not populate item.Path for some reason (maybe size of listing?). Maybe we should change that later.

func (*IndexDB) ListObjectsToStabilize

func (ot *IndexDB) ListObjectsToStabilize() ([]*IndexDBItem, error)

ListObjectsToStabilize lists all objects that are in the nursery or set to be restabilized.

func (*IndexDB) Lookup

func (ot *IndexDB) Lookup(hsh string, shard int, justStable bool) (*IndexDBItem, error)

Lookup returns the stored information for the hsh and shard. It returns (nil, error) if there is an error and (nil, nil) if the item is not found.

func (*IndexDB) Remove

func (ot *IndexDB) Remove(hsh string, shard int, timestamp int64, nursery bool) error

Remove removes an entry from the database and its backing disk file.

func (*IndexDB) RingPartRange

func (ot *IndexDB) RingPartRange(ringPart int) (string, string)

func (*IndexDB) SetStabilized

func (ot *IndexDB) SetStabilized(hsh string, shard int, timestamp int64, stabilizePath bool) error

func (*IndexDB) StablePost

func (ot *IndexDB) StablePost(hsh string, shardIndex int, request *http.Request) error

func (*IndexDB) StablePut

func (ot *IndexDB) StablePut(hsh string, shardIndex int, request *http.Request) error

func (*IndexDB) TempFile

func (ot *IndexDB) TempFile(hsh string, shard int, timestamp int64, sizeHint int64, newWriteToNursery bool) (fs.AtomicFileWriter, error)

TempFile returns a temporary file to write to for eventually adding the hash:shard to the IndexDB with Commit; may return (nil, nil) if there is already a newer or equal timestamp in place for the hash:shard.
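
The "newer or equal timestamp wins" rule can be modeled with a small in-memory map keyed by hash:shard. This is only an illustration of the rule as stated above; the index type, key, shouldWrite, and commit are hypothetical names, and the real IndexDB stores this state in its databases rather than in a map.

```go
package main

import "fmt"

// index is a hypothetical in-memory stand-in for the database: it maps a
// "hash:shard" key to the newest timestamp recorded for it.
type index map[string]int64

// key builds the hash:shard lookup key.
func key(hsh string, shard int) string {
	return fmt.Sprintf("%s:%d", hsh, shard)
}

// shouldWrite reports whether a write with this timestamp is strictly newer
// than what is stored, mirroring TempFile's "newer or equal" check.
func (ix index) shouldWrite(hsh string, shard int, timestamp int64) bool {
	stored, ok := ix[key(hsh, shard)]
	return !ok || timestamp > stored
}

// commit records the timestamp and returns true, or returns false when a
// newer or equal timestamp is already in place.
func (ix index) commit(hsh string, shard int, timestamp int64) bool {
	if !ix.shouldWrite(hsh, shard, timestamp) {
		return false
	}
	ix[key(hsh, shard)] = timestamp
	return true
}

func main() {
	ix := index{}
	fmt.Println(ix.commit("d41d8cd9", 0, 100)) // first write for this hash:shard
	fmt.Println(ix.commit("d41d8cd9", 0, 90))  // older than stored
	fmt.Println(ix.commit("d41d8cd9", 0, 100)) // equal to stored
	fmt.Println(ix.commit("d41d8cd9", 0, 110)) // newer than stored
}
```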

func (*IndexDB) WholeObjectPath

func (ot *IndexDB) WholeObjectPath(hsh string, shard int, timestamp int64, nursery bool) (string, error)

type IndexDBAuditor

type IndexDBAuditor interface {
	AuditItem(path string, item *IndexDBItem, md5BytesPerSec int64) (int64, error)
}

type IndexDBItem

type IndexDBItem struct {
	Hash        string
	Shard       int
	Timestamp   int64
	Metahash    string `json:"-"`
	Nursery     bool
	Metabytes   []byte `json:"-"`
	Deletion    bool
	Path        string
	ShardHash   string
	Restabilize bool
	Expires     *int64
}

IndexDBItem is a single item returned by List.

type NoMoreNodes

type NoMoreNodes struct{}

func (*NoMoreNodes) Next

func (n *NoMoreNodes) Next() *ring.Device

type NurseryObjectEngine

type NurseryObjectEngine interface {
	ObjectEngine
	GetObjectsToStabilize(device string, c chan ObjectStabilizer, cancel chan struct{})
	GetObjectsToReplicate(prirep PriorityRepJob, c chan ObjectStabilizer, cancel chan struct{})
}

type Object

type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata.  Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known).  It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves a new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// CommitMetadata updates the object's metadata.
	CommitMetadata(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}

type ObjectEngine

type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
	GetReplicationDevice(oring ring.Ring, dev *ring.Device, r *Replicator) (ReplicationDevice, error)
}

ObjectEngine is the type you have to give hummingbird to create a new object engine.

func SwiftEngineConstructor

func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)

SwiftEngineConstructor creates a SwiftEngine given the object server configs.

type ObjectEngineConstructor

type ObjectEngineConstructor func(conf.Config, *conf.Policy, *flag.FlagSet) (ObjectEngine, error)

ObjectEngineConstructor is a function that, given configs and flags, returns an ObjectEngine.

func FindEngine

func FindEngine(name string) (ObjectEngineConstructor, error)

FindEngine returns the registered object engine with the given name.
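
RegisterObjectEngine and FindEngine together form a name-to-constructor registry. The sketch below shows the general shape of such a registry using simplified, hypothetical types (engine, engineConstructor, registerEngine, findEngine). The real ObjectEngineConstructor takes conf.Config, *conf.Policy, and *flag.FlagSet, and the package's actual storage and locking strategy is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// engine and engineConstructor are hypothetical, simplified stand-ins for
// ObjectEngine and ObjectEngineConstructor.
type engine interface {
	Name() string
}

type engineConstructor func() (engine, error)

var (
	enginesMu sync.RWMutex
	engines   = map[string]engineConstructor{}
)

// registerEngine stores a constructor under name, replacing any earlier one.
func registerEngine(name string, c engineConstructor) {
	enginesMu.Lock()
	defer enginesMu.Unlock()
	engines[name] = c
}

// findEngine looks up a constructor by name and returns an error if no
// engine was registered under that name.
func findEngine(name string) (engineConstructor, error) {
	enginesMu.RLock()
	defer enginesMu.RUnlock()
	if c, ok := engines[name]; ok {
		return c, nil
	}
	return nil, fmt.Errorf("no object engine named %q is registered", name)
}

// demoEngine is a trivial engine used only for this example.
type demoEngine struct{}

func (demoEngine) Name() string { return "demo" }

func main() {
	registerEngine("demo", func() (engine, error) { return demoEngine{}, nil })

	construct, err := findEngine("demo")
	if err != nil {
		panic(err)
	}
	e, _ := construct()
	fmt.Println(e.Name())

	_, err = findEngine("missing")
	fmt.Println(err)
}
```

Registries like this are typically populated from init functions, so that importing an engine's package is enough to make it selectable by name in configuration.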

type ObjectServer

type ObjectServer struct {
	// contains filtered or unexported fields
}

func (*ObjectServer) AcquireDevice

func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler

func (*ObjectServer) Background

func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}

func (*ObjectServer) DiskUsageHandler

func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Finalize

func (server *ObjectServer) Finalize()

func (*ObjectServer) GetHandler

func (server *ObjectServer) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*ObjectServer) HealthcheckHandler

func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) LogRequest

func (server *ObjectServer) LogRequest(next http.Handler) http.Handler

func (*ObjectServer) ObjDeleteHandler

func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjGetHandler

func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPostHandler

func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPutHandler

func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) OptionsHandler

func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ReconHandler

func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Type

func (server *ObjectServer) Type() string

type ObjectStabilizer

type ObjectStabilizer interface {
	Object
	// Stabilize object- move to stable location / erasure code / do nothing / etc
	Stabilize(*ring.Device) error
	Replicate(PriorityRepJob) error
}

type PolicyHandlerRegistrator

type PolicyHandlerRegistrator interface {
	RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}

type PriorityRepJob

type PriorityRepJob struct {
	Partition  uint64       `json:"partition"`
	FromDevice *ring.Device `json:"from_device"`
	ToDevice   *ring.Device `json:"to_device"`
	Policy     int          `json:"policy"`
}

type PriorityReplicationResult

type PriorityReplicationResult struct {
	ObjectsReplicated int64
	ObjectsErrored    int64
	Success           bool
	ErrorMsg          string
}

type RepConn

type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}

func NewIncomingRepConn

func NewIncomingRepConn(rw *bufio.ReadWriter, c net.Conn, rcTimeout time.Duration) RepConn

func NewRepConn

func NewRepConn(dev *ring.Device, partition string, policy int, headers map[string]string, certFile, keyFile string, rcTimeout time.Duration) (RepConn, error)

type ReplicationDevice

type ReplicationDevice interface {
	Scan()
	ScanLoop()
	Key() string
	Cancel()
	PriorityReplicate(w http.ResponseWriter, pri PriorityRepJob)
	UpdateStat(string, int64)
	Type() string
}

func GetNurseryDevice

func GetNurseryDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator, f NurseryObjectEngine) (ReplicationDevice, error)

type Replicator

type Replicator struct {
	CertFile string
	KeyFile  string
	// contains filtered or unexported fields
}

Replicator is the object replicator daemon.

func (*Replicator) Background

func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}

func (*Replicator) Finalize

func (server *Replicator) Finalize()

func (*Replicator) GetHandler

func (r *Replicator) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*Replicator) HealthcheckHandler

func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*Replicator) LogRequest

func (r *Replicator) LogRequest(next http.Handler) http.Handler

func (*Replicator) ProgressReportHandler

func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)

ProgressReportHandler handles HTTP requests for current replication progress.

func (*Replicator) Run

func (r *Replicator) Run()

Run runs a single replication pass. (NOTE: this will probably be removed because of priorityRepl.)

func (*Replicator) RunForever

func (r *Replicator) RunForever()

RunForever runs replication passes in a loop forever.

func (*Replicator) Type

func (server *Replicator) Type() string

type SwiftEngine

type SwiftEngine struct {
	// contains filtered or unexported fields
}

func (*SwiftEngine) GetReplicationDevice

func (f *SwiftEngine) GetReplicationDevice(oring ring.Ring, dev *ring.Device, r *Replicator) (ReplicationDevice, error)

func (*SwiftEngine) New

func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)

New returns an instance of SwiftObject with the given parameters. Metadata is read in and, if needData is true, the file is opened. asyncWG is a WaitGroup used to track any asynchronous operations the object spawns.

type SwiftObject

type SwiftObject struct {
	// contains filtered or unexported fields
}

SwiftObject implements an Object that is compatible with Swift's object server.

func (*SwiftObject) Close

func (o *SwiftObject) Close() error

Close releases any resources used by the instance of SwiftObject.

func (*SwiftObject) Commit

func (o *SwiftObject) Commit(metadata map[string]string) error

Commit commits an open data file to disk, given the metadata.

func (*SwiftObject) CommitMetadata

func (o *SwiftObject) CommitMetadata(metadata map[string]string) error

func (*SwiftObject) ContentLength

func (o *SwiftObject) ContentLength() int64

ContentLength parses and returns the Content-Length for the object.

func (*SwiftObject) Copy

func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)

Copy copies all data from the underlying .data file to the given writers.

func (*SwiftObject) CopyRange

func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)

CopyRange copies data in the range of start to end from the underlying .data file to the writer.

func (*SwiftObject) Delete

func (o *SwiftObject) Delete(metadata map[string]string) error

Delete deletes the object.

func (*SwiftObject) Exists

func (o *SwiftObject) Exists() bool

Exists returns true if the object exists, that is if it has a .data file.

func (*SwiftObject) Metadata

func (o *SwiftObject) Metadata() map[string]string

Metadata returns the object's metadata.

func (*SwiftObject) Quarantine

func (o *SwiftObject) Quarantine() error

Quarantine moves the object's underlying files to the quarantined directory on the device.

func (*SwiftObject) Repr

func (o *SwiftObject) Repr() string

Repr returns a string that identifies the object in some useful way, used for logging.

func (*SwiftObject) SetData

func (o *SwiftObject) SetData(size int64) (io.Writer, error)

SetData is called to set the object's data. It takes a size (if available, otherwise set to zero).

type SyncFileRequest

type SyncFileRequest struct {
	Path   string
	Xattrs string
	Size   int64
	Check  bool
	Ping   bool
	Done   bool
}

type SyncFileResponse

type SyncFileResponse struct {
	Exists      bool
	NewerExists bool
	GoAhead     bool
	Msg         string
}
