Published: Sep 29, 2024 License: Apache-2.0 Imports: 14 Imported by: 0


go-timeline - Sparse timeline data cache for Go

What can this library do?

  • Stores non-continuous segments of virtually continuous timeline data.
  • Retrieves data for a given key and custom period.
  • If data is missing for some part of the requested period, automatically loads that part (or more) from the source.
  • Merges overlapping or adjacent periods.
  • Does NOT support deletion.
  • Divides timelines by Key, so multiple separate timelines can be stored - like a map of timelines.
  • Supports any kind of key and data.
  • Supports empty periods.
  • Supports entries with equal time.
  • Is thread-safe.
  • Allows persisting and loading the cache index and data.
  • Has multiple drop-in implementations: memory, file, sqlite, two-layer.

This library uses go-sparse as the base container for storing sparse timeline data.

Intention

This library is a good choice if:

  • Your algorithm reads custom periods of virtually continuous historical data. Examples of such historical data are market data and chat messages.
  • The same (or almost the same) periods are read many times (e.g. by many parallel algorithms).
  • Reading data from the source every time would be too slow.
  • Feeding the entire history to the algorithm is not an option: either it is too big and you don't know which smaller custom periods will be requested, or you just don't want to bother configuring it every time.

So the idea is that the algorithm decides which periods of data it requires and requests them through this library.

I personally used it to implement Binance and Telegram clients for my crypto trading software. My software has an optimization module which concurrently runs thousands of replicas of a trading strategy. These replicas all request candlestick history and chat message history. Loading them from the server every time would have multiple disadvantages:

  • Either I would need to manually configure the period of data to load, or
  • it would be very slow to load each small period for each replica separately.
  • It would result in a ban of my account, because I would immediately hit API limits.

By using this library, the algorithm decides on its own which period to load, the library loads the data (only the missing part), and the new data is merged with already cached data. And this is done only once per replica of the strategy.

Usage

Have some API server (dummy in this case)
type ChatMessage struct {
	Time time.Time
	Text string
}

var chatMessagesOnServer = []ChatMessage{
	{time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), "Hello"},
	{time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), "World"},
	{time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), "How"},
	{time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), "Are"},
	{time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC), "You"},
	{time.Date(2021, 1, 6, 0, 0, 0, 0, time.UTC), "Doing"},
	{time.Date(2021, 1, 7, 0, 0, 0, 0, time.UTC), "Today"},
	{time.Date(2021, 1, 8, 0, 0, 0, 0, time.UTC), "Good"},
	{time.Date(2021, 1, 9, 0, 0, 0, 0, time.UTC), "Bye"},
	{time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC), ":*"},
}

func GetMessagesFromServer(chatID string, from, to time.Time) ([]ChatMessage, error) {
	if chatID != "test-chat-1" {
		return nil, fmt.Errorf("invalid chat ID: %v", chatID)
	}

	var res []ChatMessage

	for _, m := range chatMessagesOnServer {
		if m.Time.Before(from) {
			continue
		}
		if m.Time.After(to) {
			break
		}

		res = append(res, m)
	}

	return res, nil
}
Create load function
loadMessages := func(key string,
	periodStart, periodEnd time.Time,
	closestFromStart, closestFromEnd *ChatMessage,
	extra interface{},
) (timeline.CacheFetchResult[ChatMessage], error) {
	msgs, err := GetMessagesFromServer(key, periodStart, periodEnd)
	if err != nil {
		return timeline.CacheFetchResult[ChatMessage]{}, fmt.Errorf("loading chat messages from server: %w", err)
	}

	// Note that period is explicitly set. It must contain at least period [periodStart, periodEnd],
	// but can be bigger. Even if there is no data at that period, PeriodStart and PeriodEnd must be set
	// to indicate that the period is empty.
	return timeline.CacheFetchResult[ChatMessage]{
		Data:        msgs,
		PeriodStart: periodStart,
		PeriodEnd:   periodEnd,
	}, nil
}
Create cache
cache := timeline.NewMemoryCache(timeline.MemoryCacheOptions[ChatMessage, string]{
	GetTimestamp:  func(d *ChatMessage) time.Time { return d.Time },
	GetFromSource: loadMessages,
})
Check that messages are not yet cached
chatID := "test-chat-1"
periodStart := time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC)
periodEnd := time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC)

_, areCached, _ := cache.GetCached(chatID, periodStart, periodEnd)
require.False(t, areCached)
Fetch messages from server
msgs, err := cache.Get(chatID, periodStart, periodEnd, nil)
require.NoError(t, err)
require.Len(t, msgs, 3) // How, Are, You

For more examples, see the examples folder or the *_test.go test files.

Implemented caches

Memory cache

This cache implementation uses an in-memory array as storage for data entries. It uses sparse.ArrayData as its base. It can be supplied with Load and Save functions to make the cache persistent.

File cache

This is a persistent cache implementation that serializes data entries into JSON and stores them in files in a specified folder, one file per key. When data is requested for some key, the entire file content is loaded into memory.

This is not the most efficient way of storing a cache, but it is simple and reliable and serves more as a demonstration. It might also be convenient that the JSON files can be read by other utilities.

SQLite cache

This is a persistent cache implementation that uses an SQLite file database as its persistent storage, one database file per key. When a timeline is requested for some key, it is loaded from its corresponding database file.

Note that requested data is not stored in any long-term memory location - it is just returned to the caller. And reading from a database is not as fast as reading from memory, so if speed is important, you would need to add some in-memory caching as well (see the Two-layer cache section). But the advantage of using just the plain SQLite cache is that memory usage does not grow with cache usage - the data stays on disk no matter how much of it is requested.

Two-layer cache

The library provides a two-layer cache in the form of MemoryAndSqliteCache. The intention is to have a slow persistent layer and a fast volatile layer work together, summing their pros and negating their cons.

When data is added to this cache, it is stored in both the memory and SQLite caches. When data is requested for some key, it is first searched for in the memory cache, and only if it is missing there is it loaded from the SQLite cache into the memory cache. So it is both effectively persistent and fast. The only disadvantage is that memory usage grows as more data is requested, because data loaded from disk is also stored in the memory cache.

Special use-cases

What are composite keys?

A key identifies a specific historical timeline. For chat history, that would be the ID or name of a chat. For candlestick history, that would be coin name + timeframe (e.g. (BTCUSDT, 1h)). In such a coin name + timeframe example, the key is a composite key - a structure of two fields. This is supported by the library, but a correct "stringify key" function must be provided.

How to sort by Time + ID?

Some entries have not only a timestamp but also other index fields, e.g. an ID. This is supported seamlessly by the library, because the order of entries is preserved. So if you want a stable ordering of the entries, sort them by Time + ID in the load function.

How to update the most recent entry?

In some historical data, entries are not only added but also updated. An example of such data is candlestick history: the last candle may be updated many times before a new candle is added.

The library supports updating entries - when you add a new period, it overwrites any cached entries in that period. So in the case of candlestick history, the function which loads data from the source must update not exactly the requested period, but a period of at least one candle length. In that case, when the library requests data from the source for the period [last update; now], it will actually get data for at least the period [candle start; now] and thus will overwrite the last candle. But don't forget to also extend the period returned by the load function - it must include at least ALL returned entries.

In such a case, not only the last entry must be updated, but every edge entry on the older side of a requested period. See the next point for an explanation why.

Importance of updating edge entries

If the nature of your historical data includes updates of the last entry, you need to implement the load function so that it always loads the last entry before the requested period. This is because at the time that period was fetched, that entry was most likely incomplete and later received more updates, which are missing from the cache. So you need to extend the fetched period to overwrite that entry with its latest version.

For example, imagine loading 1h candles first for the period [10:00; now] at 12:15. That would load candles for [10:00; 11:00), [11:00; 12:00) and [12:00; 13:00). Then some time passes, and at 14:30 you want to load candles for the bigger period [10:00; now] (so [10:00; 14:30]). The library would ask your load function to load the period [12:15; now]. But since candles are identified by their Open Time, that would load only candles [13:00; 14:00) and [14:00; 15:00). Candle [12:00; 13:00) is not included, because its Open Time is smaller than 12:15. Because of this, that edge candle [12:00; 13:00) won't receive the updates which were made between 12:15 and 13:00. To fix that, the load function must load one candle more than requested. It will return it to the library, and the library will overwrite the obsolete version. But don't forget to also extend the period returned by the load function - it must include at least ALL returned entries.

What if source does not provide convenient interface?

Sometimes a source of data may not provide an interface for getting data for a custom period - sometimes only pagination is available. This is not a problem. In that case, the function for loading data from the source must load all the pages up to the requested period, plus the period itself, and return everything it loaded. The library will cache all of that, which might be useful in the future, especially if you persist the cache content.

Some sources may provide pagination with the ability to start from a specific entry or page other than the first. If that is the case, the load function can use the border entries information provided by the library to determine from which entry or page it should start, instead of starting from the beginning.

An example of such an interface is the Telegram API. In Telegram, only pagination is available, so you cannot request messages for a custom period. But you can request them starting from a specific message ID. So the load function can start from the newest already-loaded message after the requested period, instead of starting from the latest message in the chat.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var IdxCmp = time.Time.Compare

Functions

func CreateConvertorToString

func CreateConvertorToString[Val any]() func(Val) string

Types

type Cache

type Cache[Data any, Key any] interface {
	Get(key Key, periodStart, periodEnd time.Time, extra interface{}) ([]Data, error)
	GetCached(key Key, periodStart, periodEnd time.Time) ([]Data, bool, error)
	GetCachedAll(key Key, requiredPeriodStart, requiredPeriodEnd, minPeriodStart, maxPeriodEnd time.Time) (CacheStateSegment[Data], bool, error)
	GetCachedPeriodClosestFromStart(key Key, point time.Time, nonEmpty bool) (*TimePeriodBounds[Data], error)
	GetCachedPeriodClosestFromEnd(key Key, point time.Time, nonEmpty bool) (*TimePeriodBounds[Data], error)
	Close()
}

type CacheBase

type CacheBase[Data any, Key any] struct {
	// contains filtered or unexported fields
}

func NewCacheBase

func NewCacheBase[Data any, Key any](opts CacheBaseOptions[Data, Key]) *CacheBase[Data, Key]

func (*CacheBase[Data, Key]) Close

func (c *CacheBase[Data, Key]) Close()

func (*CacheBase[Data, Key]) Get

func (c *CacheBase[Data, Key]) Get(key Key, periodStart, periodEnd time.Time, extra interface{}) ([]Data, error)

Retrieve data for specified period. If data is not found in cache, it will be loaded from source. Parameter extra is passed to GetFromSource function.

func (*CacheBase[Data, Key]) GetCached

func (c *CacheBase[Data, Key]) GetCached(key Key, periodStart, periodEnd time.Time) ([]Data, bool, error)

Retrieve data from cache for specified period without loading it from source.

An error may be returned due to a failure to load data from persistent storage. If the Load function is not configured, an error is never returned. Errors never come from loading from the source, because that never happens here.

func (*CacheBase[Data, Key]) GetCachedAll

func (c *CacheBase[Data, Key]) GetCachedAll(key Key, requiredPeriodStart, requiredPeriodEnd, minPeriodStart, maxPeriodEnd time.Time) (CacheStateSegment[Data], bool, error)

Retrieve data on the longest possible period, containing requested period. Period [requiredPeriodStart, requiredPeriodEnd] is required to be in the result. Period [minPeriodStart, maxPeriodEnd] is desired to be in the result, but not required. No entries will be returned outside of this period.

An error may be returned due to a failure to load data from persistent storage. If the Load function is not configured, an error is never returned. Errors never come from loading from the source, because that never happens here.

func (*CacheBase[Data, Key]) GetCachedPeriodClosestFromEnd

func (c *CacheBase[Data, Key]) GetCachedPeriodClosestFromEnd(key Key, point time.Time, nonEmpty bool) (*TimePeriodBounds[Data], error)

Retrieves period boundaries (and edge entries) of a cached period, closest to the specified point from the end. If nonEmpty is true, then only non-empty periods are considered. If point is inside of some period, that period is returned. If no period is found, nil is returned.

func (*CacheBase[Data, Key]) GetCachedPeriodClosestFromStart

func (c *CacheBase[Data, Key]) GetCachedPeriodClosestFromStart(key Key, point time.Time, nonEmpty bool) (*TimePeriodBounds[Data], error)

Retrieves period boundaries (and edge entries) of a cached period, closest to the specified point from the start. If nonEmpty is true, then only non-empty periods are considered. If point is inside of some period, that period is returned. If no period is found, nil is returned.

type CacheBaseOptions

type CacheBaseOptions[Data any, Key any] struct {
	KeyToStr             func(Key) string
	GetTimestamp         func(d *Data) time.Time
	GetFromSource        CacheSource[Data, Key]
	Storage              CacheStorage[Data, Key]
	SkipDataVerification bool
}

type CacheData

type CacheData[Data any] sparse.SeriesData[Data, time.Time]

type CacheSource

type CacheSource[Data any, Key any] func(key Key, periodStart, periodEnd time.Time, closestFromStart, closestFromEnd *Data, extra interface{}) (CacheStateSegment[Data], error)

type CacheState

type CacheState[Data any] sparse.SeriesState[Data, time.Time]

type CacheStateSegment

type CacheStateSegment[Data any] struct {
	PeriodStart time.Time
	PeriodEnd   time.Time
	Data        []Data
}

type CacheStorage

type CacheStorage[Data any, Key any] interface {
	Load(key Key) (*CacheState[Data], error)
	Save(key Key, state *CacheState[Data], updated []*CacheStateSegment[Data]) error
	Add(key Key, periodStart, periodEnd time.Time, data []Data) (CacheData[Data], error)
}

type FileCache

type FileCache[Data any, Key any] struct {
	MemoryCache[Data, Key]
	// contains filtered or unexported fields
}

func NewFileCache

func NewFileCache[Data any, Key any](opts FileCacheOptions[Data, Key]) (*FileCache[Data, Key], error)

type FileCacheOptions

type FileCacheOptions[Data any, Key any] struct {
	CacheDir             string
	KeyToStr             func(Key) string
	GetTimestamp         func(d *Data) time.Time
	GetFromSource        CacheSource[Data, Key]
	SkipDataVerification bool
}

type LoadCaheFunc

type LoadCaheFunc[Data any, Key any] func(key Key) (*MemoryCacheState[Data], error)

Load cache from persistent storage

type MemoryAndSqliteCache

type MemoryAndSqliteCache[Data any, Key any, ID comparable] struct {
	MemoryCache[Data, Key]
	// contains filtered or unexported fields
}

func NewMemoryAndSqliteCache

func NewMemoryAndSqliteCache[Data any, Key any, ID comparable](opts MemoryAndSqliteCacheOptions[Data, Key, ID]) (*MemoryAndSqliteCache[Data, Key, ID], error)

type MemoryAndSqliteCacheOptions

type MemoryAndSqliteCacheOptions[Data any, Key any, ID comparable] struct {
	SqliteCacheOptions[Data, Key, ID]
	MinFirstLayerFetchPeriod time.Duration
}

type MemoryCache

type MemoryCache[Data any, Key any] struct {
	CacheBase[Data, Key]
}

func NewMemoryCache

func NewMemoryCache[Data any, Key any](opts MemoryCacheOptions[Data, Key]) *MemoryCache[Data, Key]

type MemoryCacheOptions

type MemoryCacheOptions[Data any, Key any] struct {
	GetTimestamp         func(d *Data) time.Time  // (Required)
	GetFromSource        CacheSource[Data, Key]   // (Required) Fetch data from source
	KeyToStr             func(Key) string         // (Optional)
	Save                 SaveCacheFunc[Data, Key] // (Optional) Save to persistent storage
	Load                 LoadCaheFunc[Data, Key]  // (Optional) Load from persistent storage
	SkipDataVerification bool                     // (Optional) Set this to true if you are sure your data is well-ordered
}

type MemoryCacheState

type MemoryCacheState[Data any] struct {
	Segments []*CacheStateSegment[Data]
}

type PeriodBounds

type PeriodBounds = sparse.PeriodBounds[time.Time]

type SaveCacheFunc

type SaveCacheFunc[Data any, Key any] func(key Key, state *CacheState[Data], updated []*CacheStateSegment[Data]) error

Save cache to persistent storage

type SqliteCache

type SqliteCache[Data any, Key any, ID comparable] struct {
	CacheBase[Data, Key]
}

func NewSqliteCache

func NewSqliteCache[Data any, Key any, ID comparable](opts SqliteCacheOptions[Data, Key, ID]) (*SqliteCache[Data, Key, ID], error)

type SqliteCacheOptions

type SqliteCacheOptions[Data any, Key any, ID comparable] struct {
	CacheDir             string                  // (Required) Directory to store cache files
	GetTimestamp         func(d *Data) time.Time // (Required) Get timestamp of data entry
	GetID                func(d *Data) ID        // (Required) Get ID of data entry. Needed to properly order data in DB if timestamp is not unique.
	GetFromSource        CacheSource[Data, Key]  // (Required) Fetch data from source
	KeyToStr             func(Key) string        // (Optional) Convert key to string
	SkipDataVerification bool
}

type TimePeriodBounds

type TimePeriodBounds[Data any] struct {
	sparse.PeriodBounds[time.Time]
	First *Data
	Last  *Data
}
