Documentation ¶
Constants ¶
const (
	ErrNotFound errors.ErrorCode = "NOT_FOUND"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AutoRefresh ¶
type AutoRefresh interface {
	// starts background refresh of items.
	Start(ctx context.Context) error

	// Get item by id.
	Get(id ItemID) (Item, error)

	// Get object if exists else create it.
	GetOrCreate(id ItemID, item Item) (Item, error)
}
AutoRefresh is a cache that supports regular Get and GetOrCreate operations along with background asynchronous refresh. The caller provides callbacks to create, refresh, and delete items; the cache does not provide APIs to update items directly.
func NewAutoRefreshBatchedCache ¶
func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error)
Instantiates a new AutoRefresh Cache that syncs items in batches.
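A minimal sketch of wiring up the batched variant, assuming the same imports as the example below; the startBatchedCache wrapper is not part of this package, syncCb is a placeholder for your own SyncFunc, and the comments on parallelizm and size reflect an assumed reading of those parameters:

// startBatchedCache is a hypothetical helper showing the constructor call.
// SingleItemBatches is the trivial grouping; substitute your own
// CreateBatchesFunc to sync several items per call.
func startBatchedCache(ctx context.Context, syncCb SyncFunc) (AutoRefresh, error) {
	cache, err := NewAutoRefreshBatchedCache(
		SingleItemBatches,                        // CreateBatchesFunc: how to group the snapshot
		syncCb,                                   // SyncFunc: refresh callback
		workqueue.DefaultControllerRateLimiter(), // rate limiter for sync work
		30*time.Second,                           // resyncPeriod between refresh rounds
		10,                                       // parallelizm (assumed: number of concurrent sync workers)
		1000,                                     // size (assumed: maximum number of cached items)
		promutils.NewTestScope(),
	)
	if err != nil {
		return nil, err
	}
	return cache, cache.Start(ctx)
}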
func NewAutoRefreshCache ¶
func NewAutoRefreshCache(syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error)
Instantiates a new AutoRefresh Cache that syncs items periodically.
Example ¶
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/client-go/util/workqueue"

	"github.com/lyft/flytestdlib/errors"
	"github.com/lyft/flytestdlib/promutils"
)

type ExampleItemStatus string

const (
	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
	ExampleStatusStarted    ExampleItemStatus = "Started"
	ExampleStatusSucceeded  ExampleItemStatus = "Completed"
)

type ExampleCacheItem struct {
	status ExampleItemStatus
	id     string
}

func (e *ExampleCacheItem) ID() string {
	return e.id
}

type ExampleService struct {
	jobStatus map[string]ExampleItemStatus
	lock      sync.RWMutex
}

func newExampleService() *ExampleService {
	return &ExampleService{
		jobStatus: make(map[string]ExampleItemStatus),
		lock:      sync.RWMutex{},
	}
}

// getStatus advances the status to the next state and returns it.
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
	f.lock.Lock()
	defer f.lock.Unlock()
	if _, ok := f.jobStatus[id]; !ok {
		f.jobStatus[id] = ExampleStatusStarted
	}
	f.jobStatus[id] = ExampleStatusSucceeded
	return &ExampleCacheItem{f.jobStatus[id], id}
}

func main() {
	// This auto-refresh cache can be used for cases where keys are created by the caller but processed by
	// an external service, and we want to asynchronously keep track of its progress.
	exampleService := newExampleService()

	// Define a sync method that the cache can use to auto-refresh in the background.
	syncItemCb := func(ctx context.Context, batch []ItemWrapper) ([]ItemSyncResponse, error) {
		updatedItems := make([]ItemSyncResponse, 0, len(batch))
		for _, obj := range batch {
			oldItem := obj.GetItem().(*ExampleCacheItem)
			newItem := exampleService.getStatus(oldItem.ID())
			if newItem.status != oldItem.status {
				updatedItems = append(updatedItems, ItemSyncResponse{
					ID:     oldItem.ID(),
					Item:   newItem,
					Action: Update,
				})
			}
		}

		return updatedItems, nil
	}

	// Define the resync period as the duration between cache refreshes. It can be as low as we want,
	// but the cache is still constrained by the time it takes to run the Sync call for each item.
	resyncPeriod := time.Millisecond

	// Since the number of items in the cache is dynamic, the rate limiter is our knob to control the
	// resources we spend on sync.
	rateLimiter := workqueue.DefaultControllerRateLimiter()

	// Since the cache refreshes itself asynchronously, it may not notice that an object has been deleted
	// immediately, so users of the cache should make their delete logic aware of this shortcoming (e.g.
	// not-exists may be a valid error during removal if based on the status in the cache).
	cache, err := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
	if err != nil {
		panic(err)
	}

	// Start the cache with a context; cancelling the context stops the cache.
	ctx, cancel := context.WithCancel(context.Background())
	err = cache.Start(ctx)
	if err != nil {
		panic(err)
	}

	// Create objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1.id, item1)
	_, err2 := cache.GetOrCreate(item2.id, item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// Wait for the cache to go through a few refresh cycles and then check the status.
	time.Sleep(resyncPeriod * 10)
	item, err := cache.Get(item1.ID())
	if err != nil && errors.IsCausedBy(err, ErrNotFound) {
		fmt.Printf("Item1 is no longer in the cache")
	} else {
		fmt.Printf("Current status for item1 is %v", item.(*ExampleCacheItem).status)
	}

	// Stop the cache.
	cancel()
}
Output:

Current status for item1 is Completed
type Batch ¶
type Batch = []ItemWrapper
func SingleItemBatches ¶
func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error)
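SingleItemBatches is a ready-made CreateBatchesFunc that places each item of the snapshot into its own single-item batch, so every sync call processes exactly one item.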
type CreateBatchesFunc ¶
type CreateBatchesFunc func(ctx context.Context, snapshot []ItemWrapper) (batches []Batch, err error)
Your implementation of this function for your cache instance is responsible for subdividing the list of cache items into batches.
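For illustration, a CreateBatchesFunc that chunks the snapshot into fixed-size batches might look like the following sketch; the chunkedBatches name and the batch size of 10 are arbitrary choices, not part of this package:

// chunkedBatches groups the snapshot into batches of at most maxBatchSize
// items so that each sync call receives a bounded amount of work.
func chunkedBatches(_ context.Context, snapshot []ItemWrapper) ([]Batch, error) {
	const maxBatchSize = 10 // illustrative; tune to what your backend handles per call
	batches := make([]Batch, 0, (len(snapshot)+maxBatchSize-1)/maxBatchSize)
	for start := 0; start < len(snapshot); start += maxBatchSize {
		end := start + maxBatchSize
		if end > len(snapshot) {
			end = len(snapshot)
		}
		batches = append(batches, snapshot[start:end])
	}
	return batches, nil
}

Batching like this is useful when the backing service exposes a bulk query API, e.g. fetching the status of many jobs in a single request.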
type ItemSyncResponse ¶
type ItemSyncResponse struct {
	ID     ItemID
	Item   Item
	Action SyncAction
}
ItemSyncResponse represents the response returned by the sync function for a single item.
type ItemWrapper ¶
Items are wrapped inside an ItemWrapper to be stored in the cache.
type SyncAction ¶
type SyncAction int
Possible actions for the cache to take as a result of running the sync function on any given cache item
const (
	Unchanged SyncAction = iota

	// The item returned has been updated and should be updated in the cache
	Update
)
type SyncFunc ¶
type SyncFunc func(ctx context.Context, batch Batch) (updatedBatch []ItemSyncResponse, err error)
Your implementation of this function for your cache instance is responsible for returning:

- The new Item, and
- What action should be taken.

The sync function has no insight into your object, and needs to be told explicitly if the new item is different from the old one.
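As a sketch, a sync callback that follows this contract might look like the following; the myItem type and the fetchLatestStatus helper are hypothetical stand-ins for your own item type and backend lookup:

// myItem is a hypothetical cache item carrying a status string.
type myItem struct {
	id     string
	status string
}

func (m *myItem) ID() string { return m.id }

// fetchLatestStatus is a hypothetical stand-in for querying your backend.
func fetchLatestStatus(ctx context.Context, id string) *myItem {
	return &myItem{id: id, status: "Completed"}
}

// mySync reports Update only for items whose status changed; everything else
// is explicitly reported as Unchanged so the cache keeps the old item.
var mySync SyncFunc = func(ctx context.Context, batch Batch) ([]ItemSyncResponse, error) {
	responses := make([]ItemSyncResponse, 0, len(batch))
	for _, wrapped := range batch {
		old := wrapped.GetItem().(*myItem)
		latest := fetchLatestStatus(ctx, old.ID())
		if latest.status == old.status {
			responses = append(responses, ItemSyncResponse{ID: old.ID(), Item: old, Action: Unchanged})
			continue
		}
		responses = append(responses, ItemSyncResponse{ID: old.ID(), Item: latest, Action: Update})
	}
	return responses, nil
}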