Documentation
Index
Examples
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type AutoRefreshCache

type AutoRefreshCache interface {
	// Start begins the background refresh of items.
	Start(ctx context.Context)

	// Get returns the item by id if it exists, else nil.
	Get(id string) CacheItem

	// GetOrCreate returns the object if it exists, else creates it.
	GetOrCreate(item CacheItem) (CacheItem, error)
}
AutoRefreshCache provides regular GetOrCreate and Delete operations along with background asynchronous refresh. The caller provides callbacks to create, refresh, and delete items. The cache does not provide APIs to update items.
func NewAutoRefreshCache
func NewAutoRefreshCache(syncCb CacheSyncItem, syncRateLimiter RateLimiter, resyncPeriod time.Duration) AutoRefreshCache
Example
package main

import (
	"context"
	"fmt"
	"time"
)

type ExampleItemStatus string

const (
	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
	ExampleStatusStarted    ExampleItemStatus = "Started"
	ExampleStatusSucceeded  ExampleItemStatus = "Completed"
)

type ExampleCacheItem struct {
	status ExampleItemStatus
	id     string
}

func (e *ExampleCacheItem) ID() string {
	return e.id
}

type ExampleService struct {
	jobStatus map[string]ExampleItemStatus
}

func newExampleService() *ExampleService {
	return &ExampleService{jobStatus: make(map[string]ExampleItemStatus)}
}

// getStatus advances the item's status to the next state and returns it.
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
	if _, ok := f.jobStatus[id]; !ok {
		f.jobStatus[id] = ExampleStatusStarted
	}
	f.jobStatus[id] = ExampleStatusSucceeded
	return &ExampleCacheItem{f.jobStatus[id], id}
}

func main() {
	// This auto-refresh cache can be used for cases where keys are created by the caller but processed by
	// an external service, and we want to asynchronously keep track of their progress.
	exampleService := newExampleService()

	// Define a sync method that the cache can use to auto-refresh in the background.
	syncItemCb := func(ctx context.Context, obj CacheItem) (CacheItem, error) {
		return exampleService.getStatus(obj.(*ExampleCacheItem).ID()), nil
	}

	// Define the resync period as the duration at which we want the cache to refresh. It can be set as low
	// as we want, but the cache is still constrained by the time it takes to run the Sync call for each item.
	resyncPeriod := time.Millisecond

	// Since the number of items in the cache is dynamic, the rate limiter is our knob to control the
	// resources we spend on sync.
	rateLimiter := NewRateLimiter("ExampleRateLimiter", 10000, 1)

	// Since the cache refreshes itself asynchronously, it may not notice that an object has been deleted
	// immediately, so delete logic in cache users should account for this shortcoming (e.g. not-exists may
	// be a valid error during removal if based on status in the cache).
	cache := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod)

	// Start the cache with a context that can be used to stop the cache by cancelling it.
	ctx, cancel := context.WithCancel(context.Background())
	cache.Start(ctx)

	// Create objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1)
	_, err2 := cache.GetOrCreate(item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// Wait for the cache to go through a few refresh cycles and then check the status.
	time.Sleep(resyncPeriod * 10)
	fmt.Printf("Current status for item1 is %v", cache.Get(item1.ID()).(*ExampleCacheItem).status)

	// Stop the cache.
	cancel()
}
Output: Current status for item1 is Completed
type RateLimiter

RateLimiter is the interface used to rate-limit operations, such as the cache's sync calls.
func NewRateLimiter

func NewRateLimiter(rateLimiterName string, tps float64, burst int) RateLimiter

NewRateLimiter creates a new rate limiter with the given tps (transactions per second) and burst values.
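To build intuition for the tps and burst parameters, the sketch below implements a minimal, hypothetical token-bucket limiter with the same two knobs: tokens refill at tps per second and accumulate up to burst, and each permitted event consumes one token. This is an illustration only; the actual RateLimiter implementation in this package may behave differently.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket is a hypothetical, simplified token-bucket limiter used to
// illustrate tps/burst semantics; it is not the package's RateLimiter.
type tokenBucket struct {
	tps    float64   // tokens added per second
	burst  int       // maximum tokens that can accumulate
	tokens float64   // current token count
	last   time.Time // last refill time
}

func newTokenBucket(tps float64, burst int) *tokenBucket {
	// Start with a full bucket so an initial burst is allowed.
	return &tokenBucket{tps: tps, burst: burst, tokens: float64(burst), last: time.Now()}
}

// Allow reports whether one event may proceed now, consuming a token if so.
func (b *tokenBucket) Allow() bool {
	now := time.Now()
	// Refill tokens proportionally to elapsed time, capped at burst.
	b.tokens += now.Sub(b.last).Seconds() * b.tps
	if b.tokens > float64(b.burst) {
		b.tokens = float64(b.burst)
	}
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

func main() {
	// With burst 2 and 10 tokens/sec, two back-to-back calls pass and the
	// third is throttled until ~100ms of refill has elapsed.
	b := newTokenBucket(10, 2)
	fmt.Println(b.Allow(), b.Allow(), b.Allow()) // true true false
}
```

Under this model, the example's NewRateLimiter("ExampleRateLimiter", 10000, 1) would permit at most one queued sync at a time while sustaining up to 10000 syncs per second.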