utils

package
v0.2.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2019 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func MarshalObjToStruct added in v0.1.1

func MarshalObjToStruct(input interface{}) (*structpb.Struct, error)

Marshals obj into a struct. Will use jsonPb if input is a proto message, otherwise, it'll use json marshaler.

func MarshalPbToString added in v0.1.1

func MarshalPbToString(msg proto.Message) (string, error)

Marshals a proto message using jsonPb marshaler to string.

func MarshalPbToStruct added in v0.1.1

func MarshalPbToStruct(in proto.Message) (out *structpb.Struct, err error)

Marshals a proto message into proto Struct using jsonPb marshaler.

func UnmarshalStructToPb added in v0.1.1

func UnmarshalStructToPb(structObj *structpb.Struct, msg proto.Message) error

Unmarshals a proto struct into a proto message using jsonPb marshaler.

Types

type AutoRefreshCache

type AutoRefreshCache interface {
	// starts background refresh of items
	Start(ctx context.Context)

	// Get item by id if exists else null
	Get(id string) CacheItem

	// Get object if exists else create it
	GetOrCreate(item CacheItem) (CacheItem, error)
}

AutoRefreshCache with regular GetOrCreate and Delete along with background asynchronous refresh. Caller provides callbacks for create, refresh and delete item. The cache doesn't provide apis to update items.

func NewAutoRefreshCache

func NewAutoRefreshCache(syncCb CacheSyncItem, syncRateLimiter RateLimiter, resyncPeriod time.Duration,
	size int, scope promutils.Scope) (AutoRefreshCache, error)
Example
package main

import (
	"context"
	"fmt"
	"time"
)

type ExampleItemStatus string

const (
	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
	ExampleStatusStarted    ExampleItemStatus = "Started"
	ExampleStatusSucceeded  ExampleItemStatus = "Completed"
)

type ExampleCacheItem struct {
	status ExampleItemStatus
	id     string
}

func (e *ExampleCacheItem) ID() string {
	return e.id
}

type ExampleService struct {
	jobStatus map[string]ExampleItemStatus
}

func newExampleService() *ExampleService {
	return &ExampleService{jobStatus: make(map[string]ExampleItemStatus)}
}

// advance the status to next, and return
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
	if _, ok := f.jobStatus[id]; !ok {
		f.jobStatus[id] = ExampleStatusStarted
	}
	f.jobStatus[id] = ExampleStatusSucceeded
	return &ExampleCacheItem{f.jobStatus[id], id}
}

func main() {
	// This auto-refresh cache can be used for cases where keys are created by caller but processed by
	// an external service and we want to asynchronously keep track of its progress.
	exampleService := newExampleService()

	// define a sync method that the cache can use to auto-refresh in background
	syncItemCb := func(ctx context.Context, obj CacheItem) (CacheItem, CacheSyncAction, error) {
		oldItem := obj.(*ExampleCacheItem)
		newItem := exampleService.getStatus(oldItem.ID())
		if newItem.status != oldItem.status {
			return newItem, Update, nil
		}
		return newItem, Unchanged, nil
	}

	// define resync period as time duration we want cache to refresh. We can go as low as we want but cache
	// would still be constrained by time it takes to run Sync call for each item.
	resyncPeriod := time.Millisecond

	// Since number of items in the cache is dynamic, rate limiter is our knob to control resources we spend on
	// sync.
	rateLimiter := NewRateLimiter("ExampleRateLimiter", 10000, 1)

	// since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
	// so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
	// error during removal if based on status in cache).
	cache, err := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod, 100, nil)
	if err != nil {
		panic(err)
	}

	// start the cache with a context that would be to stop the cache by cancelling the context
	ctx, cancel := context.WithCancel(context.Background())
	cache.Start(ctx)

	// creating objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1)
	_, err2 := cache.GetOrCreate(item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// wait for the cache to go through a few refresh cycles and then check status
	time.Sleep(resyncPeriod * 10)
	fmt.Printf("Current status for item1 is %v", cache.Get(item1.ID()).(*ExampleCacheItem).status)

	// stop the cache
	cancel()

}
Output:

Current status for item1 is Completed

type CacheItem

type CacheItem interface {
	ID() string
}

type CacheSyncAction added in v0.1.2

type CacheSyncAction int

Possible actions for the cache to take as a result of running the sync function on any given cache item

const (
	Unchanged CacheSyncAction = iota

	// The item returned has been updated and should be updated in the cache
	Update

	// The item should be removed from the cache
	Delete
)

type CacheSyncItem

type CacheSyncItem func(ctx context.Context, obj CacheItem) (
	newItem CacheItem, result CacheSyncAction, err error)

Your implementation of this function for your cache instance is responsible for returning

  1. The new CacheItem, and
  2. What action should be taken. The sync function has no insight into your object, and needs to be told explicitly if the new item is different from the old one.

type RateLimiter

type RateLimiter interface {
	Wait(ctx context.Context) error
}

Interface to use rate limiter

func NewRateLimiter

func NewRateLimiter(rateLimiterName string, tps float64, burst int) RateLimiter

Create a new rate-limiter with the tps and burst values

type Sequencer

type Sequencer interface {
	GetNext() uint64
	GetCur() uint64
}

Sequencer is a thread-safe incremental integer counter. Note that it is a singleton, so GetSequencer.GetNext may not always start at 0.

func GetSequencer

func GetSequencer() Sequencer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL