Documentation ¶
Index ¶
- Variables
- func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)
- func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)
- func SplitBucketID(bucketID []byte) (projectID *uuid.UUID, bucketName []byte, err error)
- type BucketBandwidthRollup
- type CacheData
- type CacheKey
- type Chore
- type Config
- type DB
- type Endpoint
- type ProcessOrderRequest
- type ProcessOrderResponse
- type RollupData
- type RollupsWriteCache
- func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error
- func (cache *RollupsWriteCache) CurrentData() RollupData
- func (cache *RollupsWriteCache) CurrentSize() int
- func (cache *RollupsWriteCache) Flush(ctx context.Context)
- func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}
- func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, ...) error
- func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, ...) error
- type Service
- func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, ...) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
- func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, ...) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*pb.Node, expiration time.Time, ...) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, ...)
- func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, ...) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
- func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
- func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
- func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)
- type StoragenodeBandwidthRollup
- type Transaction
Constants ¶
This section is empty.
Variables ¶
var ( // Error the default orders errs class Error = errs.Class("orders error") // ErrUsingSerialNumber error class for serial number ErrUsingSerialNumber = errs.Class("serial number") )
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")
ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces
Functions ¶
func SortBucketBandwidthRollups ¶ added in v0.30.0
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup)
SortBucketBandwidthRollups sorts the rollups
func SortStoragenodeBandwidthRollups ¶ added in v0.30.0
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)
SortStoragenodeBandwidthRollups sorts the rollups
Types ¶
type BucketBandwidthRollup ¶ added in v0.30.0
type BucketBandwidthRollup struct { ProjectID uuid.UUID BucketName string Action pb.PieceAction Inline int64 Allocated int64 Settled int64 }
BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup
type CacheData ¶ added in v0.29.8
CacheData stores the amount of inline and allocated data for a bucket bandwidth rollup
type CacheKey ¶ added in v0.29.8
type CacheKey struct { ProjectID uuid.UUID BucketName string Action pb.PieceAction }
CacheKey is the key information for the cached map below
type Chore ¶ added in v0.29.8
Chore for flushing orders write cache to the database.
architecture: Chore
func NewChore ¶ added in v0.29.8
func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore
NewChore creates new chore for flushing the orders write cache to the database.
type Config ¶ added in v0.14.0
type Config struct { Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"` FlushBatchSize int `` /* 127-byte string literal not displayed */ FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"` ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"` NodeStatusLogging bool `help:"log the offline/disqualification status of nodes" default:"false"` }
Config is a configuration struct for orders Service.
type DB ¶
type DB interface { // CreateSerialInfo creates serial number entry in database CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error // UseSerialNumber creates serial number entry in database UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error) // UnuseSerialNumber removes pair serial number -> storage node id from database UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error // DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table. DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) // UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error // GetBucketBandwidth gets total bucket bandwidth from period of time GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error) // GetStorageNodeBandwidth gets total storage node bandwidth from period of time GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error) // ProcessOrders takes a list of order requests and processes them in a batch ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest, observedAt time.Time) (responses []*ProcessOrderResponse, err error) // GetBillableBandwidth gets total billable (expired reported serial) bandwidth for nodes and buckets for all actions. GetBillableBandwidth(ctx context.Context, now time.Time) (bucketRollups []BucketBandwidthRollup, storagenodeRollups []StoragenodeBandwidthRollup, err error) // WithTransaction runs the callback and provides it with a Transaction. WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error }
DB implements saving order after receiving from storage node
architecture: Database
type Endpoint ¶
type Endpoint struct { DB DB // contains filtered or unexported fields }
Endpoint for orders receiving
architecture: Endpoint
func NewEndpoint ¶
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint
NewEndpoint new orders receiving endpoint
func (*Endpoint) DRPC ¶ added in v0.21.0
func (endpoint *Endpoint) DRPC() pb.DRPCOrdersServer
DRPC returns a DRPC form of the endpoint.
func (*Endpoint) Settlement ¶
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error)
Settlement receives orders and handles them in batches
type ProcessOrderRequest ¶ added in v0.18.0
type ProcessOrderRequest struct { Order *pb.Order OrderLimit *pb.OrderLimit }
ProcessOrderRequest for batch order processing
type ProcessOrderResponse ¶ added in v0.18.0
type ProcessOrderResponse struct { SerialNumber storj.SerialNumber Status pb.SettlementResponse_Status }
ProcessOrderResponse for batch order processing responses
type RollupData ¶ added in v0.29.8
RollupData contains the pending rollups waiting to be flushed to the db
type RollupsWriteCache ¶ added in v0.29.8
type RollupsWriteCache struct { DB // contains filtered or unexported fields }
RollupsWriteCache stores information needed to update bucket bandwidth rollups
func NewRollupsWriteCache ¶ added in v0.29.8
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache
NewRollupsWriteCache creates an RollupsWriteCache
func (*RollupsWriteCache) CloseAndFlush ¶ added in v0.31.0
func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error
CloseAndFlush flushes anything in the cache and marks the cache as stopped.
func (*RollupsWriteCache) CurrentData ¶ added in v0.29.8
func (cache *RollupsWriteCache) CurrentData() RollupData
CurrentData returns the contents of the cache.
func (*RollupsWriteCache) CurrentSize ¶ added in v0.29.8
func (cache *RollupsWriteCache) CurrentSize() int
CurrentSize returns the current size of the cache.
func (*RollupsWriteCache) Flush ¶ added in v0.31.0
func (cache *RollupsWriteCache) Flush(ctx context.Context)
Flush resets cache then flushes the everything in the rollups write cache to the database
func (*RollupsWriteCache) OnNextFlush ¶ added in v0.29.8
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}
OnNextFlush waits until the next time a flush call is made, then closes the returned channel.
func (*RollupsWriteCache) UpdateBucketBandwidthAllocation ¶ added in v0.29.8
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
func (*RollupsWriteCache) UpdateBucketBandwidthInline ¶ added in v0.29.8
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service for creating order limits.
architecture: Service
func NewService ¶
func NewService( log *zap.Logger, satellite signing.Signer, overlay *overlay.Service, orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress, repairMaxExcessRateOptimalThreshold float64, nodeStatusLogging bool, ) *Service
NewService creates new service for creating order limits.
func (*Service) CreateAuditOrderLimit ¶ added in v0.13.0
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer.
func (*Service) CreateAuditOrderLimits ¶
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.
func (*Service) CreateDeleteOrderLimits ¶
func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateDeleteOrderLimits creates the order limits for deleting the pieces of pointer.
func (*Service) CreateGetOrderLimits ¶
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.
func (*Service) CreateGetRepairOrderLimits ¶
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
The length of the returned orders slice is the total number of pieces of the segment, setting to null the ones which don't correspond to a healthy piece. CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
func (*Service) CreateGracefulExitPutOrderLimit ¶ added in v0.24.0
func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.
func (*Service) CreatePutOrderLimits ¶
func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*pb.Node, expiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)
CreatePutOrderLimits creates the order limits for uploading pieces to nodes.
func (*Service) CreatePutRepairOrderLimits ¶
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*pb.Node) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)
CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
func (*Service) UpdateGetInlineOrder ¶ added in v0.9.0
func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket
func (*Service) UpdatePutInlineOrder ¶ added in v0.9.0
func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error)
UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket
func (*Service) VerifyOrderLimitSignature ¶
func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)
VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.
type StoragenodeBandwidthRollup ¶ added in v0.30.0
type StoragenodeBandwidthRollup struct { NodeID storj.NodeID Action pb.PieceAction Allocated int64 Settled int64 }
StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup
type Transaction ¶ added in v0.30.0
type Transaction interface { // UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error // UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error // DeleteExpiredReportedSerials deletes any expired reported serials as of now. DeleteExpiredReportedSerials(ctx context.Context, now time.Time) (err error) }
Transaction represents a database transaction but with higher level actions.