cache

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: BSD-2-Clause Imports: 18 Imported by: 0

README

jetcache-go

banner

Build Status codeCov Go Repport Card License Release

Translate to: 简体中文

Overview

jetcache-go is a general-purpose cache access framework based on go-redis/cache. It implements the core features of the Java version of JetCache, including:

  • ✅ Flexible combination of two-level caching: You can use memory, Redis, or your own custom storage method.
  • ✅ The Once interface adopts the singleflight pattern, which is highly concurrent and thread-safe.
  • ✅ By default, MsgPack is used for encoding and decoding values. Optional sonic and native json.
  • ✅ The default local cache implementation includes Ristretto and FreeCache.
  • ✅ The default distributed cache implementation is based on go-redis/v9, and you can also customize your own implementation.
  • ✅ You can customize the errNotFound error and use placeholders to prevent cache penetration by caching empty results.
  • ✅ Supports asynchronous refreshing of distributed caches.
  • ✅ Metrics collection: By default, it prints statistical metrics (QPM, Hit, Miss, Query, QueryFail) through logs.
  • ✅ Automatic degradation of distributed cache query failures.
  • ✅ The MGet interface supports the Load function. In a distributed caching scenario, the Pipeline mode is used to improve performance. (v1.1.0+)
  • ✅ Invalidate local caches (in all Go processes) after updates (v1.1.1+)

Learning jetcache-go

Visit documentation for more details.

Installation

To start using the latest version of jetcache-go, you can import the library into your project:

go get github.com/mgtv-tech/jetcache-go

Getting started

Basic Usage

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/mgtv-tech/jetcache-go"
	"github.com/mgtv-tech/jetcache-go/local"
	"github.com/mgtv-tech/jetcache-go/remote"
	"github.com/redis/go-redis/v9"
)

var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

type object struct {
	Str string
	Num int
}

func Example_basicUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound))

	ctx := context.TODO()
	key := "mykey:1"
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	var wanted object
	if err := mycache.Get(ctx, key, &wanted); err == nil {
		fmt.Println(wanted)
	}
	// Output: {mystring 42}

	mycache.Close()
}

func Example_advancedUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRefreshDuration(time.Minute))

	ctx := context.TODO()
	key := "mykey:1"
	obj := new(object)
	if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
		cache.Do(func(ctx context.Context) (any, error) {
			return mockDBGetObject(1)
		})); err != nil {
		panic(err)
	}
	fmt.Println(obj)
	// Output: &{mystring 42}

	mycache.Close()
}

func Example_mGetUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
	)
	cacheT := cache.NewT[int, *object](mycache)

	ctx := context.TODO()
	key := "mget"
	ids := []int{1, 2, 3}

	ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
		return mockDBMGetObject(ids)
	})

	var b bytes.Buffer
	for _, id := range ids {
		b.WriteString(fmt.Sprintf("%v", ret[id]))
	}
	fmt.Println(b.String())
	// Output: &{mystring 1}&{mystring 2}<nil>

	cacheT.Close()
}

func Example_syncLocalUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	sourceID := "12345678" // Unique identifier for this cache instance
	channelName := "syncLocalChannel"
	pubSub := ring.Subscribe(context.Background(), channelName)

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
		cache.WithSourceId(sourceID),
		cache.WithSyncLocal(true),
		cache.WithEventHandler(func(event *cache.Event) {
			// Broadcast local cache invalidation for the received keys
			bs, _ := json.Marshal(event)
			ring.Publish(context.Background(), channelName, string(bs))
		}),
	)
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	go func() {
		for {
			msg := <-pubSub.Channel()
			var event *cache.Event
			if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				panic(err)
			}
			fmt.Println(event.Keys)

			// Invalidate local cache for received keys (except own events)
			if event.SourceID != sourceID {
				for _, key := range event.Keys {
					mycache.DeleteFromLocalCache(key)
				}
			}
		}
	}()

	// Output: [mykey]
	mycache.Close()
	time.Sleep(time.Second)
}

func mockDBGetObject(id int) (*object, error) {
	if id > 100 {
		return nil, errRecordNotFound
	}
	return &object{Str: "mystring", Num: 42}, nil
}

func mockDBMGetObject(ids []int) (map[int]*object, error) {
	ret := make(map[int]*object)
	for _, id := range ids {
		if id == 3 {
			continue
		}
		ret[id] = &object{Str: "mystring", Num: id}
	}
	return ret, nil
}

Contributing

Everyone is welcome to help improve jetcache-go. If you have any questions, suggestions, or want to add other features, please submit an issue or PR directly.

Please follow these steps to submit a PR:

  • Clone the repository
  • Create a new branch: name it feature-xxx for new features or bug-xxx for bug fixes
  • Describe the changes in detail in the PR

Contact

If you have any questions, please contact daoshenzzg@gmail.com.

Documentation

Overview

Example (AdvancedUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRefreshDuration(time.Minute))

ctx := context.TODO()
key := "mykey:1"
obj := new(object)
if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
	cache.Do(func(ctx context.Context) (any, error) {
		return mockDBGetObject(1)
	})); err != nil {
	panic(err)
}
fmt.Println(obj)
Output:

&{mystring 42}
Example (BasicUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound))

ctx := context.TODO()
key := "mykey:1"
obj, _ := mockDBGetObject(1)
if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

var wanted object
if err := mycache.Get(ctx, key, &wanted); err == nil {
	fmt.Println(wanted)
}
Output:

{mystring 42}
Example (MGetUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mget"
ids := []int{1, 2, 3}

ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
	return mockDBMGetObject(ids)
})

var b bytes.Buffer
for _, id := range ids {
	b.WriteString(fmt.Sprintf("%v", ret[id]))
}
fmt.Println(b.String())
Output:

&{mystring 1}&{mystring 2}<nil>
Example (SyncLocalUsage)
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"localhost": ":6379",
	},
})

sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)

mycache := cache.New(cache.WithName("any"),
	cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
	cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
	cache.WithErrNotFound(errRecordNotFound),
	cache.WithRemoteExpiry(time.Minute),
	cache.WithSourceId(sourceID),
	cache.WithSyncLocal(true),
	cache.WithEventHandler(func(event *cache.Event) {
		// Broadcast local cache invalidation for the received keys
		bs, _ := json.Marshal(event)
		ring.Publish(context.Background(), channelName, string(bs))
	}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
	panic(err)
}

go func() {
	for {
		msg := <-pubSub.Channel()
		var event *cache.Event
		if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
			panic(err)
		}
		fmt.Println(event.Keys)

		// Invalidate local cache for received keys (except own events)
		if event.SourceID != sourceID {
			for _, key := range event.Keys {
				mycache.DeleteFromLocalCache(key)
			}
		}
	}
}()
Output:

[mykey]

Index

Examples

Constants

View Source
const (
	TypeLocal  = "local"
	TypeRemote = "remote"
	TypeBoth   = "both"
)

Variables

View Source
var (
	ErrCacheMiss          = errors.New("cache: key is missing")
	ErrRemoteLocalBothNil = errors.New("cache: both remote and local are nil")
)

Functions

This section is empty.

Types

type Cache

type Cache interface {
	// Set sets the cache with ItemOption
	Set(ctx context.Context, key string, opts ...ItemOption) error
	// Once gets the opts.value for the given key from the cache or
	// executes, caches, and returns the results of the given opts.do,
	// making sure that only one execution is in-flight for a given key
	// at a time. If a duplicate comes in, the duplicate caller waits for the
	// original to complete and receives the same results.
	Once(ctx context.Context, key string, opts ...ItemOption) error
	// Delete deletes cached val with key.
	Delete(ctx context.Context, key string) error
	// DeleteFromLocalCache deletes local cached val with key.
	DeleteFromLocalCache(key string)
	// Exists reports whether val for the given key exists.
	Exists(ctx context.Context, key string) bool
	// Get gets the val for the given key and fills into val.
	Get(ctx context.Context, key string, val any) error
	// GetSkippingLocal gets the val for the given key skipping local cache.
	GetSkippingLocal(ctx context.Context, key string, val any) error
	// TaskSize returns Refresh task size.
	TaskSize() int
	// CacheType returns cache type
	CacheType() string
	// Close closes the cache. This should be called when cache refreshing is
	// enabled and no longer needed, or when it may lead to resource leaks.
	Close()
}

Cache interface is used to define the cache implementation.

func New

func New(opts ...Option) Cache

type DoFunc

type DoFunc func(ctx context.Context) (any, error)

DoFunc returns getValue to be cached.

type Event added in v1.1.1

type Event struct {
	CacheName string
	SourceID  string
	EventType EventType
	Keys      []string
}

type EventType added in v1.1.1

type EventType int
const (
	EventTypeSet          EventType = 1
	EventTypeSetByOnce    EventType = 2
	EventTypeSetByRefresh EventType = 3
	EventTypeSetByMGet    EventType = 4
	EventTypeDelete       EventType = 5
)

type ItemOption

type ItemOption func(o *item)

ItemOption defines the method to customize an Options.

func Do

func Do(do DoFunc) ItemOption

func Refresh

func Refresh(refresh bool) ItemOption

func SetNX

func SetNX(setNx bool) ItemOption

func SetXX

func SetXX(setXx bool) ItemOption

func SkipLocal

func SkipLocal(skipLocal bool) ItemOption

func TTL

func TTL(ttl time.Duration) ItemOption

func Value

func Value(value any) ItemOption

type Option

type Option func(o *Options)

Option defines the method to customize an Options.

func WithCodec

func WithCodec(codec string) Option

func WithErrNotFound

func WithErrNotFound(err error) Option

func WithEventChBufSize added in v1.1.1

func WithEventChBufSize(eventChBufSize int) Option

func WithEventHandler added in v1.1.1

func WithEventHandler(eventHandler func(event *Event)) Option

func WithLocal

func WithLocal(local local.Local) Option

func WithName

func WithName(name string) Option

func WithNotFoundExpiry

func WithNotFoundExpiry(notFoundExpiry time.Duration) Option

func WithOffset

func WithOffset(offset time.Duration) Option

func WithRefreshConcurrency

func WithRefreshConcurrency(refreshConcurrency int) Option

func WithRefreshDuration

func WithRefreshDuration(refreshDuration time.Duration) Option

func WithRemote

func WithRemote(remote remote.Remote) Option

func WithRemoteExpiry

func WithRemoteExpiry(remoteExpiry time.Duration) Option

func WithSeparator added in v1.2.0

func WithSeparator(separator string) Option

func WithSeparatorDisabled added in v1.2.1

func WithSeparatorDisabled(separatorDisabled bool) Option

func WithSourceId added in v1.1.1

func WithSourceId(sourceId string) Option

func WithStatsDisabled

func WithStatsDisabled(statsDisabled bool) Option

func WithStatsHandler

func WithStatsHandler(handler stats.Handler) Option

func WithStopRefreshAfterLastAccess

func WithStopRefreshAfterLastAccess(stopRefreshAfterLastAccess time.Duration) Option

func WithSyncLocal added in v1.1.1

func WithSyncLocal(syncLocal bool) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are used to store cache options.

type T

type T[K constraints.Ordered, V any] struct {
	Cache
}

T wrap Cache to support golang's generics

func NewT

func NewT[K constraints.Ordered, V any](cache Cache) *T[K, V]

NewT new a T

func (*T[K, V]) Get added in v1.1.4

func (w *T[K, V]) Get(ctx context.Context, key string, id K, fn func(context.Context, K) (V, error)) (V, error)

Get retrieves the value associated with the given `key` and `id`.

It first attempts to fetch the value from the cache. If a cache miss occurs, it calls the provided `fn` function to fetch the value and stores it in the cache with an expiration time determined by the cache configuration.

A `Once` mechanism is employed to ensure only one fetch is performed for a given `key` and `id` combination, even under concurrent access.

func (*T[K, V]) MGet

func (w *T[K, V]) MGet(ctx context.Context, key string, ids []K, fn func(context.Context, []K) (map[K]V, error)) (result map[K]V)

MGet efficiently retrieves multiple values associated with the given `key` and `ids`.

It attempts to fetch all values from the cache. For missing values, it calls the provided `fn` function to fetch the remaining values and updates the cache with an expiration time determined by the cache configuration.

The results are returned as a map where the key is the `id` and the value is the corresponding data.

func (*T[K, V]) Set added in v1.1.4

func (w *T[K, V]) Set(ctx context.Context, key string, id K, v V) error

Set sets the value `v` associated with the given `key` and `id` in the cache. The expiration time of the cached value is determined by the cache configuration.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL