Documentation ¶
Index ¶
- Variables
- func NewCollectionCleaner(config CollectionConfig) *collectionCleaner
- func NewStashCleaner(config CollectionConfig) *collectionCleaner
- func SupportsServerSideTransactions(db *mgo.Database) bool
- func TestHooks(runner Runner) chan ([]TestHook)
- type CleanAndPruneArgs
- type CleanupStats
- type Clock
- type CollectionConfig
- type CollectionStats
- type DBOracle
- type IncrementalPruneArgs
- type IncrementalPruner
- type MemOracle
- type Oracle
- type OracleIterator
- type ProgressMessage
- type PruneOptions
- type PrunerStats
- type Remover
- type Runner
- type RunnerParams
- type TestHook
- type Transaction
- type TransactionSource
Variables ¶
var (
	// ErrExcessiveContention is used to signal that even after retrying, the transaction operations
	// could not be successfully applied due to database contention.
	ErrExcessiveContention = stderrors.New("state changing too quickly; try again soon")

	// ErrNoOperations is returned by TransactionSource implementations to signal that
	// no transaction operations are available to run.
	ErrNoOperations = stderrors.New("no transaction operations are available")

	// ErrTransientFailure is returned by TransactionSource implementations to signal that
	// the transaction list could not be built but the caller should retry.
	ErrTransientFailure = stderrors.New("transient failure")
)
var EOF = fmt.Errorf("end of transaction ids")
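The sentinel errors above suggest a retry loop around a TransactionSource. The following is a minimal sketch of such a loop, not the package's implementation: the errors are redeclared locally so the example compiles on its own, and `errAborted`, `source`, and `runWithRetry` are hypothetical names (with `errAborted` standing in for txn.ErrAborted).

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, so the sketch is self-contained.
var (
	ErrExcessiveContention = errors.New("state changing too quickly; try again soon")
	ErrNoOperations        = errors.New("no transaction operations are available")
	ErrTransientFailure    = errors.New("transient failure")
)

// errAborted is a hypothetical stand-in for txn.ErrAborted.
var errAborted = errors.New("transaction aborted")

// source builds the operations for a given attempt (hypothetical shape;
// operations are plain strings here instead of txn.Op values).
type source func(attempt int) ([]string, error)

// runWithRetry shows one assumed retry policy built on the sentinel errors.
func runWithRetry(build source, apply func(ops []string) error, maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		ops, err := build(attempt)
		switch {
		case errors.Is(err, ErrTransientFailure):
			continue // the operation list could not be built; try again
		case errors.Is(err, ErrNoOperations):
			return nil // nothing to run is not treated as a failure
		case err != nil:
			return err
		}
		if err := apply(ops); errors.Is(err, errAborted) {
			continue // an assertion failed; rebuild the operations and retry
		} else if err != nil {
			return err
		}
		return nil
	}
	// Every attempt was used up without success.
	return ErrExcessiveContention
}

func main() {
	calls := 0
	build := func(attempt int) ([]string, error) {
		calls++
		if attempt == 0 {
			return nil, ErrTransientFailure // first attempt fails transiently
		}
		return []string{"insert doc"}, nil
	}
	apply := func(ops []string) error { return nil }
	fmt.Println(runWithRetry(build, apply, 3), calls)
}
```

In this sketch only exhausted retries surface as ErrExcessiveContention; any other error is returned to the caller unchanged.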
Functions ¶
func NewCollectionCleaner ¶
func NewCollectionCleaner(config CollectionConfig) *collectionCleaner
NewCollectionCleaner creates an object that can remove transaction tokens from document queues when the transactions have been marked as completed.
func NewStashCleaner ¶
func NewStashCleaner(config CollectionConfig) *collectionCleaner
NewStashCleaner returns an object suitable for cleaning up the txns.stash collection. It is different because when we find all references from a document have been removed, we can remove the document.
func SupportsServerSideTransactions ¶
func SupportsServerSideTransactions(db *mgo.Database) bool
SupportsServerSideTransactions lets you know if the given database can support server-side transactions.
Types ¶
type CleanAndPruneArgs ¶
type CleanAndPruneArgs struct {
	// Txns is the collection that holds all of the transactions that we
	// might want to prune. We will also make use of Txns.Database to find
	// all of the collections that might make use of transactions from that
	// collection.
	Txns *mgo.Collection

	// TxnsCount is a hint from Txns.Count() to avoid having to call it again
	// to determine whether it is ok to hold the set of transactions in memory.
	// It is optional, as we will call Txns.Count() if it is not supplied.
	TxnsCount int

	// MaxTime is a timestamp that provides a threshold of transactions
	// that we will actually prune. Only transactions that were created
	// before this threshold will be pruned.
	MaxTime time.Time

	// MaxTransactionsToProcess defines how many completed transactions we will evaluate in this batch.
	// A value of 0 indicates we should evaluate all completed transactions.
	MaxTransactionsToProcess int

	// Multithreaded will start multiple pruning passes concurrently.
	Multithreaded bool

	// TxnBatchSize is how many transactions to process at once.
	TxnBatchSize int

	// TxnBatchSleepTime is how long we should sleep between processing transaction
	// batches, to allow other parts of the system to operate (avoid consuming
	// all resources).
	// The default is to not sleep at all, but this can be configured to reduce
	// load while pruning.
	TxnBatchSleepTime time.Duration
}
CleanAndPruneArgs specifies the parameters required by CleanAndPrune.
type CleanupStats ¶
type CleanupStats struct {
	// CollectionsInspected is the total number of collections we looked at for documents
	CollectionsInspected int

	// DocsInspected is how many documents we loaded to evaluate their txn queues
	DocsInspected int

	// DocsCleaned is how many documents we updated to remove entries from their txn queue.
	DocsCleaned int

	// StashDocumentsRemoved is how many total documents we remove from txns.stash
	StashDocumentsRemoved int

	// TransactionsRemoved is how many documents we remove from txns
	TransactionsRemoved int

	// ShouldRetry indicates that we think this cleanup was not complete due to too many txns to process. We recommend running it again.
	ShouldRetry bool
}
CleanupStats gives some numbers as to what work was done as part of CleanAndPrune.
func CleanAndPrune ¶
func CleanAndPrune(args CleanAndPruneArgs) (CleanupStats, error)
CleanAndPrune runs the cleanup steps, and then follows up with pruning all of the transactions that are no longer referenced.
type Clock ¶
Clock is a simplified form of juju/clock.Clock. We don't need all the methods, and the smaller interface lets us stay compatible with both juju/clock.Clock and juju/utils/clock.Clock.
type CollectionConfig ¶
type CollectionConfig struct {
	// Oracle is an Oracle that we can use to determine if a given
	// transaction token should be considered a 'completed' transaction.
	Oracle Oracle

	// Source is the mongo collection holding documents created and managed
	// by transactions.
	Source *mgo.Collection

	// NumBatchTokens is the number of tokens that we will cache before
	// doing a query to find out whether their referenced transactions are
	// completed. It is useful to have a number in the hundreds so that we
	// efficiently query the mongo transaction database.
	NumBatchTokens int

	// MaxRemoveQueue is the maximum number of document ids that we will
	// hold on to in memory before we go back to the database to purge those
	// documents. This only affects StashCollectionCleaner, as the generic
	// cleaner never removes documents.
	MaxRemoveQueue int

	// LogInterval defines how often we will show progress
	LogInterval time.Duration
}
CollectionConfig is the definition of what we will be cleaning up.
type CollectionStats ¶
type CollectionStats struct {
	// DocCount is the total number of documents evaluated.
	DocCount int

	// TokenCount is the total number of transaction tokens that were
	// referenced by the documents.
	TokenCount int

	// CompletedTokenCount is the number of unique tokens that referenced
	// completed transactions.
	CompletedTokenCount int

	// CompletedTxnCount is the number of completed transactions that we
	// looked up.
	CompletedTxnCount int

	// UpdatedDocCount is the number of documents we modified without
	// removing them
	UpdatedDocCount int

	// PulledTokenCount is the number of tokens that were removed from documents.
	PulledTokenCount int

	// RemovedCount represents the number of txns.stash documents that we
	// decided to remove entirely.
	RemovedCount int
}
CollectionStats tracks various counters that signal how the collector operated.
func (CollectionStats) Details ¶
func (stats CollectionStats) Details() string
func (CollectionStats) HasChanges ¶
func (stats CollectionStats) HasChanges() bool
type DBOracle ¶
type DBOracle struct {
// contains filtered or unexported fields
}
DBOracle uses a temporary table on disk to track what transactions are considered completed and purgeable.
func NewDBOracle ¶
func NewDBOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*DBOracle, func(), error)
NewDBOracle uses a database collection to manage the queue of remaining transactions. The caller is responsible for calling the returned cleanup() function to ensure that any resources are freed. thresholdTime is used to omit transactions that are newer than this time (e.g., don't treat transactions that are less than 1 hr old as completed yet).
func (*DBOracle) CompletedTokens ¶
CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.
func (*DBOracle) IterTxns ¶
func (o *DBOracle) IterTxns() (OracleIterator, error)
IterTxns lets you iterate over all of the transactions that have not been removed.
type IncrementalPruneArgs ¶
type IncrementalPruneArgs struct {
	// MaxTime is a timestamp that provides a threshold of transactions
	// that we will actually prune. Only transactions that were created
	// before this threshold will be pruned.
	// MaxTime can be set to the Zero value to indicate all transactions.
	MaxTime time.Time

	// If ProgressChannel is not nil, this will send updates when documents are
	// processed and transactions are pruned.
	ProgressChannel chan ProgressMessage

	// ReverseOrder indicates we should process transactions from newest to
	// oldest instead of from oldest to newest.
	ReverseOrder bool

	// TxnBatchSize is how many transactions to process at once.
	TxnBatchSize int

	// TxnBatchSleepTime is how long we should sleep between processing transaction
	// batches, to allow other parts of the system to operate (avoid consuming
	// all resources).
	// The default is to not sleep at all, but this can be configured to reduce
	// load while pruning.
	TxnBatchSleepTime time.Duration
}
IncrementalPruneArgs specifies the parameters for running incremental cleanup steps.
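To illustrate how TxnBatchSize, TxnBatchSleepTime, and ReverseOrder might interact, here is a minimal sketch of batched processing with a pause between batches. It is an assumption-based illustration, not the pruner's code: `processInBatches` is a hypothetical helper, ids are plain strings, the input is assumed to be ordered oldest to newest, and the fallback batch size of 1000 is assumed.

```go
package main

import (
	"fmt"
	"time"
)

// processInBatches hands ids to handle in slices of at most batchSize,
// sleeping between batches. It returns the number of batches processed.
func processInBatches(ids []string, batchSize int, sleep time.Duration, reverse bool, handle func(batch []string)) int {
	if batchSize <= 0 {
		batchSize = 1000 // assumed fallback, not taken from the package
	}
	// Copy so the caller's slice is not reordered.
	ordered := append([]string(nil), ids...)
	if reverse {
		// Newest first: reverse the oldest-to-newest input.
		for i, j := 0, len(ordered)-1; i < j; i, j = i+1, j-1 {
			ordered[i], ordered[j] = ordered[j], ordered[i]
		}
	}
	batches := 0
	for start := 0; start < len(ordered); start += batchSize {
		end := start + batchSize
		if end > len(ordered) {
			end = len(ordered)
		}
		handle(ordered[start:end])
		batches++
		// Pause only when another batch follows, to give other work a chance to run.
		if sleep > 0 && end < len(ordered) {
			time.Sleep(sleep)
		}
	}
	return batches
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}
	n := processInBatches(ids, 2, time.Millisecond, false, func(batch []string) {
		fmt.Println(batch)
	})
	fmt.Println("batches:", n)
}
```

A zero sleep keeps the loop running at full speed, which matches the documented default of not sleeping at all.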
type IncrementalPruner ¶
type IncrementalPruner struct {
	ProgressChan chan ProgressMessage
	// contains filtered or unexported fields
}
IncrementalPruner reads the transaction table incrementally, checking whether it can remove the current set of transactions before moving on to newer transactions. It considers only about 1,000 transactions at a time, because that is the batch size that can be deleted, and it caches the documents that it has seen.
func NewIncrementalPruner ¶
func NewIncrementalPruner(args IncrementalPruneArgs) *IncrementalPruner
func (*IncrementalPruner) Prune ¶
func (p *IncrementalPruner) Prune(txns *mgo.Collection) (PrunerStats, error)
type MemOracle ¶
type MemOracle struct {
// contains filtered or unexported fields
}
MemOracle uses an in-memory cache to track what transactions are considered completed and purgeable.
func NewMemOracle ¶
func NewMemOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*MemOracle, func(), error)
NewMemOracle uses an in-memory map to manage the queue of remaining transactions.
func (*MemOracle) CompletedTokens ¶
CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.
func (*MemOracle) IterTxns ¶
func (o *MemOracle) IterTxns() (OracleIterator, error)
IterTxns lets you iterate over all of the transactions that have not been removed.
type Oracle ¶
type Oracle interface {
	// Count returns the number of transactions that we are working with
	Count() int

	// CompletedTokens is called with a list of tokens to be checked. The
	// returned map will have a 'true' for any token that references a
	// completed transaction.
	CompletedTokens(tokens []string) (map[string]bool, error)

	// RemoveTxns can be used to flag that a given transaction should not
	// be considered part of the valid set.
	RemoveTxns(txnIds []bson.ObjectId) (int, error)

	// IterTxns lets you iterate over all of the transactions that have
	// not been removed.
	IterTxns() (OracleIterator, error)
}
Oracle is the general interface that is used to track what transactions are considered completed, and can be pruned.
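As a rough illustration of the Oracle contract, the sketch below implements Count, CompletedTokens, and RemoveTxns over an in-memory map. It is a toy and not MemOracle itself: `toyOracle` is a hypothetical type, ids are plain strings rather than bson.ObjectId, and the token layout `<txn id>_<nonce>` is an assumption made for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// toyOracle is a hypothetical in-memory stand-in for an Oracle.
type toyOracle struct {
	completed map[string]bool // txn id -> completed and still tracked
}

// Count returns the number of transactions still tracked.
func (o *toyOracle) Count() int { return len(o.completed) }

// CompletedTokens returns a map holding true for each token that references
// a completed transaction. Assumption: a token looks like "<txn id>_<nonce>".
func (o *toyOracle) CompletedTokens(tokens []string) (map[string]bool, error) {
	result := make(map[string]bool)
	for _, token := range tokens {
		parts := strings.SplitN(token, "_", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("malformed token %q", token)
		}
		if o.completed[parts[0]] {
			result[token] = true
		}
	}
	return result, nil
}

// RemoveTxns stops tracking the given ids and reports how many were removed.
func (o *toyOracle) RemoveTxns(ids []string) (int, error) {
	removed := 0
	for _, id := range ids {
		if o.completed[id] {
			delete(o.completed, id)
			removed++
		}
	}
	return removed, nil
}

func main() {
	o := &toyOracle{completed: map[string]bool{"aaa": true, "bbb": true}}
	done, err := o.CompletedTokens([]string{"aaa_1", "ccc_1"})
	fmt.Println(done, err, o.Count())
}
```

Tokens that reference unknown or incomplete transactions are simply absent from the result, so a lookup on the returned map yields false for them.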
type OracleIterator ¶
type OracleIterator interface {
	// Grab the next transaction id. Will return nil if there are no
	// more transactions.
	Next() (bson.ObjectId, error)
}
OracleIterator is used to walk over the remaining transactions. See mgo.Iter for a similar iteration mechanism. Next returns EOF when we get to the end of the iterator, or some other error if there is another failure. Standard use is:

	iter, err := oracle.IterTxns()
	if err != nil {
		return err
	}
	var txnId bson.ObjectId
	for txnId, err = iter.Next(); err == nil; txnId, err = iter.Next() {
		// use txnId
	}
	if err != txn.EOF {
		// handle the failure
	}
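The iteration pattern can be exercised without a database. The following self-contained sketch uses stand-ins throughout: `EOF` is redeclared locally, `iterator` mirrors OracleIterator but yields strings, and `sliceIter` and `drain` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// EOF is a local stand-in for the package-level end-of-iteration sentinel.
var EOF = errors.New("end of transaction ids")

// iterator mirrors OracleIterator, using string ids instead of bson.ObjectId.
type iterator interface {
	Next() (string, error)
}

// sliceIter walks a fixed list of ids and then reports EOF.
type sliceIter struct {
	ids []string
	pos int
}

func (it *sliceIter) Next() (string, error) {
	if it.pos >= len(it.ids) {
		return "", EOF
	}
	id := it.ids[it.pos]
	it.pos++
	return id, nil
}

// drain collects every id. EOF ends the walk normally; any other error is
// returned together with the ids gathered so far.
func drain(it iterator) ([]string, error) {
	var ids []string
	for {
		id, err := it.Next()
		if err == EOF {
			return ids, nil
		}
		if err != nil {
			return ids, err
		}
		ids = append(ids, id)
	}
}

func main() {
	ids, err := drain(&sliceIter{ids: []string{"t1", "t2"}})
	fmt.Println(ids, err)
}
```

Treating EOF as the only "clean" terminal error keeps real failures from being mistaken for the end of the list.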
type ProgressMessage ¶
type PruneOptions ¶
type PruneOptions struct {
	// PruneFactor will trigger a prune when the current count of
	// transactions in the database is greater than old*PruneFactor
	PruneFactor float32

	// MinNewTransactions will skip a prune even if pruneFactor is true
	// if there are less than MinNewTransactions that might be cleaned up.
	MinNewTransactions int

	// MaxNewTransactions will force a prune if it sees more than
	// MaxNewTransactions since the last run.
	MaxNewTransactions int

	// MaxTime sets a threshold for 'completed' transactions. Transactions
	// will be considered completed only if they are both older than
	// MaxTime and have a status of Completed or Aborted. Passing the
	// zero Time will cause us to only filter on the Status field.
	MaxTime time.Time

	// MaxBatchTransactions is the most transactions that we will prune in a single pass.
	// It is possible to pass 0 to prune all transactions in a pass. Note
	// that MaybePruneTransactions will always process all transactions, it
	// is just whether we do so in multiple passes, or whether it is done
	// all at once.
	MaxBatchTransactions int

	// MaxBatches is the maximum number of passes we will attempt. 0 or
	// negative values are treated as do a single pass.
	MaxBatches int

	// SmallBatchTransactionCount is the number of transactions to read at a time.
	// A value of 1000 seems to be a good balance between how much time we spend
	// processing, and how many documents we evaluate at one time. (a value of
	// 100 empirically processes slower, and a value of 10,000 wasn't any faster)
	SmallBatchTransactionCount int

	// BatchTransactionSleepTime is an amount of time that we will sleep between
	// processing batches of transactions. This allows us to avoid excess load
	// on the system while pruning.
	BatchTransactionSleepTime time.Duration
}
PruneOptions controls when we will trigger a database prune.
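One way to read the PruneFactor, MinNewTransactions, and MaxNewTransactions comments is as a three-step decision. The sketch below encodes that reading; it is an interpretation, not the package's logic. `pruneOptions` and `shouldPrune` are hypothetical names, the order in which the checks are applied is assumed, and a MaxNewTransactions of 0 is assumed to mean "no forced prune".

```go
package main

import "fmt"

// pruneOptions is a cut-down, hypothetical copy of the trigger-related fields.
type pruneOptions struct {
	PruneFactor        float32
	MinNewTransactions int
	MaxNewTransactions int
}

// shouldPrune decides whether to prune, given the transaction count at the
// last prune (oldCount) and the count now (currentCount).
func shouldPrune(oldCount, currentCount int, opts pruneOptions) bool {
	newTxns := currentCount - oldCount
	// A large enough burst of new transactions forces a prune.
	if opts.MaxNewTransactions > 0 && newTxns > opts.MaxNewTransactions {
		return true
	}
	// Too few new transactions: not worth the I/O, whatever the factor says.
	if newTxns < opts.MinNewTransactions {
		return false
	}
	// Otherwise prune once the collection has grown past old*PruneFactor.
	return float32(currentCount) > float32(oldCount)*opts.PruneFactor
}

func main() {
	opts := pruneOptions{PruneFactor: 2.0, MinNewTransactions: 10}
	fmt.Println(shouldPrune(1000, 1500, opts)) // growth below the factor
	fmt.Println(shouldPrune(1000, 2500, opts)) // growth above the factor
}
```

Note that the PruneOptions comment says "greater than" while the Runner comment uses `>=`; the sketch follows the PruneOptions wording.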
type PrunerStats ¶
type PrunerStats struct {
	CacheLookupTime    time.Duration
	DocReadTime        time.Duration
	DocLookupTime      time.Duration
	DocCleanupTime     time.Duration
	StashLookupTime    time.Duration
	StashRemoveTime    time.Duration
	TxnReadTime        time.Duration
	TxnRemoveTime      time.Duration
	DocCacheHits       int64
	DocCacheMisses     int64
	DocMissingCacheHit int64
	DocsMissing        int64
	CollectionQueries  int64
	DocReads           int64
	DocStillMissing    int64
	StashQueries       int64
	StashDocReads      int64
	StashDocsRemoved   int64
	DocQueuesCleaned   int64
	DocTokensCleaned   int64
	DocsAlreadyClean   int64
	TxnsRemoved        int64
	TxnsNotRemoved     int64
	StrCacheHits       int64
	StrCacheMisses     int64
}
PrunerStats collects statistics about how the prune progressed.
func CombineStats ¶
func CombineStats(a, b PrunerStats) PrunerStats
CombineStats aggregates two stats into a single value
func (PrunerStats) String ¶
func (ps PrunerStats) String() string
type Runner ¶
type Runner interface {
	// RunTransaction applies the specified transaction operations to a database.
	RunTransaction(*Transaction) error

	// Run calls the nominated function to get the transaction operations to apply to a database.
	// If there is a failure due to a txn.ErrAborted error, the attempt is retried up to nrRetries times.
	Run(transactions TransactionSource) error

	// ResumeTransactions resumes all pending transactions.
	ResumeTransactions() error

	// MaybePruneTransactions removes data for completed transactions
	// from mgo/txn's transaction collection. It is intended to be
	// called periodically.
	//
	// Pruning is an I/O heavy activity so it will only be undertaken
	// if:
	//
	//    txn_count >= pruneFactor * txn_count_at_last_prune
	//
	MaybePruneTransactions(pruneOpts PruneOptions) error
}
Runner instances apply operations to collections in a database.
func NewRunner ¶
func NewRunner(params RunnerParams) Runner
NewRunner returns a Runner which runs transactions for the database specified in params. Collection names used to manage the transactions and change log may also be specified in params, but if not, default values will be used.
type RunnerParams ¶
type RunnerParams struct {
	// Database is the mgo database for which the transaction runner will be used.
	Database *mgo.Database

	// TransactionCollectionName is the name of the collection
	// used to initialise the underlying mgo transaction runner,
	// defaults to "txns" if unspecified.
	TransactionCollectionName string

	// ChangeLogName is the mgo transaction runner change log,
	// defaults to "txns.log" if unspecified.
	ChangeLogName string

	// RunTransactionObserver, if non-nil, will be called when
	// a Run or RunTransaction call has completed. It will be
	// passed the txn.Ops and the error result.
	RunTransactionObserver func(Transaction)

	// Clock is an optional clock to use. If Clock is nil, clock.WallClock will
	// be used.
	Clock Clock

	// ServerSideTransactions indicates that if SSTXNs are available, use them.
	// Note that we will check if they are supported server side, and fall
	// back to client-side transactions if they are not supported.
	ServerSideTransactions bool
}
RunnerParams are used to construct a new transaction runner. Only the Database value is mandatory; defaults will be used for the other attributes if not specified.
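The defaulting described in the field comments can be pictured as follows. This is only a sketch of the documented defaults ("txns" and "txns.log"): `runnerParams` and `withDefaults` are hypothetical, and the real constructor may apply its defaults differently.

```go
package main

import "fmt"

// runnerParams is a hypothetical, cut-down copy of the name-related fields.
type runnerParams struct {
	TransactionCollectionName string
	ChangeLogName             string
}

// withDefaults fills in the documented default names for unset fields.
func withDefaults(p runnerParams) runnerParams {
	if p.TransactionCollectionName == "" {
		p.TransactionCollectionName = "txns"
	}
	if p.ChangeLogName == "" {
		p.ChangeLogName = "txns.log"
	}
	return p
}

func main() {
	fmt.Printf("%+v\n", withDefaults(runnerParams{}))
	fmt.Printf("%+v\n", withDefaults(runnerParams{ChangeLogName: "audit.log"}))
}
```

Values supplied by the caller are left untouched; only empty strings are replaced.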
type TestHook ¶
type TestHook struct {
	Before func()
	After  func()
}
TestHook holds a pair of functions to be called before and after a mgo/txn transaction is run. Exported only for testing.
type Transaction ¶
type Transaction struct {
	// Ops is the operations that were performed
	Ops []txn.Op

	// Error is the error returned from running the operation, might be nil
	Error error

	// Duration is length of time it took to run the operation
	Duration time.Duration

	// Attempt is the current attempt to apply the operation.
	Attempt int
}
Transaction is a struct that is passed to RunTransactionObserver whenever a transaction is run.