cache

package
v1.9.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2023 License: Apache-2.0 Imports: 12 Imported by: 4

Documentation

Index

Examples

Constants

View Source
const (
	// ErrNotFound is the error code for an item that is not in the cache. The package example tests for it with
	// errors.IsCausedBy on the error returned by Get.
	ErrNotFound errors.ErrorCode = "NOT_FOUND"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AutoRefresh

// AutoRefresh is a cache offering GetOrCreate and delayed deletion on top of a background, asynchronous refresh of
// its items.
type AutoRefresh interface {
	// Start begins the background refresh of items. To shut the cache down, cancel ctx.
	Start(ctx context.Context) error

	// Get returns the item stored under id.
	Get(id ItemID) (Item, error)

	// GetOrCreate returns the object stored under id if it exists, else creates it (presumably from the supplied
	// item - confirm against the implementation).
	GetOrCreate(id ItemID, item Item) (Item, error)

	// DeleteDelayed queues an item for deletion. It will get deleted as part of the next sync cycle. Until the next
	// sync cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
	DeleteDelayed(id ItemID) error
}

AutoRefresh is a cache with regular GetOrCreate and Delete operations along with background asynchronous refresh. The caller provides callbacks to create, refresh, and delete items. The cache doesn't provide APIs to update items.

func NewAutoRefreshBatchedCache

func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
	resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error)

Instantiates a new AutoRefresh Cache that syncs items in batches.

func NewAutoRefreshCache

func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
	parallelizm, size int, scope promutils.Scope) (AutoRefresh, error)

Instantiates a new AutoRefresh Cache that syncs items periodically.

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/client-go/util/workqueue"

	"github.com/flyteorg/flyte/flytestdlib/errors"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
)

// ExampleItemStatus is the string-typed status of an item tracked by the example.
type ExampleItemStatus string

// Statuses an example item can carry.
const (
	// ExampleStatusNotStarted is the status the example gives to newly created items.
	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
	// ExampleStatusStarted is the status the example service records for a job it has not seen before.
	ExampleStatusStarted ExampleItemStatus = "Started"
	// ExampleStatusSucceeded is the status ExampleCacheItem.IsTerminal treats as terminal; it prints as "Completed".
	ExampleStatusSucceeded ExampleItemStatus = "Completed"
)

// ExampleCacheItem is the value the example stores in the cache: an
// identifier paired with the last status observed for it.
type ExampleCacheItem struct {
	status ExampleItemStatus
	id     string
}

// IsTerminal reports whether the item has reached ExampleStatusSucceeded.
func (item *ExampleCacheItem) IsTerminal() bool {
	switch item.status {
	case ExampleStatusSucceeded:
		return true
	default:
		return false
	}
}

// ID returns the identifier the item was created with.
func (item *ExampleCacheItem) ID() string {
	identifier := item.id
	return identifier
}

// ExampleService stands in for an external service that owns the real status
// of each job. All access to jobStatus goes through lock.
type ExampleService struct {
	jobStatus map[string]ExampleItemStatus
	lock      sync.RWMutex
}

// newExampleService returns an ExampleService with no recorded jobs.
func newExampleService() *ExampleService {
	// The zero value of sync.RWMutex is ready to use, so only the map needs initialising.
	return &ExampleService{
		jobStatus: make(map[string]ExampleItemStatus),
	}
}

// getStatus advances the status of job id by one step and returns the new
// state as a fresh ExampleCacheItem.
//
// A job the service has not seen before (or one recorded as not started)
// moves to ExampleStatusStarted; any other job moves to, or stays at,
// ExampleStatusSucceeded. Each call therefore performs exactly one
// transition instead of jumping straight to the final state.
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
	f.lock.Lock()
	defer f.lock.Unlock()

	next := ExampleStatusSucceeded
	if cur, ok := f.jobStatus[id]; !ok || cur == ExampleStatusNotStarted {
		next = ExampleStatusStarted
	}

	f.jobStatus[id] = next
	return &ExampleCacheItem{status: next, id: id}
}

