orders

package
v1.91.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: AGPL-3.0 Imports: 31 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Error the default orders errs class.
	Error = errs.Class("orders")
	// ErrUsingSerialNumber error class for serial number.
	ErrUsingSerialNumber = errs.Class("serial number")
)
View Source
var (
	// ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces.
	ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")
	// ErrDecryptOrderMetadata is returned when a step of decrypting metadata fails.
	ErrDecryptOrderMetadata = errs.Class("decrytping order metadata")
)
View Source
var ErrEncryptionKey = errs.Class("order encryption key")

ErrEncryptionKey is error class used for keys.

View Source
var ErrSigner = errs.Class("signer")

ErrSigner is default error class for Signer.

Functions

func SortStoragenodeBandwidthRollups added in v0.30.0

func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup)

SortStoragenodeBandwidthRollups sorts the rollups.

Types

type BucketBandwidthRollup added in v0.30.0

type BucketBandwidthRollup struct {
	ProjectID     uuid.UUID
	BucketName    string
	Action        pb.PieceAction
	IntervalStart time.Time
	Inline        int64
	Allocated     int64
	Settled       int64
	Dead          int64
}

BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup.

type CacheData added in v0.29.8

type CacheData struct {
	Inline    int64
	Allocated int64
	Settled   int64
	Dead      int64
}

CacheData stores the amount of inline and allocated data for a bucket bandwidth rollup.

type CacheKey added in v0.29.8

type CacheKey struct {
	ProjectID     uuid.UUID
	BucketName    string
	Action        pb.PieceAction
	IntervalStart int64
}

CacheKey is the key information for the cached map below.

type Chore added in v0.29.8

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore for flushing orders write cache to the database.

architecture: Chore

func NewChore added in v0.29.8

func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore

NewChore creates new chore for flushing the orders write cache to the database.

func (*Chore) Close added in v0.29.8

func (chore *Chore) Close() error

Close stops the orders write cache chore.

func (*Chore) Run added in v0.29.8

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the orders write cache chore.

type Config added in v0.14.0

type Config struct {
	EncryptionKeys      EncryptionKeys `help:"encryption keys to encrypt info in orders" default:""`
	Expiration          time.Duration  `help:"how long until an order expires" default:"24h" testDefault:"168h"` // default is 1 day
	FlushBatchSize      int            ``                                                                        /* 143-byte string literal not displayed */
	FlushInterval       time.Duration  ``                                                                        /* 130-byte string literal not displayed */
	NodeStatusLogging   bool           `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false" testDefault:"true"`
	OrdersSemaphoreSize int            `help:"how many concurrent orders to process at once. zero is unlimited" default:"2"`
}

Config is a configuration struct for orders Service.

type ConsumedSerial added in v0.34.1

type ConsumedSerial struct {
	NodeID       storj.NodeID
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
}

ConsumedSerial is a serial that has been consumed and its bandwidth recorded.

type DB

type DB interface {
	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBandwidthBatch updates bucket and project bandwidth rollups in the database
	UpdateBandwidthBatch(ctx context.Context, rollups []BucketBandwidthRollup) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateStoragenodeBandwidthSettleWithWindow updates 'settled' bandwidth for given storage node
	UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error)

	// GetBucketBandwidth gets total bucket bandwidth from period of time
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
}

DB implements saving order after receiving from storage node.

architecture: Database

func NewNoopDB added in v1.71.1

func NewNoopDB() DB

NewNoopDB creates noop orders DB.

type EncryptionKey added in v1.8.1

type EncryptionKey struct {
	ID  EncryptionKeyID
	Key storj.Key
}

EncryptionKey contains an identifier and an encryption key that is used to encrypt transient metadata in orders.

Can be used as a flag.

func (*EncryptionKey) Decrypt added in v1.8.1

func (key *EncryptionKey) Decrypt(ciphertext []byte, nonce storj.SerialNumber) ([]byte, error)

Decrypt decrypts data and nonce using the key.

func (*EncryptionKey) DecryptMetadata added in v1.13.1

func (key *EncryptionKey) DecryptMetadata(serial storj.SerialNumber, encrypted []byte) (*internalpb.OrderLimitMetadata, error)

DecryptMetadata decrypts order limit metadata.

func (*EncryptionKey) Encrypt added in v1.8.1

func (key *EncryptionKey) Encrypt(plaintext []byte, nonce storj.SerialNumber) []byte

Encrypt encrypts data and nonce using the key.

func (*EncryptionKey) EncryptMetadata added in v1.13.1

func (key *EncryptionKey) EncryptMetadata(serial storj.SerialNumber, metadata *internalpb.OrderLimitMetadata) ([]byte, error)

EncryptMetadata encrypts order limit metadata.

func (*EncryptionKey) IsZero added in v1.8.1

func (key *EncryptionKey) IsZero() bool

IsZero returns whether they key contains some data.

func (*EncryptionKey) Set added in v1.8.1

func (key *EncryptionKey) Set(s string) error

Set sets the value from an hex encoded string "hex(id)=hex(key)".

func (*EncryptionKey) String added in v1.8.1

func (key *EncryptionKey) String() string

String is required for pflag.Value.

func (EncryptionKey) Type added in v1.8.1

func (EncryptionKey) Type() string

Type implements pflag.Value.

type EncryptionKeyID added in v1.8.1

type EncryptionKeyID [8]byte

EncryptionKeyID is used to identify an encryption key.

func (EncryptionKeyID) IsZero added in v1.8.1

func (key EncryptionKeyID) IsZero() bool

IsZero returns whether the key contains no data.

type EncryptionKeys added in v1.8.1

type EncryptionKeys struct {
	Default EncryptionKey
	List    []EncryptionKey
	KeyByID map[EncryptionKeyID]storj.Key
}

EncryptionKeys contains a collection of keys.

Can be used as a flag.

func NewEncryptionKeys added in v1.18.1

func NewEncryptionKeys(keys ...EncryptionKey) (*EncryptionKeys, error)

NewEncryptionKeys creates a new EncrytpionKeys object with the provided keys.

func (*EncryptionKeys) Add added in v1.18.1

func (keys *EncryptionKeys) Add(ekey EncryptionKey) error

Add adds an encryption key to EncryptionsKeys object.

func (*EncryptionKeys) Clear added in v1.18.1

func (keys *EncryptionKeys) Clear()

Clear removes all keys.

func (*EncryptionKeys) Set added in v1.8.1

func (keys *EncryptionKeys) Set(s string) error

Set adds the values from a comma delimited hex encoded strings "hex(id1)=hex(key1),hex(id2)=hex(key2)".

func (*EncryptionKeys) String added in v1.8.1

func (keys *EncryptionKeys) String() string

String is required for pflag.Value.

func (EncryptionKeys) Type added in v1.8.1

func (EncryptionKeys) Type() string

Type implements pflag.Value.

type Endpoint

type Endpoint struct {
	pb.DRPCOrdersUnimplementedServer

	DB DB
	// contains filtered or unexported fields
}

Endpoint for orders receiving.

architecture: Endpoint

func NewEndpoint

func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB,
	ordersSemaphoreSize int, ordersService *Service) *Endpoint

NewEndpoint new orders receiving endpoint.

ordersSemaphoreSize controls the number of concurrent clients allowed to submit orders at once. A value of zero means unlimited.

func (*Endpoint) SettlementWithWindow added in v1.8.1

func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error)

SettlementWithWindow processes all orders that were created in a 1 hour window. Only one window is processed at a time. Batches are atomic, all orders are settled successfully or they all fail.

func (*Endpoint) SettlementWithWindowFinal added in v1.10.1

func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error)

SettlementWithWindowFinal processes all orders that were created in a 1 hour window. Only one window is processed at a time. Batches are atomic, all orders are settled successfully or they all fail.

type Overlay added in v1.73.4

type Overlay interface {
	CachedGetOnlineNodesForGet(context.Context, []storj.NodeID) (map[storj.NodeID]*nodeselection.SelectedNode, error)
	GetOnlineNodesForAuditRepair(context.Context, []storj.NodeID) (map[storj.NodeID]*overlay.NodeReputation, error)
	Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error)
	IsOnline(node *overlay.NodeDossier) bool
}

Overlay defines the overlay dependency of orders.Service. use `go install github.com/golang/mock/mockgen@v1.6.0` if missing

type PendingSerial added in v0.34.1

type PendingSerial struct {
	NodeID       storj.NodeID
	BucketID     []byte
	Action       uint
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
	Settled      uint64
}

PendingSerial is a serial number reported by a storagenode waiting to be settled.

type RollupData added in v0.29.8

type RollupData map[CacheKey]CacheData

RollupData contains the pending rollups waiting to be flushed to the db.

type RollupsWriteCache added in v0.29.8

type RollupsWriteCache struct {
	DB
	// contains filtered or unexported fields
}

RollupsWriteCache stores information needed to update bucket bandwidth rollups.

func NewRollupsWriteCache added in v0.29.8

func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache

NewRollupsWriteCache creates an RollupsWriteCache.

func (*RollupsWriteCache) CloseAndFlush added in v0.31.0

func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error

CloseAndFlush flushes anything in the cache and marks the cache as stopped.

func (*RollupsWriteCache) CurrentData added in v0.29.8

func (cache *RollupsWriteCache) CurrentData() RollupData

CurrentData returns the contents of the cache.

func (*RollupsWriteCache) CurrentSize added in v0.29.8

func (cache *RollupsWriteCache) CurrentSize() int

CurrentSize returns the current size of the cache.

func (*RollupsWriteCache) Flush added in v0.31.0

func (cache *RollupsWriteCache) Flush(ctx context.Context)

Flush resets cache then flushes the everything in the rollups write cache to the database.

func (*RollupsWriteCache) OnNextFlush added in v0.29.8

func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{}

OnNextFlush waits until the next time a flush call is made, then closes the returned channel.

func (*RollupsWriteCache) UpdateBucketBandwidthAllocation added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup.

func (*RollupsWriteCache) UpdateBucketBandwidthInline added in v0.29.8

func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup.

func (*RollupsWriteCache) UpdateBucketBandwidthSettle added in v1.8.1

func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error

UpdateBucketBandwidthSettle updates the rollups cache adding settled data for a bucket bandwidth rollup - deadAmount is not used.

type SerialDeleteOptions added in v1.18.1

type SerialDeleteOptions struct {
	BatchSize int
}

SerialDeleteOptions are option when deleting from serial tables.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service for creating order limits.

architecture: Service

func NewService

func NewService(
	log *zap.Logger, satellite signing.Signer, overlay Overlay,
	orders DB, placementRules overlay.PlacementRules, config Config,
) (*Service, error)

NewService creates new service for creating order limits.

func (*Service) CreateAuditOrderLimit added in v0.13.0

func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, nodeInfo *overlay.NodeReputation, err error)

CreateAuditOrderLimit creates an order limit for auditing a single piece from a segment.

func (*Service) CreateAuditOrderLimits

func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, err error)

CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.

func (*Service) CreateAuditPieceOrderLimit added in v1.67.1

func (service *Service) CreateAuditPieceOrderLimit(ctx context.Context, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, pieceSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, nodeInfo *overlay.NodeReputation, err error)

CreateAuditPieceOrderLimit creates an order limit for auditing a single piece from a segment, requesting that the original order limit and piece hash be included.

Unfortunately, because of the way the protocol works historically, we must use GET_REPAIR for this operation instead of GET_AUDIT.

func (*Service) CreateGetOrderLimits

func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, desiredNodes int32, overrideLimit int64) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreateGetOrderLimits creates the order limits for downloading the pieces of a segment.

func (*Service) CreateGetRepairOrderLimits

func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, segment metabase.Segment, healthy metabase.Pieces) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, err error)

CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of segment as the source for repair.

The length of the returned orders slice is the total number of pieces of the segment, setting to null the ones which don't correspond to a healthy piece.

func (*Service) CreateGracefulExitPutOrderLimit added in v0.24.0

func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.

func (*Service) CreatePutOrderLimits

func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabase.BucketLocation, nodes []*nodeselection.SelectedNode, pieceExpiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error)

CreatePutOrderLimits creates the order limits for uploading pieces to nodes.

func (*Service) CreatePutRepairOrderLimits

func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[uint16]struct{}, newNodes []*nodeselection.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error)

CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.

func (*Service) DecryptOrderMetadata added in v1.19.1

func (service *Service) DecryptOrderMetadata(ctx context.Context, order *pb.OrderLimit) (_ *internalpb.OrderLimitMetadata, err error)

DecryptOrderMetadata decrypts the order metadata.

func (*Service) ReplacePutOrderLimits added in v1.72.2

func (service *Service) ReplacePutOrderLimits(ctx context.Context, rootPieceID storj.PieceID, addressedLimits []*pb.AddressedOrderLimit, nodes []*nodeselection.SelectedNode, pieceNumbers []int32) (_ []*pb.AddressedOrderLimit, err error)

ReplacePutOrderLimits replaces order limits for uploading pieces to nodes.

func (*Service) UpdateGetInlineOrder added in v0.9.0

func (service *Service) UpdateGetInlineOrder(ctx context.Context, bucket metabase.BucketLocation, amount int64) (err error)

UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket.

func (*Service) UpdatePutInlineOrder added in v0.9.0

func (service *Service) UpdatePutInlineOrder(ctx context.Context, bucket metabase.BucketLocation, amount int64) (err error)

UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket.

func (*Service) VerifyOrderLimitSignature

func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error)

VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.

type Signer added in v1.11.1

type Signer struct {
	// TODO: should this be a ref to the necessary pieces instead of the service?
	Service *Service

	Bucket metabase.BucketLocation

	// TODO: use a Template pb.OrderLimit here?
	RootPieceID storj.PieceID

	PieceExpiration time.Time
	OrderCreation   time.Time
	OrderExpiration time.Time

	PublicKey  storj.PiecePublicKey
	PrivateKey storj.PiecePrivateKey

	Serial storj.SerialNumber
	Action pb.PieceAction
	Limit  int64

	EncryptedMetadataKeyID []byte
	EncryptedMetadata      []byte

	AddressedLimits []*pb.AddressedOrderLimit
	// contains filtered or unexported fields
}

Signer implements signing of order limits.

func NewSigner added in v1.11.1

func NewSigner(service *Service, rootPieceID storj.PieceID, pieceExpiration time.Time, orderCreation time.Time, limit int64, action pb.PieceAction, bucket metabase.BucketLocation) (*Signer, error)

NewSigner creates an order limit signer.

func NewSignerAudit added in v1.11.1

func NewSignerAudit(service *Service, rootPieceID storj.PieceID, orderCreation time.Time, pieceSize int64, bucket metabase.BucketLocation) (*Signer, error)

NewSignerAudit creates a new signer for audit orders.

func NewSignerDelete added in v1.11.1

func NewSignerDelete(service *Service, rootPieceID storj.PieceID, orderCreation time.Time, bucket metabase.BucketLocation) (*Signer, error)

NewSignerDelete creates a new signer for delete orders.

func NewSignerGet added in v1.11.1

func NewSignerGet(service *Service, rootPieceID storj.PieceID, orderCreation time.Time, limit int64, bucket metabase.BucketLocation) (*Signer, error)

NewSignerGet creates a new signer for get orders.

func NewSignerGracefulExit added in v1.11.1

func NewSignerGracefulExit(service *Service, rootPieceID storj.PieceID, orderCreation time.Time, shareSize int32, bucket metabase.BucketLocation) (*Signer, error)

NewSignerGracefulExit creates a new signer for graceful exit orders.

func NewSignerPut added in v1.11.1

func NewSignerPut(service *Service, pieceExpiration time.Time, orderCreation time.Time, limit int64, bucket metabase.BucketLocation) (*Signer, error)

NewSignerPut creates a new signer for put orders.

func NewSignerRepairGet added in v1.11.1

func NewSignerRepairGet(service *Service, rootPieceID storj.PieceID, orderCreation time.Time, pieceSize int64, bucket metabase.BucketLocation) (*Signer, error)

NewSignerRepairGet creates a new signer for get repair orders.

func NewSignerRepairPut added in v1.11.1

func NewSignerRepairPut(service *Service, rootPieceID storj.PieceID, pieceExpiration time.Time, orderCreation time.Time, pieceSize int64, bucket metabase.BucketLocation) (*Signer, error)

NewSignerRepairPut creates a new signer for put repair orders.

func (*Signer) Sign added in v1.11.1

func (signer *Signer) Sign(ctx context.Context, node *pb.Node, pieceNum int32) (_ *pb.AddressedOrderLimit, err error)

Sign signs an order limit for the specified node.

type StoragenodeBandwidthRollup added in v0.30.0

type StoragenodeBandwidthRollup struct {
	NodeID    storj.NodeID
	Action    pb.PieceAction
	Allocated int64
	Settled   int64
}

StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup.

type WindowEndpointRolloutPhase added in v1.10.1

type WindowEndpointRolloutPhase int

WindowEndpointRolloutPhase controls the phase of the new orders endpoint rollout.

const (
	// WindowEndpointRolloutPhase1 is when both the old and new endpoint are enabled and
	// the new endpoint places orders in the queue just like the old endpoint.
	WindowEndpointRolloutPhase1 WindowEndpointRolloutPhase = 1 + iota

	// WindowEndpointRolloutPhase2 is when the old endpoint is disabled and the new endpint
	// places orders in the queue just like the old endpoint used to.
	WindowEndpointRolloutPhase2

	// WindowEndpointRolloutPhase3 is when the old endpoint is disabled and the new endpoint
	// does not use a queue and just does direct insertion of rollup values.
	WindowEndpointRolloutPhase3
)

func (*WindowEndpointRolloutPhase) Set added in v1.10.1

func (phase *WindowEndpointRolloutPhase) Set(s string) error

Set implements flag.Value interface.

func (WindowEndpointRolloutPhase) String added in v1.10.1

func (phase WindowEndpointRolloutPhase) String() string

String provides a human readable form of the rollout phase.

func (WindowEndpointRolloutPhase) Type added in v1.10.1

Type implements pflag.Value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL