cache

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2024 License: BSD-2-Clause Imports: 16 Imported by: 0

README

Build Status codeCov Go Report Card License

Translations: English | 简体中文

介绍

jetcache-go是基于go-redis/cache拓展的通用缓存访问框架。 实现了类似Java版JetCache的核心功能,包括:

  • ✅ 二级缓存自由组合:本地缓存、分布式缓存、本地缓存+分布式缓存
  • ✅ Once接口采用单飞(singleflight)模式,高并发且线程安全
  • ✅ 默认采用MsgPack来编解码Value。可选sonic、原生json
  • ✅ 本地缓存默认实现了TinyLFU、FreeCache
  • ✅ 分布式缓存默认实现了go-redis/v8的适配器,你也可以自定义实现
  • ✅ 可以自定义errNotFound,通过占位符替换,缓存空结果防止缓存穿透
  • ✅ 支持开启分布式缓存异步刷新
  • ✅ 指标采集,默认实现了通过日志打印各级缓存的统计指标(QPM、Hit、Miss、Query、QueryFail)
  • ✅ 分布式缓存查询故障自动降级
  • ✅ MGet接口支持Load函数。带分布式缓存场景,采用Pipeline模式实现 (v1.1.0+)
  • ✅ 支持拓展缓存更新后所有GO进程的本地缓存失效 (v1.1.1+)

安装

使用最新版本的jetcache-go,您可以在项目中导入该库:

go get github.com/mgtv-tech/jetcache-go

快速开始

package cache_test

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"

	"github.com/mgtv-tech/jetcache-go"
	"github.com/mgtv-tech/jetcache-go/local"
	"github.com/mgtv-tech/jetcache-go/remote"
	"github.com/mgtv-tech/jetcache-go/util"
)

// errRecordNotFound is the sentinel error mockDBGetObject returns for a
// missing record; the examples register it through cache.WithErrNotFound.
var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

// object is the sample value type the examples store in and read from the
// cache. The examples print it with fmt, so the field order determines the
// expected output (e.g. "{mystring 42}").
type object struct {
	// Str is the string payload.
	Str string
	// Num is the numeric payload.
	Num int
}

// mockDBGetObject stands in for a single-row database lookup. IDs up to and
// including 100 yield a fixed object; any larger ID yields errRecordNotFound.
func mockDBGetObject(id int) (*object, error) {
	const maxKnownID = 100
	if id <= maxKnownID {
		found := object{Str: "mystring", Num: 42}
		return &found, nil
	}
	return nil, errRecordNotFound
}

// mockDBMGetObject stands in for a batch database lookup. It returns one
// object per requested ID, keyed by ID, except for ID 3, which is treated as
// missing and left out of the result. The returned error is always nil.
func mockDBMGetObject(ids []int) (map[int]*object, error) {
	const missingID = 3
	found := map[int]*object{}
	for i := range ids {
		if id := ids[i]; id != missingID {
			found[id] = &object{Str: "mystring", Num: id}
		}
	}
	return found, nil
}

// Example_basicUsage shows a minimal Set/Get round trip through a cache that
// combines a local FreeCache with a Redis-backed remote. It connects to a
// Redis shard at ":6379".
func Example_basicUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound))
	// Deferred so the cache is closed even if a step below panics.
	defer mycache.Close()

	ctx := context.TODO()
	key := util.JoinAny(":", "mykey", 1)
	obj, err := mockDBGetObject(1)
	if err != nil {
		panic(err)
	}
	if err = mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	var wanted object
	// Surface a failed read instead of silently printing nothing, which would
	// only show up as an unexplained output mismatch.
	if err = mycache.Get(ctx, key, &wanted); err != nil {
		panic(err)
	}
	fmt.Println(wanted)
	// Output: {mystring 42}
}

// Example_advancedUsage shows Once with asynchronous refresh enabled: the
// value is loaded through the cache.Do callback and stored with a one-hour
// TTL. It connects to a Redis shard at ":6379".
func Example_advancedUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRefreshDuration(time.Minute))
	// Refreshing is enabled, so Close must always run; deferring it covers the
	// panic path below as well.
	defer mycache.Close()

	ctx := context.TODO()
	key := util.JoinAny(":", "mykey", 1)
	obj := new(object)
	// cache.Refresh(true) opts this key into refresh; cache.Do supplies the
	// loader that produces the value to cache.
	if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
		cache.Do(func(ctx context.Context) (any, error) {
			return mockDBGetObject(1)
		})); err != nil {
		panic(err)
	}
	fmt.Println(obj)
	// Output: &{mystring 42}
}

// Example_mGetUsage shows a batch lookup through the generic wrapper T. MGet
// is given a key prefix, a list of IDs and a load function, and returns a map
// from ID to value. It connects to a Redis shard at ":6379".
func Example_mGetUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
	)
	cacheT := cache.NewT[int, *object](mycache)
	// Deferred so the embedded cache is closed even if a step below panics.
	defer cacheT.Close()

	ctx := context.TODO()
	key := "mget"
	ids := []int{1, 2, 3}

	ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
		return mockDBMGetObject(ids)
	})

	var b bytes.Buffer
	for _, id := range ids {
		// The mock DB has no record for ID 3, so ret[3] is nil and prints as
		// <nil>. Fprintf writes straight into the buffer without building an
		// intermediate string.
		fmt.Fprintf(&b, "%v", ret[id])
	}
	fmt.Println(b.String())
	// Output: &{mystring 1}&{mystring 2}<nil>
}

// Example_syncLocalUsage shows how local caches can be invalidated across
// processes: cache events are serialized to JSON and published on a Redis
// pub/sub channel, and a subscriber deletes the affected keys from its local
// cache unless the event came from its own sourceID.
//
// After closing the cache the example sleeps for one second before returning;
// presumably this gives the subscriber goroutine time to print the event keys
// (confirm).
//
// NOTE(review): the subscriber goroutine loops forever and pubSub is never
// closed in this function, so both outlive the example.
func Example_syncLocalUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	sourceID := "12345678" // Unique identifier for this cache instance
	channelName := "syncLocalChannel"
	// NOTE(review): go-redis documents that Subscribe does not wait for the
	// server's confirmation; calling pubSub.Receive first would guarantee the
	// subscription is active before anything is published (confirm).
	pubSub := ring.Subscribe(context.Background(), channelName)

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
		cache.WithSourceId(sourceID),
		cache.WithSyncLocal(true),
		cache.WithEventHandler(func(event *cache.Event) {
			// Broadcast local cache invalidation for the received keys.
			// The json.Marshal error and the Publish result are both discarded.
			bs, _ := json.Marshal(event)
			ring.Publish(context.Background(), channelName, string(bs))
		}),
	)
	obj, _ := mockDBGetObject(1)
	// Writing "mykey" is what produces the event whose keys appear in the
	// expected output.
	if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	go func() {
		for {
			// Blocks until the next message arrives; any receive or decode
			// error panics inside this goroutine.
			msg, err := pubSub.ReceiveMessage(context.Background())
			if err != nil {
				panic(err)
			}
			var event *cache.Event
			if err = json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				panic(err)
			}
			// Keys are printed for every event, including this instance's own.
			fmt.Println(event.Keys)

			// Invalidate local cache for received keys (except own events)
			if event.SourceID != sourceID {
				for _, key := range event.Keys {
					mycache.DeleteFromLocalCache(key)
				}
			}
		}
	}()

	// Output: [mykey]
	mycache.Close()
	time.Sleep(time.Second)
}
配置选项
// Options stores the configuration of a cache; each field is set through the
// matching With* option function.
Options struct {
    name                       string             // Cache name, used for log identification and metric reporting.
    remote                     remote.Remote      // Distributed (remote) cache, such as Redis.
    local                      local.Local        // In-process (local) memory cache, such as FreeCache.
    codec                      string             // Name of the codec used to encode and decode values. Default is msgpack.Name; custom codecs can be registered.
    errNotFound                error              // Error that signals a missing record; a placeholder is cached for it to prevent cache penetration.
    remoteExpiry               time.Duration      // Remote cache TTL. Default is 1 hour.
    notFoundExpiry             time.Duration      // TTL of the placeholder cached when a record is not found. Default is 1 minute.
    offset                     time.Duration      // Expiration time jitter factor for cache misses.
    refreshDuration            time.Duration      // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
    stopRefreshAfterLastAccess time.Duration      // How long after the last access a key stops being refreshed. Default is refreshDuration + 1 second.
    refreshConcurrency         int                // Maximum number of concurrent cache refreshes. Default is 4.
    statsDisabled              bool               // Flag to disable cache statistics.
    statsHandler               stats.Handler      // Handler that collects cache metrics.
    sourceID                   string             // Unique identifier for cache instance.
    syncLocal                  bool               // Enable events for syncing local cache (only for "Both" cache type).
    eventChBufSize             int                // Buffer size for event channel (default: 100).
    eventHandler               func(event *Event) // Function to handle local cache invalidation events.
}
缓存指标收集和统计

您可以实现stats.Handler接口并注册到Cache组件来自定义收集指标,例如使用Prometheus 采集指标。我们默认实现了通过日志打印统计指标,如下所示:

2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache       |         qpm|   hit_ratio|         hit|        miss|       query|  query_fail
------------+------------+------------+------------+------------+------------+------------
bench       |   216440123|     100.00%|   216439867|         256|         256|           0|
bench_local |   216440123|     100.00%|   216434970|        5153|           -|           -|
bench_remote|        5153|      95.03%|        4897|         256|           -|           -|
------------+------------+------------+------------+------------+------------+------------
自定义日志
import "github.com/mgtv-tech/jetcache-go/logger"

// Set your Logger
logger.SetDefaultLogger(l logger.Logger)
自定义编解码
import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/encoding"
)

// Register your codec
encoding.RegisterCodec(codec Codec)

// Select the registered codec by name when constructing the cache.
// cache.New takes only functional options (it has no type parameters and no
// positional name), so the cache name is passed through cache.WithName.
mycache := cache.New(cache.WithName("any"),
    cache.WithRemote(...),
    cache.WithCodec(yourCodecName string))
使用场景说明
自动刷新缓存

jetcache-go提供了自动刷新缓存的能力,目的是为了防止缓存失效时造成的雪崩效应打爆数据库。对一些key比较少,实时性要求不高,加载开销非常大的缓存场景,适合使用自动刷新。下面的代码指定每分钟刷新一次,1小时如果没有访问就停止刷新。如果缓存是redis或者多级缓存最后一级是redis,缓存加载行为是全局唯一的,也就是说不管有多少台服务器,同时只有一个服务器在刷新,目的是为了降低后端的加载负担。

mycache := cache.New(cache.WithName("any"),
		// ...
		// cache.WithRefreshDuration sets the interval between asynchronous refreshes.
		cache.WithRefreshDuration(time.Minute),
		// cache.WithStopRefreshAfterLastAccess sets how long after the last access to a key its refresh task is cancelled.
        cache.WithStopRefreshAfterLastAccess(time.Hour))

// The `Once` API turns on automatic refresh through `cache.Refresh(true)`.
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
    return mockDBGetObject(1)
}))
MGet批量查询

MGet 通过 golang的泛型机制 + Load 函数,可以非常友好地在多级缓存中批量查询ID对应的实体。如果缓存是redis或者多级缓存最后一级是redis,查询时采用Pipeline实现读写操作,提升性能。需要说明的是,针对异常场景(IO异常、序列化异常等),我们的设计思路是尽可能提供有损服务,防止穿透。

mycache := cache.New(cache.WithName("any"),
		// ...
		cache.WithRemoteExpiry(time.Minute),
	)
// Wrap the cache with the ID type (int) and value type (*object) used by MGet.
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
// key is the key prefix shared by the batch; ids lists the entries to fetch.
key := "mykey"
ids := []int{1, 2, 3}

// Values missing from the cache are obtained from the load function.
ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
    return mockDBMGetObject(ids)
})
Codec编解码选择

jetcache-go默认实现了三种编解码方式:sonic、MsgPack和原生json。

选择指导:

  • 追求编解码性能: 例如本地缓存命中率极高,但本地缓存byte数组转对象的反序列化操作非常耗CPU,那么选择sonic
  • 兼顾性能和极致的存储空间: 选择MsgPack。内容超过64个字节时,MsgPack编码结果会再采用snappy压缩。

Tip:使用的时候记得按需导包来完成对应的编解码器注册

 _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"

Documentation

Overview

Example (AdvancedUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRefreshDuration(time.Minute))

ctx := context.TODO()
key := util.JoinAny(":", "mykey", 1)
obj := new(object)
if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
	cache.Do(func(ctx context.Context) (any, error) {
		return mockDBGetObject(1)
	})); err != nil {
	panic(err)
}
fmt.Println(obj)
Output:

&{mystring 42}
Example (BasicUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound))

ctx := context.TODO()
key := util.JoinAny(":", "mykey", 1)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

var wanted object
if err := mycache.Get(ctx, key, &wanted); err == nil {
	fmt.Println(wanted)
}
Output:

{mystring 42}
Example (MGetUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mget"
ids := []int{1, 2, 3}

ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
	return mockDBMGetObject(ids)
})

var b bytes.Buffer
for _, id := range ids {
	b.WriteString(fmt.Sprintf("%v", ret[id]))
}
fmt.Println(b.String())
Output:

&{mystring 1}&{mystring 2}<nil>
Example (SyncLocalUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
	cache.WithSourceId(sourceID),
	cache.WithSyncLocal(true),
	cache.WithEventHandler(func(event *cache.Event) {
		// Broadcast local cache invalidation for the received keys
		bs, _ := json.Marshal(event)
		ring.Publish(context.Background(), channelName, string(bs))
	}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

go func() {
	for {
		msg, err := pubSub.ReceiveMessage(context.Background())
		if err != nil {
			panic(err)
		}
		var event *cache.Event
		if err = json.Unmarshal([]byte(msg.Payload), &event); err != nil {
			panic(err)
		}
		fmt.Println(event.Keys)

		// Invalidate local cache for received keys (except own events)
		if event.SourceID != sourceID {
			for _, key := range event.Keys {
				mycache.DeleteFromLocalCache(key)
			}
		}
	}
}()
Output:

[mykey]

Index

Examples

Constants

View Source
// Names of the supported cache types.
// NOTE(review): presumably these are the values Cache.CacheType reports; confirm.
const (
	TypeLocal  = "local"  // local cache only
	TypeRemote = "remote" // remote cache only
	TypeBoth   = "both"   // local and remote cache combined
)

Variables

View Source
// Sentinel errors of the cache package.
var (
	// ErrCacheMiss reports that the requested key is missing from the cache.
	ErrCacheMiss          = errors.New("cache: key is missing")
	// ErrRemoteLocalBothNil reports that neither a remote nor a local cache is set.
	ErrRemoteLocalBothNil = errors.New("cache: both remote and local are nil")
)

Functions

This section is empty.

Types

type Cache

// Cache defines the operations a cache implementation provides.
type Cache interface {
	// Set stores an entry under key. The value, TTL and other per-item
	// settings are supplied through opts (for example Value and TTL).
	Set(ctx context.Context, key string, opts ...ItemOption) error
	// Once gets the opts.value for the given key from the cache or
	// executes, caches, and returns the results of the given opts.do,
	// making sure that only one execution is in-flight for a given key
	// at a time. If a duplicate comes in, the duplicate caller waits for the
	// original to complete and receives the same results.
	Once(ctx context.Context, key string, opts ...ItemOption) error
	// Delete deletes the cached value stored under key.
	Delete(ctx context.Context, key string) error
	// DeleteFromLocalCache deletes the value stored under key from the local
	// cache only.
	DeleteFromLocalCache(key string)
	// Exists reports whether a value exists for the given key.
	Exists(ctx context.Context, key string) bool
	// Get looks up the value for the given key and fills it into val.
	Get(ctx context.Context, key string, val any) error
	// GetSkippingLocal looks up the value for the given key, skipping the
	// local cache, and fills it into val.
	GetSkippingLocal(ctx context.Context, key string, val any) error
	// TaskSize returns the refresh task size (presumably the number of
	// refresh tasks currently registered; confirm).
	TaskSize() int
	// CacheType returns the cache type (presumably one of TypeLocal,
	// TypeRemote or TypeBoth; confirm).
	CacheType() string
	// Close closes the cache. This should be called when cache refreshing is
	// enabled and no longer needed, or when it may lead to resource leaks.
	Close()
}

Cache interface is used to define the cache implementation.

func New

func New(opts ...Option) Cache

type DoFunc

type DoFunc func(ctx context.Context) (any, error)

DoFunc returns the value to be cached.

type Event added in v1.1.1

// Event is the value passed to the handler registered with WithEventHandler.
// It lists the keys affected by a cache operation.
type Event struct {
	// CacheName is presumably the name of the emitting cache (see WithName); confirm.
	CacheName string
	// SourceID is presumably the identifier set with WithSourceId; consumers
	// can compare it with their own ID to skip events they emitted (confirm).
	SourceID  string
	// EventType is the kind of event; see the EventType constants.
	EventType EventType
	// Keys are the cache keys the event refers to.
	Keys      []string
}

type EventType added in v1.1.1

// EventType enumerates the kinds of Event.
type EventType int
// NOTE(review): the meanings below are inferred from the constant names only;
// confirm against the implementation.
const (
	EventTypeSet          EventType = 1 // presumably a value written via Set
	EventTypeSetByOnce    EventType = 2 // presumably a value written via Once
	EventTypeSetByRefresh EventType = 3 // presumably a value written by a refresh
	EventTypeSetByMGet    EventType = 4 // presumably a value written via MGet
	EventTypeDelete       EventType = 5 // presumably a key removed via Delete
)

type ItemOption

type ItemOption func(o *item)

ItemOption defines the method to customize an item.

func Do

func Do(do DoFunc) ItemOption

func Refresh

func Refresh(refresh bool) ItemOption

func SetNX

func SetNX(setNx bool) ItemOption

func SetXX

func SetXX(setXx bool) ItemOption

func SkipLocal

func SkipLocal(skipLocal bool) ItemOption

func TTL

func TTL(ttl time.Duration) ItemOption

func Value

func Value(value any) ItemOption

type Option

type Option func(o *Options)

Option defines the method to customize an Options.

func WithCodec

func WithCodec(codec string) Option

func WithErrNotFound

func WithErrNotFound(err error) Option

func WithEventChBufSize added in v1.1.1

func WithEventChBufSize(eventChBufSize int) Option

func WithEventHandler added in v1.1.1

func WithEventHandler(eventHandler func(event *Event)) Option

func WithLocal

func WithLocal(local local.Local) Option

func WithName

func WithName(name string) Option

func WithNotFoundExpiry

func WithNotFoundExpiry(notFoundExpiry time.Duration) Option

func WithOffset

func WithOffset(offset time.Duration) Option

func WithRefreshConcurrency

func WithRefreshConcurrency(refreshConcurrency int) Option

func WithRefreshDuration

func WithRefreshDuration(refreshDuration time.Duration) Option

func WithRemote

func WithRemote(remote remote.Remote) Option

func WithRemoteExpiry

func WithRemoteExpiry(remoteExpiry time.Duration) Option

func WithSourceId added in v1.1.1

func WithSourceId(sourceId string) Option

func WithStatsDisabled

func WithStatsDisabled(statsDisabled bool) Option

func WithStatsHandler

func WithStatsHandler(handler stats.Handler) Option

func WithStopRefreshAfterLastAccess

func WithStopRefreshAfterLastAccess(stopRefreshAfterLastAccess time.Duration) Option

func WithSyncLocal added in v1.1.1

func WithSyncLocal(syncLocal bool) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are used to store cache options.

type T

// T wraps a Cache to add generic helpers. K is the ID type and V the value
// type used by MGet; all Cache methods are available through the embedded
// interface.
type T[K comparable, V any] struct {
	Cache
}

T wraps Cache to support Go generics.

func NewT

func NewT[K comparable, V any](cache Cache) *T[K, V]

NewT creates a new T that wraps the given Cache.

func (*T[K, V]) MGet

func (w *T[K, V]) MGet(ctx context.Context, key string, ids []K, fn func(context.Context, []K) (map[K]V, error)) map[K]V

MGet efficiently fetches multiple values associated with a specific key prefix and a list of IDs in a single call. It prioritizes retrieving data from the cache and falls back to a user-provided function (`fn`) if values are missing. Asynchronous refresh is not supported.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL