dsset

package
v0.0.0-...-4f35217
Warning

This package is not in the latest version of its module.

Published: Jan 8, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package dsset implements a particular flavor of Datastore-on-Firestore backed set.

Due to its internal structure, the set requires some maintenance on behalf of the caller: removed items and the tombstones that record them must be cleaned up periodically.

Items added to the set should have unique IDs, at least for the duration of a configurable time interval defined by the TombstonesDelay property. This means removed items can't be added back to the set right away (the set will think they are already there). This is required to make the 'Add' operation idempotent.

TombstonesDelay is assumed to be much larger than the time scale of all "fast" processes in the system, in particular all List+Pop processes. For example, if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min (e.g. 5 min). Setting TombstonesDelay to a very large value is harmful, though, since it may slow down 'List' and 'Pop' (by letting more garbage accumulate that has to be filtered out).

Properties (where N is current size of the set):

  • Batch 'Add', O(1) performance.
  • Transactional, consistent 'Pop' (1 QPS limit), O(N) performance.
  • Non-transactional, consistent 'List', O(N) performance.
  • Popped items can't be re-added until their tombstones expire.

These properties make dsset suitable for multiple-producer, single-consumer queues, where the order of items is not important, each item has a unique identifier, and the queue size is small.

Structurally, dsset places 2 kinds of entities under the Set's Parent entity:

  • items of the set.
  • tombstones, recording deleted items.

This code is a fork of dsset for classic Datastore, which had to work around the limit of 1 write per second per entity group by using shards. See go.chromium.org/luci/scheduler/appengine/engine/dsset.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanupGarbage

func CleanupGarbage(ctx context.Context, cleanup ...Garbage) (err error)

CleanupGarbage deletes the entities used to store the items referenced by the given tombstones.

This is datastore's MultiDelete RPC in disguise. Must be called outside of transactions. Idempotent.

Can handle tombstones from multiple different sets at once. This is preferred over calling 'CleanupGarbage' multiple times (once per set), since it collapses multiple datastore RPCs into one.

This MUST be called before tombstones returned by 'List' are removed in 'Pop'. Failure to do so will make items reappear in the set.

Returns only transient errors. There's no way to know which items were removed and which weren't in case of an error.
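Why the order matters can be seen in a tiny model. The store type and its methods are hypothetical stand-ins for the two entity kinds and for the real CleanupGarbage and BeginPop calls. The assumption that popping only writes a tombstone and leaves the item entity in place until CleanupGarbage deletes it is inferred from the description above.

```go
package main

// store is a hypothetical in-memory stand-in for the two entity kinds kept
// under a Set's parent: item entities and tombstones. Popping only writes a
// tombstone; the item entity lingers until it is explicitly deleted.
type store struct {
	items      map[string]bool // item entities still present
	tombstones map[string]bool // IDs recorded as popped
}

// visible reports whether a List-like read would return the item:
// the entity exists and no tombstone hides it.
func (s *store) visible(id string) bool {
	return s.items[id] && !s.tombstones[id]
}

// cleanupGarbage plays the role of CleanupGarbage: delete item entities.
func (s *store) cleanupGarbage(ids ...string) {
	for _, id := range ids {
		delete(s.items, id)
	}
}

// dropTombstones plays the role of the tombstone removal done in BeginPop.
func (s *store) dropTombstones(ids ...string) {
	for _, id := range ids {
		delete(s.tombstones, id)
	}
}
```

Deleting the item entity first and the tombstone second leaves nothing behind; dropping the tombstone while the entity still exists makes the popped item visible again.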

func FinishPop

func FinishPop(ctx context.Context, ops ...*PopOp) error

FinishPop completes one or more pop operations (for different sets) by submitting changes to datastore.

Must be called within the same transaction that called BeginPop.

Returns only transient errors.

Types

type Garbage

type Garbage []*tombstone

Garbage is a list of tombstones to clean up.

type Item

type Item struct {
	ID    string // unique in time identifier of the item
	Value []byte // arbitrary value (<1 MB, but preferably much smaller)
}

Item is what's stored in the set.

type Listing

type Listing struct {
	Items   []Item  // all items in the set, in arbitrary order
	Garbage Garbage // tombstones that can be cleaned up now
	// contains filtered or unexported fields
}

Listing is returned by 'List' call.

It contains actual listing of items in the set, as well as a bunch of service information used by other operations ('CleanupGarbage' and 'Pop') to keep the set in a garbage-free and consistent state.

The only way to construct a correct Listing is to call 'List' method.

See comments for Set struct and List method for more info.

type PopOp

type PopOp struct {
	// contains filtered or unexported fields
}

PopOp is an in-progress 'Pop' operation.

See BeginPop.

func (*PopOp) CanPop

func (p *PopOp) CanPop(id string) bool

CanPop returns true if the given item can be popped from the set.

Returns false if this item has been popped before (perhaps in another transaction), or it's not in the listing passed to BeginPop.

func (*PopOp) Pop

func (p *PopOp) Pop(id string) bool

Pop removes the item from the set and returns true if it was there.

Returns false if this item has been popped before (perhaps in another transaction), or it's not in the listing passed to BeginPop.
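The CanPop/Pop contract can be modeled in a few lines. popOp here is a hypothetical in-memory stand-in, not the real PopOp (whose fields are unexported); it only mirrors the two documented reasons for returning false.

```go
package main

// popOp is a hypothetical in-memory model of the CanPop/Pop contract.
type popOp struct {
	listed map[string]bool // IDs present in the listing given to BeginPop
	popped map[string]bool // IDs already popped, by earlier transactions or by this op
}

// canPop is true only for IDs that were listed and are not yet popped.
func (p *popOp) canPop(id string) bool {
	return p.listed[id] && !p.popped[id]
}

// pop marks the ID as popped and reports whether it was poppable.
func (p *popOp) pop(id string) bool {
	if !p.canPop(id) {
		return false
	}
	p.popped[id] = true
	return true
}
```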

type Set

type Set struct {
	// Parent points to the datastore entity owning the set.
	//
	// Set's Datastore entities will be placed with this parent.
	Parent *datastore.Key
	// TombstonesDelay is how long to keep tombstones in the set.
	TombstonesDelay time.Duration
}

Set holds a set of Items and uses tombstones to achieve idempotency of Add.

Producers just call Add(...).

The consumer must run a more elaborate algorithm that ensures atomicity of 'Pop' and takes care of cleaning up the garbage. This requires a mix of transactional and non-transactional actions:

// List must run outside of a transaction. maxEvents is List's second
// parameter (see the List signature below); its value is up to the caller.
listing, err := set.List(ctx, maxEvents)
if err != nil {
  return err
}

// Delete item entities of expired tombstones before BeginPop drops them.
if err := dsset.CleanupGarbage(ctx, listing.Garbage); err != nil {
  return err
}

// ... Fetch any additional info associated with 'listing.Items' ...

err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
  op, err := set.BeginPop(ctx, listing)
  if err != nil {
    return err
  }
  for _, itm := range listing.Items {
    if op.Pop(itm.ID) {
      // The item was indeed in the set and we've just removed it!
    } else {
      // Some other transaction has popped it already.
    }
  }
  return dsset.FinishPop(ctx, op)
}, nil)
return err

func (*Set) Add

func (s *Set) Add(c context.Context, items []Item) error

Add idempotently adds a bunch of items to the set.

If items with the given IDs are already in the set, or have been deleted recently, they won't be re-added. No error is returned in this case. When retrying such a call, the caller is responsible for passing exactly the same Item.Value; otherwise 'List' may return an arbitrary variant of the added item.

If called outside of a transaction and the call fails, Add may have added only a subset of the items. Running inside a transaction makes this operation atomic.

Returns only transient errors.

func (*Set) BeginPop

func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error)

BeginPop initiates a 'Pop' operation.

Pop operation is used to transactionally remove items from the set, as well as to clean up old tombstones. It must be finished with 'dsset.FinishPop', even if no items have been popped: the internal state can still change in this case, since 'BeginPop' cleans up old tombstones. Moreover, it is necessary to do a 'Pop' if the listing contains a non-empty set of tombstones (regardless of whether the caller wants to actually pop any items from the set). This is part of the required set maintenance.

Requires a transaction. Modifies the Tombstone entity. Requires txndefer to be installed in the context. This is already done by default in luci/server.

Returns only transient errors. Such errors usually mean that the entire pop sequence ('List' + 'Pop') should be retried.

func (*Set) Delete

func (s *Set) Delete(ctx context.Context, nextID func() string) (err error)

Delete deletes items from the set non-transactionally.

Use at your own risk. If in doubt, use BeginPop() instead.

Calls nextID() repeatedly to get the next ID to delete, until nextID() returns "".
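A nextID callback is easy to build from a slice of IDs. The idsFrom helper below is a hypothetical sketch, not a dsset function; since "" marks the end of the sequence, it assumes all IDs are non-empty.

```go
package main

// idsFrom returns a nextID-style callback that yields each ID in order and
// then "" on every later call, matching the contract described for Delete.
// Hypothetical helper; an empty ID in the slice would end iteration early.
func idsFrom(ids []string) func() string {
	i := 0
	return func() string {
		if i >= len(ids) {
			return ""
		}
		id := ids[i]
		i++
		return id
	}
}
```

It would be passed as set.Delete(ctx, idsFrom(ids)).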

func (*Set) List

func (s *Set) List(ctx context.Context, maxEvents int) (l *Listing, err error)

List returns all items that are currently in the set (in arbitrary order), as well as a set of tombstones that point to items that were previously popped and can be cleaned up now.

Must be called outside of a transaction; returns an error otherwise.

The set of tombstones to clean up should be passed to 'CleanupGarbage', and later to 'BeginPop' (via Listing), in that order. Not doing so will lead to accumulation of garbage in the set that will slow down 'List' and 'Pop'.

Returns only transient errors.