// main walks through the life cycle of an auto-refresh cache: build it around a sync callback, start it, create
// two items, let a few refresh cycles run and then report the status the cache holds for the first item.
func main() {
	// This auto-refresh cache can be used for cases where keys are created by caller but processed by
	// an external service and we want to asynchronously keep track of its progress.
	exampleService := newExampleService()

	// define a sync method that the cache can use to auto-refresh in background
	syncItemCb := func(ctx context.Context, batch []ItemWrapper) ([]ItemSyncResponse, error) {
		updatedItems := make([]ItemSyncResponse, 0, len(batch))
		for _, obj := range batch {
			oldItem := obj.GetItem().(*ExampleCacheItem)
			newItem := exampleService.getStatus(oldItem.ID())
			// only report items whose status actually changed
			if newItem.status != oldItem.status {
				updatedItems = append(updatedItems, ItemSyncResponse{
					ID:     oldItem.ID(),
					Item:   newItem,
					Action: Update,
				})
			}
		}

		return updatedItems, nil
	}

	// define resync period as time duration we want cache to refresh. We can go as low as we want but cache
	// would still be constrained by time it takes to run Sync call for each item.
	resyncPeriod := time.Millisecond

	// Since number of items in the cache is dynamic, rate limiter is our knob to control resources we spend on
	// sync.
	rateLimiter := workqueue.DefaultControllerRateLimiter()

	// since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
	// so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
	// error during removal if based on status in cache).
	cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
	if err != nil {
		panic(err)
	}

	// start the cache with a context; cancelling that context is what stops the cache. Deferring the cancel
	// stops the cache on every exit path, including the panic below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err = cache.Start(ctx)
	if err != nil {
		panic(err)
	}

	// creating objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1.id, item1)
	_, err2 := cache.GetOrCreate(item2.id, item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// wait for the cache to go through a few refresh cycles and then check status
	time.Sleep(resyncPeriod * 10)
	item, err := cache.Get(item1.ID())
	switch {
	case err == nil:
		fmt.Printf("Current status for item1 is %v", item.(*ExampleCacheItem).status)
	case errors.IsCausedBy(err, ErrNotFound):
		fmt.Printf("Item1 is no longer in the cache")
	default:
		// any other error must not fall through to the type assertion above, which would panic
		// if item is nil
		fmt.Printf("unexpected error in get; err: %v", err)
	}
}
Output:

Current status for item1 is Completed

type Batch

// Batch is an alias for a slice of ItemWrapper; it is the unit CreateBatchesFunc produces and SyncFunc consumes.
type Batch = []ItemWrapper

func SingleItemBatches

func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error)

type CreateBatchesFunc

// CreateBatchesFunc subdivides a snapshot of the cache's items into batches.
type CreateBatchesFunc func(ctx context.Context, snapshot []ItemWrapper) (batches []Batch, err error)

CreateBatchesFunc is a func type. Your implementation of this function for your cache instance is responsible for subdividing the list of cache items into batches.

type Item

// Item is the value type stored in the cache.
type Item interface {
	// IsTerminal reports whether the item has reached a terminal state.
	// NOTE(review): presumably the cache stops syncing terminal items - confirm against the implementation.
	IsTerminal() bool
}

type ItemID

// ItemID is an alias for string; it is the key type taken by Get, GetOrCreate and DeleteDelayed.
type ItemID = string

type ItemSyncResponse

// ItemSyncResponse is one entry of the result a SyncFunc returns.
type ItemSyncResponse struct {
	// ID identifies the item this response refers to.
	ID ItemID
	// Item is the item as returned by the sync function.
	Item Item
	// Action is the action the cache should take for this item.
	Action SyncAction
}

Represents the response for the sync func

type ItemWrapper

// ItemWrapper wraps an Item for storage in the cache.
type ItemWrapper interface {
	// GetID returns the ID of the wrapped item.
	GetID() ItemID
	// GetItem returns the wrapped Item.
	GetItem() Item
}

Items are wrapped inside an ItemWrapper to be stored in the cache.

type SyncAction

// SyncAction enumerates the actions the cache can take after running the sync function on a cache item.
type SyncAction int

Possible actions for the cache to take as a result of running the sync function on any given cache item

const (
	// The item returned has not changed.
	// NOTE(review): presumably the cache then leaves the existing entry as is - confirm against the implementation.
	Unchanged SyncAction = iota

	// The item returned has been updated and should be updated in the cache
	Update
)

type SyncFunc

// SyncFunc syncs one batch of cache items and returns, for the items it reports on, the new Item together with the
// action the cache should take.
type SyncFunc func(ctx context.Context, batch Batch) (
	updatedBatch []ItemSyncResponse, err error)

SyncFunc is a func type. Your implementation of this function for your cache instance is responsible for returning the new Item and what action should be taken. The sync function has no insight into your object, and needs to be told explicitly if the new item is different from the old one.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL