orders

package
v0.31.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2020 License: AGPL-3.0 Imports: 19 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Error the default orders errs class
	Error = errs.Class("orders error")
	// ErrUsingSerialNumber error class for serial number
	ErrUsingSerialNumber = errs.Class("serial number")
)
View Source
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")

ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces

Functions

func SortBucketBandwidthRollups added in v0.30.0

func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)

SortBucketBandwidthRollups sorts the rollups

func SortStoragenodeBandwidthRollups added in v0.30.0

func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)

SortStoragenodeBandwidthRollups sorts the rollups

func SplitBucketID added in v0.15.0

func SplitBucketID(bucketID []byte) (projectID *uuid.UUID, bucketName []byte, err error)

SplitBucketID takes a bucketID, splits on /, and returns a projectID and bucketName

Types

type BucketBandwidthRollup added in v0.30.0

type BucketBandwidthRollup struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
	Inline     int64
	Allocated  int64
	Settled    int64
}

BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup

type CacheData added in v0.29.8

type CacheData struct {
	Inline    int64
	Allocated int64
}

CacheData stores the amount of inline and allocated data for a bucket bandwidth rollup

type CacheKey added in v0.29.8

type CacheKey struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
}

CacheKey is the key information for the cached map below

type Chore added in v0.29.8

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore for flushing orders write cache to the database.

architecture: Chore

func NewChore added in v0.29.8

func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore

NewChore creates new chore for flushing the orders write cache to the database.

func (*Chore) Close added in v0.29.8

func (chore *Chore) Close() error

Close stops the orders write cache chore.

func (*Chore) Run added in v0.29.8

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the orders write cache chore.

type Config added in v0.14.0

type Config struct {
	Expiration                   time.Duration `help:"how long until an order expires" default:"48h"` // 2 days
	SettlementBatchSize          int           `help:"how many orders to batch per transaction" default:"250"`
	FlushBatchSize               int           `` /* 127-byte string literal not displayed */
	FlushInterval                time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
	ReportedRollupsReadBatchSize int           `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
	NodeStatusLogging            bool          `help:"log the offline/disqualification status of nodes" default:"false"`
}

Config is a configuration struct for orders Service.

type DB

type DB interface {
	// CreateSerialInfo creates serial number entry in database
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber creates serial number entry in database
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes pair serial number -> storage node id from database
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
	// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
	DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)

	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// GetBucketBandwidth gets total bucket bandwidth from period of time
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)

	// ProcessOrders takes a list of order requests and processes them in a batch
	ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest, observedAt time.Time) (responses []*ProcessOrderResponse, err error)

	// GetBillableBandwidth gets total billable (expired reported serial) bandwidth for nodes and buckets for all actions.
	GetBillableBandwidth(ctx context.Context, now time.Time) (bucketRollups []BucketBandwidthRollup, storagenodeRollups []StoragenodeBandwidthRollup, err error)

	// WithTransaction runs the callback and provides it with a Transaction.
	WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
}

DB implements saving order after receiving from storage node

architecture: Database

type Endpoint

type Endpoint struct {
	DB DB
	// contains filtered or unexported fields
}

Endpoint for orders receiving

architecture: Endpoint

func NewEndpoint

func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint

NewEndpoint new orders receiving endpoint

func (*Endpoint) DRPC added in v0.21.0

func (endpoint *Endpoint) DRPC() pb.DRPCOrdersServer

DRPC returns a DRPC form of the endpoint.

func (*Endpoint) Settlement

func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error)

Settlement receives orders and handles them in batches

type ProcessOrderRequest added in v0.18.0

type ProcessOrderRequest struct {
	Order      *pb.Order
	OrderLimit *pb.OrderLimit
}

ProcessOrderRequest for batch order processing

type ProcessOrderResponse added in v0.18.0

type ProcessOrderResponse struct {
	SerialNumber storj.SerialNumber
	Status       pb.SettlementResponse_Status
}

ProcessOrderResponse for batch order processing responses

type RollupData added in v0.29.8

type RollupData map[CacheKey]CacheData

RollupData contains the pending rollups waiting to be flushed to the db

type RollupsWriteCache added in v0.29.8

type RollupsWriteCache struct {
	DB
	// contains filtered or unexported fields
}

RollupsWriteCache stores information needed to update bucket bandwidth rollups

func NewRollupsWriteCache added in v0.29.8

func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache

NewRollupsWriteCache creates an RollupsWriteCache

func (*RollupsWriteCache) CloseAndFlush added in v0.31.0

func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error

CloseAndFlush flushes anything in the cache and marks the cache as stopped.

func (*RollupsWriteCache) CurrentData added in v0.29.8

func (cache *RollupsWriteCache) CurrentData() RollupData

CurrentData returns the contents of the cache.

func (*RollupsWriteCache) CurrentSize added in v0.29.8

func (cache *RollupsWriteCache) CurrentSize() int

CurrentSize returns the current size of the cache.

func (*RollupsWriteCache) Flush added in v0.31.0

func (cache *RollupsWriteCache) Flush(ctx context.Context)

Flush resets cache then flushes the everything in the rollups write cache to the database

func (*RollupsWriteCache) OnNextFlush added in v0.29.8

func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}

OnNextFlush waits until the next time a flush call is made, then closes the returned channel.

func (*RollupsWriteCache) UpdateBucketBandwidthAllocation added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup

func (*RollupsWriteCache) UpdateBucketBandwidthInline added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service for creating order limits.

architecture: Service

func NewService

func NewService(
	log *zap.Logger, satellite signing.Signer, overlay *overlay.Service,
	orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress,
	repairMaxExcessRateOptimalThreshold float64, nodeStatusLogging bool,
) *Service

NewService creates new service for creating order limits.

func (*Service) CreateAuditOrderLimit added in v0.13.0

func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer.

func (*Service) CreateAuditOrderLimits

func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.

func (*Service) CreateDeleteOrderLimits

func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateDeleteOrderLimits creates the order limits for deleting the pieces of pointer.

func (*Service) CreateGetOrderLimits

func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.

func (*Service) CreateGetRepairOrderLimits

func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.

The length of the returned orders slice is the total number of pieces of the segment, setting to null the ones which don't correspond to a healthy piece. CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.

func (*Service) CreateGracefulExitPutOrderLimit added in v0.24.0

func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.

func (*Service) CreatePutOrderLimits

func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*pb.Node, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreatePutOrderLimits creates the order limits for uploading pieces to nodes.

func (*Service) CreatePutRepairOrderLimits

func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*pb.Node) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.

func (*Service) UpdateGetInlineOrder added in v0.9.0

func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)

UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket

func (*Service) UpdatePutInlineOrder added in v0.9.0

func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)

UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket

func (*Service) VerifyOrderLimitSignature

func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)

VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.

type StoragenodeBandwidthRollup added in v0.30.0

type StoragenodeBandwidthRollup struct {
	NodeID    storj.NodeID
	Action    pb.PieceAction
	Allocated int64
	Settled   int64
}

StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup

type Transaction added in v0.30.0

type Transaction interface {
	// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
	UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
	// UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database
	UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error
	// DeleteExpiredReportedSerials deletes any expired reported serials as of now.
	DeleteExpiredReportedSerials(ctx context.Context, now time.Time) (err error)
}

Transaction represents a database transaction but with higher level actions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL