cache

package module
v0.0.0-...-49fce9b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2024 License: BSD-2-Clause Imports: 18 Imported by: 0

README

jetcache-go

banner

Build Status codeCov Go Repport Card License Release

Translate to: 简体中文

Overview

jetcache-go is a general-purpose cache access framework based on go-redis/cache. It implements the core features of the Java version of JetCache, including:

  • ✅ Flexible combination of two-level caching: You can use memory, Redis, or your own custom storage method.
  • ✅ The Once interface adopts the singleflight pattern, which is highly concurrent and thread-safe.
  • ✅ By default, MsgPack is used for encoding and decoding values. Optional sonic and native json.
  • ✅ The default local cache implementation includes Ristretto and FreeCache.
  • ✅ The default distributed cache implementation is based on go-redis/v9, and you can also customize your own implementation.
  • ✅ You can customize the errNotFound error and use placeholders to prevent cache penetration by caching empty results.
  • ✅ Supports asynchronous refreshing of distributed caches.
  • ✅ Metrics collection: By default, it prints statistical metrics (QPM, Hit, Miss, Query, QueryFail) through logs.
  • ✅ Automatic degradation of distributed cache query failures.
  • ✅ The MGet interface supports the Load function. In a distributed caching scenario, the Pipeline mode is used to improve performance. (v1.1.0+)
  • ✅ Invalidate local caches (in all Go processes) after updates (v1.1.1+)
Feature eko/gocache go-redis/cache mgtv-tech/jetcache-go
Multi-level Caching Yes Yes Yes
Cache-Aside Pattern Yes Yes Yes
Generics Support Yes No Yes
Singleflight Pattern Yes Yes Yes
Cache Update Listener No No Yes
Auto-Refresh No No Yes
Metrics Collection Yes Yes (simple) Yes
Caching Nil Result No No Yes
Batch Query No No Yes
Sparse List Caching No No Yes

Installation

To start using the latest version of jetcache-go, you can import the library into your project:

go get github.com/mgtv-tech/jetcache-go

Getting started

Basic Usage

package cache_test

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/mgtv-tech/jetcache-go"
	"github.com/mgtv-tech/jetcache-go/local"
	"github.com/mgtv-tech/jetcache-go/remote"
	"github.com/redis/go-redis/v9"
)

var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

type object struct {
	Str string
	Num int
}

func Example_basicUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound))

	ctx := context.TODO()
	key := "mykey:1"
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	var wanted object
	if err := mycache.Get(ctx, key, &wanted); err == nil {
		fmt.Println(wanted)
	}
	// Output: {mystring 42}

	mycache.Close()
}

func Example_advancedUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRefreshDuration(time.Minute))

	ctx := context.TODO()
	key := "mykey:1"
	obj := new(object)
	if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
		cache.Do(func(ctx context.Context) (any, error) {
			return mockDBGetObject(1)
		})); err != nil {
		panic(err)
	}
	fmt.Println(obj)
	// Output: &{mystring 42}

	mycache.Close()
}

func Example_mGetUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
	)
	cacheT := cache.NewT[int, *object](mycache)

	ctx := context.TODO()
	key := "mget"
	ids := []int{1, 2, 3}

	ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
		return mockDBMGetObject(ids)
	})

	var b bytes.Buffer
	for _, id := range ids {
		b.WriteString(fmt.Sprintf("%v", ret[id]))
	}
	fmt.Println(b.String())
	// Output: &{mystring 1}&{mystring 2}<nil>

	cacheT.Close()
}

func Example_syncLocalUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	sourceID := "12345678" // Unique identifier for this cache instance
	channelName := "syncLocalChannel"
	pubSub := ring.Subscribe(context.Background(), channelName)

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
		cache.WithSourceId(sourceID),
		cache.WithSyncLocal(true),
		cache.WithEventHandler(func(event *cache.Event) {
			// Broadcast local cache invalidation for the received keys
			bs, _ := json.Marshal(event)
			ring.Publish(context.Background(), channelName, string(bs))
		}),
	)
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	go func() {
		for {
			msg := <-pubSub.Channel()
			var event *cache.Event
			if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				panic(err)
			}
			fmt.Println(event.Keys)

			// Invalidate local cache for received keys (except own events)
			if event.SourceID != sourceID {
				for _, key := range event.Keys {
					mycache.DeleteFromLocalCache(key)
				}
			}
		}
	}()

	// Output: [mykey]
	mycache.Close()
	time.Sleep(time.Second)
}

func mockDBGetObject(id int) (*object, error) {
	if id > 100 {
		return nil, errRecordNotFound
	}
	return &object{Str: "mystring", Num: 42}, nil
}

func mockDBMGetObject(ids []int) (map[int]*object, error) {
	ret := make(map[int]*object)
	for _, id := range ids {
		if id == 3 {
			continue
		}
		ret[id] = &object{Str: "mystring", Num: id}
	}
	return ret, nil
}

Configure settings

// Options are used to store cache options.
type Options struct {
    name                       string             // Cache name, used for log identification and metric reporting
    remote                     remote.Remote      // Remote is distributed cache, such as Redis.
    local                      local.Local        // Local is memory cache, such as FreeCache.
    codec                      string             // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
    errNotFound                error              // Error to return for cache miss. Used to prevent cache penetration.
    remoteExpiry               time.Duration      // Remote cache ttl, Default is 1 hour.
    notFoundExpiry             time.Duration      // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
    offset                     time.Duration      // Expiration time jitter factor for cache misses.
    refreshDuration            time.Duration      // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
    stopRefreshAfterLastAccess time.Duration      // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
    refreshConcurrency         int                // Maximum number of concurrent cache refreshes. Default is 4.
    statsDisabled              bool               // Flag to disable cache statistics.
    statsHandler               stats.Handler      // Metrics statsHandler collector.
    sourceID                   string             // Unique identifier for cache instance.
    syncLocal                  bool               // Enable events for syncing local cache (only for "Both" cache type).
    eventChBufSize             int                // Buffer size for event channel (default: 100).
    eventHandler               func(event *Event) // Function to handle local cache invalidation events.
}

Cache metrics collection and statistics

You can implement the stats.Handler interface and register it with the Cache component to customize metric collection, for example, using Prometheus to collect metrics. We have provided a default implementation that logs the statistical metrics, as shown below:

2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache       |         qpm|   hit_ratio|         hit|        miss|       query|  query_fail
------------+------------+------------+------------+------------+------------+------------
bench       |   216440123|     100.00%|   216439867|         256|         256|           0
bench_local |   216440123|     100.00%|   216434970|        5153|           -|           -
bench_remote|        5153|      95.03%|        4897|         256|           -|           -
------------+------------+------------+------------+------------+------------+------------

Custom Logger

import "github.com/mgtv-tech/jetcache-go/logger"

// Set your Logger
logger.SetDefaultLogger(l logger.Logger)

Custom Encoding and Decoding

import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/encoding"
)

// Register your codec
encoding.RegisterCodec(codec Codec)

// Set your codec name
mycache := cache.New("any",
    cache.WithRemote(...),
    cache.WithCodec(yourCodecName string))

Usage Scenarios

Automatic Cache Refresh

jetcache-go provides automatic cache refresh capability to prevent cache avalanche and database overload when cache misses occur. It is suitable for scenarios with a small number of keys, low real-time requirements, and high loading overhead. The code below specifies a refresh every minute, and stops refreshing after 1 hour without access. If the cache is Redis or the last level of a multi-level cache is Redis, the cache loading behavior is globally unique, which means that only one server is refreshing at a time regardless of the number of servers, to reduce the load on the backend.

mycache := cache.New(cache.WithName("any"),
       // ...
       // cache.WithRefreshDuration sets the asynchronous refresh interval
       cache.WithRefreshDuration(time.Minute),
       // cache.WithStopRefreshAfterLastAccess sets the time to cancel the refresh task after the cache key is not accessed
        cache.WithStopRefreshAfterLastAccess(time.Hour))

// `Once` interface starts automatic refresh by `cache.Refresh(true)`
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
    return mockDBGetObject(1)
})) 

MGet Batch Query

MGet utilizes golang generics and the Load function to provide a user-friendly way to batch query entities corresponding to IDs in a multi-level cache. If the cache is Redis or the last level of a multi-level cache is Redis, Pipeline is used to implement read and write operations to improve performance. It's worth noting that for abnormal scenarios (IO exceptions, serialization exceptions, etc.), our design philosophy is to provide lossy services as much as possible to prevent cache penetration.

mycache := cache.New(cache.WithName("any"),
       // ...
       cache.WithRemoteExpiry(time.Minute),
    )
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mykey"
ids := []int{1, 2, 3}

ret := mycache.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
    return mockDBMGetObject(ids)
})

Codec Selection

jetcache-go implements three serialization and deserialization (codec) methods by default: sonicMsgPack, and native json.

Selection Guide:

  • For high-performance encoding and decoding: If the local cache hit rate is extremely high, but the deserialization operation of converting byte arrays to objects in the local cache consumes a lot of CPU, choose sonic.
  • For balanced performance and extreme storage space: Choose MsgPack, which uses MsgPack encoding and decoding. Content > 64 bytes will be compressed with snappy.

Tip: Remember to import the necessary packages as needed to register the codec.

 _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"

Plugin

Plugin Project: jetcache-go-plugin, Welcome to contribute to this project. Currently, the following features have been implemented:

Contributing

Everyone is welcome to help improve jetcache-go. If you have any questions, suggestions, or want to add other features, please submit an issue or PR directly.

Please follow these steps to submit a PR:

  • Clone the repository
  • Create a new branch: name it feature-xxx for new features or bug-xxx for bug fixes
  • Describe the changes in detail in the PR

Contact

If you have any questions, please contact daoshenzzg@gmail.com.

Documentation

Overview

Example (AdvancedUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRefreshDuration(time.Minute))

ctx := context.TODO()
key := "mykey:1"
obj := new(object)
if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
	cache.Do(func(ctx context.Context) (any, error) {
		return mockDBGetObject(1)
	})); err != nil {
	panic(err)
}
fmt.Println(obj)
Output:

&{mystring 42}
Example (BasicUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound))

ctx := context.TODO()
key := "mykey:1"
obj, _ := mockDBGetObject(1)
if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

var wanted object
if err := mycache.Get(ctx, key, &wanted); err == nil {
	fmt.Println(wanted)
}
Output:

{mystring 42}
Example (MGetUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mget"
ids := []int{1, 2, 3}

ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
	return mockDBMGetObject(ids)
})

var b bytes.Buffer
for _, id := range ids {
	b.WriteString(fmt.Sprintf("%v", ret[id]))
}
fmt.Println(b.String())
Output:

&{mystring 1}&{mystring 2}<nil>
Example (SyncLocalUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
	cache.WithSourceId(sourceID),
	cache.WithSyncLocal(true),
	cache.WithEventHandler(func(event *cache.Event) {
		// Broadcast local cache invalidation for the received keys
		bs, _ := json.Marshal(event)
		ring.Publish(context.Background(), channelName, string(bs))
	}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

go func() {
	for {
		msg := <-pubSub.Channel()
		var event *cache.Event
		if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
			panic(err)
		}
		fmt.Println(event.Keys)

		// Invalidate local cache for received keys (except own events)
		if event.SourceID != sourceID {
			for _, key := range event.Keys {
				mycache.DeleteFromLocalCache(key)
			}
		}
	}
}()
Output:

[mykey]

Index

Examples

Constants

View Source
const (
	TypeLocal  = "local"
	TypeRemote = "remote"
	TypeBoth   = "both"
)

Variables

View Source
var (
	ErrCacheMiss          = errors.New("cache: key is missing")
	ErrRemoteLocalBothNil = errors.New("cache: both remote and local are nil")
)

Functions

This section is empty.

Types

type Cache

type Cache interface {
	// Set sets the cache with ItemOption
	Set(ctx context.Context, key string, opts ...ItemOption) error
	// Once gets the opts.value for the given key from the cache or
	// executes, caches, and returns the results of the given opts.do,
	// making sure that only one execution is in-flight for a given key
	// at a time. If a duplicate comes in, the duplicate caller waits for the
	// original to complete and receives the same results.
	Once(ctx context.Context, key string, opts ...ItemOption) error
	// Delete deletes cached val with key.
	Delete(ctx context.Context, key string) error
	// DeleteFromLocalCache deletes local cached val with key.
	DeleteFromLocalCache(key string)
	// Exists reports whether val for the given key exists.
	Exists(ctx context.Context, key string) bool
	// Get gets the val for the given key and fills into val.
	Get(ctx context.Context, key string, val any) error
	// GetSkippingLocal gets the val for the given key skipping local cache.
	GetSkippingLocal(ctx context.Context, key string, val any) error
	// TaskSize returns Refresh task size.
	TaskSize() int
	// CacheType returns cache type
	CacheType() string
	// Close closes the cache. This should be called when cache refreshing is
	// enabled and no longer needed, or when it may lead to resource leaks.
	Close()
}

Cache interface is used to define the cache implementation.

func New

func New(opts ...Option) Cache

type DoFunc

type DoFunc func(ctx context.Context) (any, error)

DoFunc returns getValue to be cached.

type Event

type Event struct {
	CacheName string
	SourceID  string
	EventType EventType
	Keys      []string
}

type EventType

type EventType int
const (
	EventTypeSet          EventType = 1
	EventTypeSetByOnce    EventType = 2
	EventTypeSetByRefresh EventType = 3
	EventTypeSetByMGet    EventType = 4
	EventTypeDelete       EventType = 5
)

type ItemOption

type ItemOption func(o *item)

ItemOption defines the method to customize an Options.

func Do

func Do(do DoFunc) ItemOption

func Refresh

func Refresh(refresh bool) ItemOption

func SetNX

func SetNX(setNx bool) ItemOption

func SetXX

func SetXX(setXx bool) ItemOption

func SkipLocal

func SkipLocal(skipLocal bool) ItemOption

func TTL

func TTL(ttl time.Duration) ItemOption

func Value

func Value(value any) ItemOption

type Option

type Option func(o *Options)

Option defines the method to customize an Options.

func WithCodec

func WithCodec(codec string) Option

func WithErrNotFound

func WithErrNotFound(err error) Option

func WithEventChBufSize

func WithEventChBufSize(eventChBufSize int) Option

func WithEventHandler

func WithEventHandler(eventHandler func(event *Event)) Option

func WithLocal

func WithLocal(local local.Local) Option

func WithName

func WithName(name string) Option

func WithNotFoundExpiry

func WithNotFoundExpiry(notFoundExpiry time.Duration) Option

func WithOffset

func WithOffset(offset time.Duration) Option

func WithRefreshConcurrency

func WithRefreshConcurrency(refreshConcurrency int) Option

func WithRefreshDuration

func WithRefreshDuration(refreshDuration time.Duration) Option

func WithRemote

func WithRemote(remote remote.Remote) Option

func WithRemoteExpiry

func WithRemoteExpiry(remoteExpiry time.Duration) Option

func WithSourceId

func WithSourceId(sourceId string) Option

func WithStatsDisabled

func WithStatsDisabled(statsDisabled bool) Option

func WithStatsHandler

func WithStatsHandler(handler stats.Handler) Option

func WithStopRefreshAfterLastAccess

func WithStopRefreshAfterLastAccess(stopRefreshAfterLastAccess time.Duration) Option

func WithSyncLocal

func WithSyncLocal(syncLocal bool) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are used to store cache options.

type T

type T[K constraints.Ordered, V any] struct {
	Cache
}

T wrap Cache to support golang's generics

func NewT

func NewT[K constraints.Ordered, V any](cache Cache) *T[K, V]

NewT new a T

func (*T[K, V]) Get

func (w *T[K, V]) Get(ctx context.Context, key string, id K, fn func(context.Context, K) (V, error)) (V, error)

Get retrieves the value associated with the given `key` and `id`.

It first attempts to fetch the value from the cache. If a cache miss occurs, it calls the provided `fn` function to fetch the value and stores it in the cache with an expiration time determined by the cache configuration.

A `Once` mechanism is employed to ensure only one fetch is performed for a given `key` and `id` combination, even under concurrent access.

func (*T[K, V]) MGet

func (w *T[K, V]) MGet(ctx context.Context, key string, ids []K, fn func(context.Context, []K) (map[K]V, error)) (result map[K]V)

MGet efficiently retrieves multiple values associated with the given `key` and `ids`.

It attempts to fetch all values from the cache. For missing values, it calls the provided `fn` function to fetch the remaining values and updates the cache with an expiration time determined by the cache configuration.

The results are returned as a map where the key is the `id` and the value is the corresponding data.

func (*T[K, V]) Set

func (w *T[K, V]) Set(ctx context.Context, key string, id K, v V) error

Set sets the value `v` associated with the given `key` and `id` in the cache. The expiration time of the cached value is determined by the cache configuration.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL