orders

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2020 License: AGPL-3.0 Imports: 21 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Error is the default orders errs class.
	Error = errs.Class("orders error")
	// ErrUsingSerialNumber error class for serial number
	ErrUsingSerialNumber = errs.Class("serial number")
)
View Source
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")

ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces

Functions

func SortBucketBandwidthRollups added in v0.30.0

func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)

SortBucketBandwidthRollups sorts the rollups

func SortStoragenodeBandwidthRollups added in v0.30.0

func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)

SortStoragenodeBandwidthRollups sorts the rollups

func SplitBucketID added in v0.15.0

func SplitBucketID(bucketID []byte) (projectID uuid.UUID, bucketName []byte, err error)

SplitBucketID takes a bucketID, splits on /, and returns a projectID and bucketName

Types

type BucketBandwidthRollup added in v0.30.0

type BucketBandwidthRollup struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
	Inline     int64
	Allocated  int64
	Settled    int64
}

BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup

type CacheData added in v0.29.8

type CacheData struct {
	Inline    int64
	Allocated int64
}

CacheData stores the amount of inline and allocated data for a bucket bandwidth rollup

type CacheKey added in v0.29.8

type CacheKey struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
}

CacheKey is the key type of the RollupData map; it identifies a cached rollup by project, bucket name, and action.

type Chore added in v0.29.8

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore for flushing orders write cache to the database.

architecture: Chore

func NewChore added in v0.29.8

func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore

NewChore creates new chore for flushing the orders write cache to the database.

func (*Chore) Close added in v0.29.8

func (chore *Chore) Close() error

Close stops the orders write cache chore.

func (*Chore) Run added in v0.29.8

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the orders write cache chore.

type Config added in v0.14.0

type Config struct {
	Expiration                   time.Duration `help:"how long until an order expires" default:"48h"` // 2 days
	SettlementBatchSize          int           `help:"how many orders to batch per transaction" default:"250"`
	FlushBatchSize               int           `` /* 127-byte string literal not displayed */
	FlushInterval                time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
	ReportedRollupsReadBatchSize int           `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
	NodeStatusLogging            bool          `help:"log the offline/disqualification status of nodes" default:"false"`
}

Config is a configuration struct for orders Service.

type ConsumedSerial added in v0.34.1

type ConsumedSerial struct {
	NodeID       storj.NodeID
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
}

ConsumedSerial is a serial that has been consumed and its bandwidth recorded.

type DB

type DB interface {
	// CreateSerialInfo creates serial number entry in database.
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber creates a used serial number entry in database from an
	// existing serial number.
	// It returns the bucket ID associated to serialNumber.
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes the (serial number, storage node ID) pair from the database.
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
	// DeleteExpiredSerials deletes all expired serials in the serial_number, used_serials, and consumed_serials tables.
	DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
	// DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table.
	DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error)

	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// GetBucketBandwidth gets total bucket bandwidth from period of time
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)

	// ProcessOrders takes a list of order requests and processes them in a batch
	ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)

	// WithTransaction runs the callback and provides it with a Transaction.
	WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
	// WithQueue runs the callback and provides it with a Queue. When the callback returns with
	// no error, any pending serials returned by the queue are removed from it.
	WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
}

DB is the interface for saving orders after they are received from storage nodes.

architecture: Database

type Endpoint

type Endpoint struct {
	DB DB
	// contains filtered or unexported fields
}

Endpoint is the endpoint for receiving orders.

architecture: Endpoint

func NewEndpoint

func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint

NewEndpoint creates a new endpoint for receiving orders.

func (*Endpoint) DRPC added in v0.21.0

func (endpoint *Endpoint) DRPC() pb.DRPCOrdersServer

DRPC returns a DRPC form of the endpoint.

func (*Endpoint) Settlement

func (endpoint *Endpoint) Settlement(stream pbgrpc.Orders_SettlementServer) (err error)

Settlement receives orders and handles them in batches

type PendingSerial added in v0.34.1

type PendingSerial struct {
	NodeID       storj.NodeID
	BucketID     []byte
	Action       uint
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
	Settled      uint64
}

PendingSerial is a serial number reported by a storagenode waiting to be settled

type ProcessOrderRequest added in v0.18.0

type ProcessOrderRequest struct {
	Order      *pb.Order
	OrderLimit *pb.OrderLimit
}

ProcessOrderRequest for batch order processing

type ProcessOrderResponse added in v0.18.0

type ProcessOrderResponse struct {
	SerialNumber storj.SerialNumber
	Status       pb.SettlementResponse_Status
}

ProcessOrderResponse for batch order processing responses

type Queue added in v0.34.1

type Queue interface {
	// GetPendingSerialsBatch returns a batch of pending serials containing at most size
	// entries. The returned boolean is true if the queue is empty.
	GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, bool, error)
}

Queue is an abstraction around a queue of pending serials.

type RollupData added in v0.29.8

type RollupData map[CacheKey]CacheData

RollupData contains the pending rollups waiting to be flushed to the db

type RollupsWriteCache added in v0.29.8

type RollupsWriteCache struct {
	DB
	// contains filtered or unexported fields
}

RollupsWriteCache stores information needed to update bucket bandwidth rollups

func NewRollupsWriteCache added in v0.29.8

func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache

NewRollupsWriteCache creates a RollupsWriteCache.

func (*RollupsWriteCache) CloseAndFlush added in v0.31.0

func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error

CloseAndFlush flushes anything in the cache and marks the cache as stopped.

func (*RollupsWriteCache) CurrentData added in v0.29.8

func (cache *RollupsWriteCache) CurrentData() RollupData

CurrentData returns the contents of the cache.

func (*RollupsWriteCache) CurrentSize added in v0.29.8

func (cache *RollupsWriteCache) CurrentSize() int

CurrentSize returns the current size of the cache.

func (*RollupsWriteCache) Flush added in v0.31.0

func (cache *RollupsWriteCache) Flush(ctx context.Context)

Flush resets the cache, then flushes everything that was in the rollups write cache to the database.

func (*RollupsWriteCache) OnNextFlush added in v0.29.8

func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}

OnNextFlush waits until the next time a flush call is made, then closes the returned channel.

func (*RollupsWriteCache) UpdateBucketBandwidthAllocation added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup

func (*RollupsWriteCache) UpdateBucketBandwidthInline added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service for creating order limits.

architecture: Service

func NewService

func NewService(
	log *zap.Logger, satellite signing.Signer, overlay *overlay.Service,
	orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress,
	repairMaxExcessRateOptimalThreshold float64, nodeStatusLogging bool,
) *Service

NewService creates new service for creating order limits.

func (*Service) CreateAuditOrderLimit added in v0.13.0

func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateAuditOrderLimit creates an order limit for auditing a single piece from a pointer.

func (*Service) CreateAuditOrderLimits

func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.

func (*Service) CreateDeleteOrderLimits

func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateDeleteOrderLimits creates the order limits for deleting the pieces of pointer.

func (*Service) CreateGetOrderLimits

func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.

func (*Service) CreateGetOrderLimitsOld added in v0.33.2

func (service *Service) CreateGetOrderLimitsOld(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreateGetOrderLimitsOld creates the order limits for downloading the pieces of pointer for backwards compatibility

func (*Service) CreateGetRepairOrderLimits

func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.

The length of the returned orders slice is the total number of pieces of the segment; entries that don't correspond to a healthy piece are set to nil.

func (*Service) CreateGracefulExitPutOrderLimit added in v0.24.0

func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.

func (*Service) CreatePutOrderLimits

func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.SelectedNode, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreatePutOrderLimits creates the order limits for uploading pieces to nodes.

func (*Service) CreatePutRepairOrderLimits

func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.

func (*Service) RandomSampleOfOrderLimits added in v0.33.2

func (service *Service) RandomSampleOfOrderLimits(limits []*pb.AddressedOrderLimit, sampleSize int) ([]*pb.AddressedOrderLimit, error)

RandomSampleOfOrderLimits returns a random sample of the order limits

func (*Service) UpdateGetInlineOrder added in v0.9.0

func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)

UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket

func (*Service) UpdatePutInlineOrder added in v0.9.0

func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)

UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket

func (*Service) VerifyOrderLimitSignature

func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)

VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.

type StoragenodeBandwidthRollup added in v0.30.0

type StoragenodeBandwidthRollup struct {
	NodeID    storj.NodeID
	Action    pb.PieceAction
	Allocated int64
	Settled   int64
}

StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup

type Transaction added in v0.30.0

type Transaction interface {
	// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
	UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error

	// UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database
	UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error

	// CreateConsumedSerialsBatch creates the batch of ConsumedSerials.
	CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error)

	// HasConsumedSerial returns true if the node and serial number have been consumed.
	HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error)
}

Transaction represents a database transaction but with higher level actions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL