Documentation ¶
Index ¶
- Variables
- func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)
- func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)
- func SplitBucketID(bucketID []byte) (projectID uuid.UUID, bucketName []byte, err error)
- type BucketBandwidthRollup
- type CacheData
- type CacheKey
- type Chore
- type Config
- type ConsumedSerial
- type DB
- type Endpoint
- type PendingSerial
- type ProcessOrderRequest
- type ProcessOrderResponse
- type Queue
- type RollupData
- type RollupsWriteCache
- func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error
- func (cache *RollupsWriteCache) CurrentData() RollupData
- func (cache *RollupsWriteCache) CurrentSize() int
- func (cache *RollupsWriteCache) Flush(ctx context.Context)
- func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}
- func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, ...) error
- func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, ...) error
- type Service
- func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, ...) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
- func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, ...) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.SelectedNode, ...) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, ...)
- func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) RandomSampleOfOrderLimits(limits []*pb.AddressedOrderLimit, sampleSize int) ([]*pb.AddressedOrderLimit, error)
- func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
- func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
- func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)
- type StoragenodeBandwidthRollup
- type Transaction
Constants ¶
This section is empty.
Variables ¶
var ( // Error the default orders errs class Error = errs.Class("orders error") // ErrUsingSerialNumber error class for serial number ErrUsingSerialNumber = errs.Class("serial number") )
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")
ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces
Functions ¶
func SortBucketBandwidthRollups ¶ added in v0.30.0
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)
SortBucketBandwidthRollups sorts the rollups
func SortStoragenodeBandwidthRollups ¶ added in v0.30.0
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)
SortStoragenodeBandwidthRollups sorts the rollups
Types ¶
type BucketBandwidthRollup ¶ added in v0.30.0
type BucketBandwidthRollup struct { ProjectID uuid.UUID BucketName string Action pb.PieceAction Inline int64 Allocated int64 Settled int64 }
BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup
type CacheData ¶ added in v0.29.8
CacheData stores the amount of inline and allocated data for a bucket bandwidth rollup
type CacheKey ¶ added in v0.29.8
type CacheKey struct { ProjectID uuid.UUID BucketName string Action pb.PieceAction }
CacheKey is the key information for the cached map below
type Chore ¶ added in v0.29.8
Chore for flushing orders write cache to the database.
architecture: Chore
func NewChore ¶ added in v0.29.8
func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore
NewChore creates new chore for flushing the orders write cache to the database.
type Config ¶ added in v0.14.0
type Config struct { Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"` FlushBatchSize int `` /* 127-byte string literal not displayed */ FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"` ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"` NodeStatusLogging bool `help:"log the offline/disqualification status of nodes" default:"false"` }
Config is a configuration struct for orders Service.
type ConsumedSerial ¶ added in v0.34.1
type ConsumedSerial struct { NodeID storj.NodeID SerialNumber storj.SerialNumber ExpiresAt time.Time }
ConsumedSerial is a serial that has been consumed and its bandwidth recorded.
type DB ¶
type DB interface { // CreateSerialInfo creates serial number entry in database. CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error // UseSerialNumber creates a used serial number entry in database from an // existing serial number. // It returns the bucket ID associated to serialNumber. UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error) // UnuseSerialNumber removes pair serial number -> storage node id from database UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error // DeleteExpiredSerials deletes all expired serials in serial_number, used_serials, and consumed_serials table. DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) // DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table. DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error) // UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error // GetBucketBandwidth gets total bucket bandwidth from period of time GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error) // GetStorageNodeBandwidth gets total storage node bandwidth from period of time GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error) // ProcessOrders takes a list of order requests and processes them in a batch ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error) // WithTransaction runs the callback and provides it with a Transaction. WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error // WithQueue runs the callback and provides it with a Queue. When the callback returns with // no error, any pending serials returned by the queue are removed from it. WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error }
DB implements saving order after receiving from storage node
architecture: Database
type Endpoint ¶
type Endpoint struct { DB DB // contains filtered or unexported fields }
Endpoint for orders receiving
architecture: Endpoint
func NewEndpoint ¶
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint
NewEndpoint new orders receiving endpoint
func (*Endpoint) Settlement ¶
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error)
Settlement receives orders and handles them in batches.
type PendingSerial ¶ added in v0.34.1
type PendingSerial struct { NodeID storj.NodeID BucketID []byte Action uint SerialNumber storj.SerialNumber ExpiresAt time.Time Settled uint64 }
PendingSerial is a serial number reported by a storagenode waiting to be settled
type ProcessOrderRequest ¶ added in v0.18.0
type ProcessOrderRequest struct { Order *pb.Order OrderLimit *pb.OrderLimit }
ProcessOrderRequest for batch order processing
type ProcessOrderResponse ¶ added in v0.18.0
type ProcessOrderResponse struct { SerialNumber storj.SerialNumber Status pb.SettlementResponse_Status }
ProcessOrderResponse for batch order processing responses
type Queue ¶ added in v0.34.1
type Queue interface { // GetPendingSerialsBatch returns a batch of pending serials containing at most size // entries. It returns a boolean indicating true if the queue is empty. GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, bool, error) }
Queue is an abstraction around a queue of pending serials.
type RollupData ¶ added in v0.29.8
RollupData contains the pending rollups waiting to be flushed to the db
type RollupsWriteCache ¶ added in v0.29.8
type RollupsWriteCache struct { DB // contains filtered or unexported fields }
RollupsWriteCache stores information needed to update bucket bandwidth rollups
func NewRollupsWriteCache ¶ added in v0.29.8
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache
NewRollupsWriteCache creates an RollupsWriteCache
func (*RollupsWriteCache) CloseAndFlush ¶ added in v0.31.0
func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error
CloseAndFlush flushes anything in the cache and marks the cache as stopped.
func (*RollupsWriteCache) CurrentData ¶ added in v0.29.8
func (cache *RollupsWriteCache) CurrentData() RollupData
CurrentData returns the contents of the cache.
func (*RollupsWriteCache) CurrentSize ¶ added in v0.29.8
func (cache *RollupsWriteCache) CurrentSize() int
CurrentSize returns the current size of the cache.
func (*RollupsWriteCache) Flush ¶ added in v0.31.0
func (cache *RollupsWriteCache) Flush(ctx context.Context)
Flush resets cache then flushes the everything in the rollups write cache to the database
func (*RollupsWriteCache) OnNextFlush ¶ added in v0.29.8
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}
OnNextFlush waits until the next time a flush call is made, then closes the returned channel.
func (*RollupsWriteCache) UpdateBucketBandwidthAllocation ¶ added in v0.29.8
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
func (*RollupsWriteCache) UpdateBucketBandwidthInline ¶ added in v0.29.8
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service for creating order limits.
architecture: Service
func NewService ¶
func NewService( log *zap.Logger, satellite signing.Signer, overlay *overlay.Service, orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress, repairMaxExcessRateOptimalThreshold float64, nodeStatusLogging bool, ) *Service
NewService creates new service for creating order limits.
func (*Service) CreateAuditOrderLimit ¶ added in v0.13.0
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer.
func (*Service) CreateAuditOrderLimits ¶
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.
func (*Service) CreateDeleteOrderLimits ¶
func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateDeleteOrderLimits creates the order limits for deleting the pieces of pointer.
func (*Service) CreateGetOrderLimits ¶
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.
func (*Service) CreateGetRepairOrderLimits ¶
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
The length of the returned orders slice is the total number of pieces of the segment, setting to null the ones which don't correspond to a healthy piece. CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
func (*Service) CreateGracefulExitPutOrderLimit ¶ added in v0.24.0
func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.
func (*Service) CreatePutOrderLimits ¶
func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.SelectedNode, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
CreatePutOrderLimits creates the order limits for uploading pieces to nodes.
func (*Service) CreatePutRepairOrderLimits ¶
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
func (*Service) RandomSampleOfOrderLimits ¶ added in v0.33.2
func (service *Service) RandomSampleOfOrderLimits(limits []*pb.AddressedOrderLimit, sampleSize int) ([]*pb.AddressedOrderLimit, error)
RandomSampleOfOrderLimits returns a random sample of the order limits
func (*Service) UpdateGetInlineOrder ¶ added in v0.9.0
func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket
func (*Service) UpdatePutInlineOrder ¶ added in v0.9.0
func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket
func (*Service) VerifyOrderLimitSignature ¶
func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)
VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.
type StoragenodeBandwidthRollup ¶ added in v0.30.0
type StoragenodeBandwidthRollup struct { NodeID storj.NodeID Action pb.PieceAction Allocated int64 Settled int64 }
StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup
type Transaction ¶ added in v0.30.0
type Transaction interface { // UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error // UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error // CreateConsumedSerialsBatch creates the batch of ConsumedSerials. CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error) // HasConsumedSerial returns true if the node and serial number have been consumed. HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error) }
Transaction represents a database transaction but with higher level actions.