olric

package module
v0.2.0
Published: Feb 9, 2025 License: Apache-2.0 Imports: 41 Imported by: 0

README

Olric


This is a forked version of the main repository with a few bug fixes and some refactoring, and it only supports the embedded mode. Please use the original repo for any bugs or related questions.

Modifications from original library

  • Supports only embedded mode, although most of the code to run client/server is still there except the runner code.
  • Removed client/server mode
  • Renamed the module
  • Upgraded the Go version to 1.22.9
  • Refactored the README to match the behavior of this fork
  • Fixed some goroutine leak bugs
  • Added TLS support

Overview

Olric is a distributed, in-memory key/value store and cache. It's designed from the ground up to be distributed, and it can be used both as an embedded Go library and as a language-independent service.

With Olric, you can instantly create a fast, scalable, shared pool of RAM across a cluster of computers.

Olric is implemented in Go and uses the Redis serialization protocol, so Olric has client implementations in all major programming languages.

Olric is highly scalable and available. Distributed applications can use it for distributed caching, clustering and publish-subscribe messaging.

It is designed to scale out to hundreds of members and thousands of clients. When you add new members, they automatically discover the cluster and linearly increase the memory capacity. Olric offers simple scalability, partitioning (sharding), and re-balancing out-of-the-box. It does not require any extra coordination processes. With Olric, when you start another process to add more capacity, data and backups are automatically and evenly balanced.

See the Samples section to get started!

At a glance

  • Designed to share some transient, approximate, fast-changing data between servers,
  • Uses Redis serialization protocol,
  • Implements a distributed hash table,
  • Provides a drop-in replacement for Redis Publish/Subscribe messaging system,
  • Supports both programmatic and declarative configuration,
  • Supports different eviction algorithms (including LRU and TTL),
  • Highly available and horizontally scalable,
  • Provides best-effort consistency guarantees without being a complete CP (indeed PA/EC) solution,
  • Supports replication by default (with sync and async options),
  • Quorum-based voting for replica control (Read/Write quorums),
  • Supports atomic operations,
  • Provides an iterator on distributed maps,
  • Provides a plugin interface for service discovery daemons,
  • Provides a locking primitive inspired by the SETNX command of Redis,

Possible Use Cases

Olric is an eventually consistent, unordered key/value data store. It supports various eviction mechanisms for distributed caching implementations. Olric also provides publish-subscribe messaging, data replication, failure detection and simple anti-entropy services.

It's good at distributed caching and publish/subscribe messaging.


Features

  • Designed to share some transient, approximate, fast-changing data between servers,
  • Accepts arbitrary types as value,
  • Only in-memory,
  • Uses Redis protocol,
  • Compatible with existing Redis clients,
  • Embeddable but can be used as a language-independent service with olricd,
  • GC-friendly storage engine,
  • O(1) running time for lookups,
  • Supports atomic operations,
  • Provides a lock implementation which can be used for non-critical purposes,
  • Different eviction policies: LRU, MaxIdleDuration and Time-To-Live (TTL),
  • Highly available,
  • Horizontally scalable,
  • Provides best-effort consistency guarantees without being a complete CP (indeed PA/EC) solution,
  • Distributes load fairly among cluster members with a consistent hash function,
  • Supports replication by default (with sync and async options),
  • Quorum-based voting for replica control,
  • Thread-safe by default,
  • Provides an iterator on distributed maps,
  • Provides a plugin interface for service discovery daemons and cloud providers,
  • Provides a locking primitive inspired by the SETNX command of Redis,
  • Provides a drop-in replacement of Redis' Publish-Subscribe messaging feature.

See Architecture section to see details.

HowTo

See Samples section to learn how to embed Olric into your existing Golang application.

Cluster Events

Olric can push cluster events to the cluster.events channel. Available cluster events:

  • node-join-event
  • node-left-event
  • fragment-migration-event
  • fragment-received-event

If you want to receive these events, set EnableClusterEventsChannel to true and subscribe to the cluster.events channel. The default is false.

See the events/cluster_events.go file for more information about events.
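The following is a minimal sketch of consuming cluster events in embedded mode. It builds on the EmbeddedClient and PubSub API shown in the Samples section; only the EnableClusterEventsChannel field and the cluster.events channel name come from this section, and error handling is omitted for brevity.

c := config.New("local")
c.EnableClusterEventsChannel = true // cluster events are disabled by default

db, _ := olric.New(c)
go db.Start() // see the Samples section for proper startup handling

e := db.NewEmbeddedClient()
ps, _ := e.NewPubSub()

// Subscribe to the cluster.events channel and print incoming events.
ctx := context.Background()
rps := ps.Subscribe(ctx, "cluster.events")
for msg := range rps.Channel() {
	fmt.Println("cluster event:", msg.Payload)
}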

Configuration

import "github.com/tochemey/olric/config"
...
c := config.New("local")

The New function takes a parameter called env. It denotes the network environment and is consumed by hashicorp/memberlist. The default configuration is good enough for a distributed caching scenario. In order to see all configuration parameters, please take a look at this.

See Sample Code section for an introduction.

Network Configuration

In an Olric instance, there are two different TCP servers. One for Olric, and the other one is for memberlist. BindAddr is very critical to deploy a healthy Olric node. There are different scenarios:

  • You can freely set a domain name or IP address as BindAddr for both Olric and memberlist. Olric will resolve and use it to bind.
  • You can freely set localhost, 127.0.0.1 or ::1 as BindAddr in development environment for both Olric and memberlist.
  • You can freely set 0.0.0.0 as BindAddr for both Olric and memberlist. Olric will pick an IP address, if there is any.
  • If you don't set BindAddr, hostname will be used, and it will be resolved to get a valid IP address.
  • You can set a network interface by using Config.Interface and Config.MemberlistInterface fields. Olric will find an appropriate IP address for the given interfaces, if there is any.
  • You can set both BindAddr and interface parameters. In this case Olric will ensure that BindAddr is available on the given interface.

You should know that Olric needs a single and stable IP address to function properly. If you don't know the IP address of the host at deployment time, you can set BindAddr to 0.0.0.0. Olric will very likely find an IP address for you.
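For example, here is a short sketch of the network-related settings discussed above. BindPort is an assumed field name, and the interface names are placeholders; check the config package for the authoritative fields.

import "github.com/tochemey/olric/config"

c := config.New("lan")

// Option 1: bind to an explicit address (BindPort is assumed here).
c.BindAddr = "0.0.0.0"
c.BindPort = 3320

// Option 2: let Olric pick an address from the given network interfaces.
c.Interface = "eth0"
c.MemberlistInterface = "eth0"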

Service Discovery

Olric provides a service discovery interface which can be used to implement plugins.

We currently have a bunch of service discovery plugins for automatic peer discovery in cloud environments.

For more information about installation and configuration of the plugins, see their GitHub pages.

Timeouts

Olric nodes support setting KeepAlivePeriod on TCP sockets.

Server-side:

config.KeepAlivePeriod

KeepAlivePeriod denotes whether the operating system should send keep-alive messages on the connection.

Client-side:

config.DialTimeout

Timeout for TCP dial. The timeout includes name resolution, if required. When using TCP, and the host in the address parameter resolves to multiple IP addresses, the timeout is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect.

config.ReadTimeout

Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. The default is config.DefaultReadTimeout

config.WriteTimeout

Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking. The default is config.DefaultWriteTimeout
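A hedged sketch of setting these values follows. It assumes KeepAlivePeriod lives on config.Config and that the dial/read/write timeouts live on the embedded config.Client; verify the exact field locations in the config package before relying on them.

c := config.New("local")

// Server-side: keep-alive period for accepted TCP connections.
c.KeepAlivePeriod = 300 * time.Second

// Client-side timeouts (assumed to be fields of config.Client).
c.Client.DialTimeout = 5 * time.Second
c.Client.ReadTimeout = 3 * time.Second
c.Client.WriteTimeout = 3 * time.Second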

Architecture

Overview

Olric uses:

Olric distributes data among partitions. Every partition is owned by a cluster member and may have one or more backups for redundancy. When you read or write a DMap entry, you transparently talk to the partition owner. Each request hits the most up-to-date version of a particular data entry in a stable cluster.

In order to find the partition which the key belongs to, Olric hashes the key and takes the modulo of the result with the partition count:

partID = MOD(hash result, partition count)
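This mapping can be illustrated with a short, purely illustrative snippet. Olric's real implementation uses its own 64-bit hasher (see the hasher package); FNV-1a is only used here to keep the sketch self-contained.

import "hash/fnv"

// partitionID is illustrative only: it hashes the key and takes the
// modulo with the partition count, as described above.
func partitionID(key string, partitionCount uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64() % partitionCount
}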

The partitions are distributed among cluster members using a consistent hashing algorithm. For details, please see buraksezer/consistent.

When a new cluster is created, one of the instances is elected as the cluster coordinator. It manages the partition table:

  • When a node joins or leaves, it distributes the partitions and their backups among the members again,
  • Removes empty previous owners from the partition owners list,
  • Pushes the new partition table to all the members,
  • Pushes the partition table to the cluster periodically.

Members propagate their birthdate (POSIX time in nanoseconds) to the cluster. The coordinator is the oldest member in the cluster. If the coordinator leaves the cluster, the second oldest member is elected as the new coordinator.

Olric has a component called rebalancer which is responsible for keeping underlying data structures consistent:

  • Works on every node,
  • When a node joins or leaves, the cluster coordinator pushes the new partition table. Then, the rebalancer runs immediately and moves the partitions and backups to their new hosts,
  • Merges fragmented partitions.

Partitions have a concept called the owners list. When a node joins or leaves the cluster, a new primary owner may be assigned by the coordinator. At any time, a partition may have one or more owners. If a partition has two or more owners, it is called a fragmented partition. The last added owner is called the primary owner. Write operations are handled only by the primary owner. The previous owners are only used for reads and deletes.

When you read a key, the primary owner tries to find the key locally first. Then it queries the previous owners and backups, respectively. The delete operation works the same way.

The data (distributed map objects) in a fragmented partition is moved slowly to the primary owner by the rebalancer. Until the move is done, the data remains available on the previous owners. The DMap methods use this list to query data on the cluster.

Please note that 'multiple partition owners' is an undesirable situation, and the rebalancer component is designed to fix it in a short time.

Consistency and Replication Model

Olric is an AP product in the context of the CAP theorem, and it employs a combination of primary-copy and optimistic replication techniques. With optimistic replication, when the partition owner receives a write or delete operation for a key, it applies the operation locally and propagates it to the backup owners.

This technique enables Olric clusters to offer high throughput. However, due to temporary situations in the system, such as network failure, backup owners can miss some updates and diverge from the primary owner. If a partition owner crashes while there is an inconsistency between itself and the backups, strong consistency of the data can be lost.

Two types of backup replication are available: sync and async. Both types are still implementations of the optimistic replication model.

  • sync: Blocks until write/delete operation is applied by backup owners.
  • async: Just fire & forget.
Last-write-wins conflict resolution

Every time a piece of data is written to Olric, a timestamp is attached by the client. Then, when Olric has to deal with conflicting data in the case of network partitioning, it simply chooses the data with the most recent timestamp. This is called the last-write-wins (LWW) conflict resolution policy.

PACELC Theorem

From Wikipedia:

In theoretical computer science, the PACELC theorem is an extension to the CAP theorem. It states that in case of network partitioning (P) in a distributed computer system, one has to choose between availability (A) and consistency (C) (as per the CAP theorem), but else (E), even when the system is running normally in the absence of partitions, one has to choose between latency (L) and consistency (C).

In the context of the PACELC theorem, Olric is a PA/EC product. This means that Olric is considered a consistent data store when the network is stable, because the key space is divided between partitions and every partition is controlled by its primary owner. All operations on DMaps are redirected to the partition owner.

In the case of network partitioning, Olric chooses availability over consistency, so you can still access some parts of the cluster when the network is unreliable, but the cluster may return inconsistent results.

Olric implements read-repair and a quorum-based voting system to deal with inconsistencies in DMaps.


Read-Repair on DMaps

Read repair is a feature that allows for inconsistent data to be fixed at query time. Olric tracks every write operation with a timestamp value and assumes that the latest write operation is the valid one. When you want to access a key/value pair, the partition owner retrieves all available copies for that pair and compares the timestamp values. The latest one is the winner. If there is some outdated version of the requested pair, the primary owner propagates the latest version of the pair.

Read-repair is disabled by default for the sake of performance. If you have a use case that requires a more strict consistency control than a distributed caching scenario, you can enable read-repair via the configuration.

Quorum-based replica control

Olric implements Read/Write quorums to keep the data in a consistent state. When you start a write operation on the cluster and the write quorum (W) is 2, the partition owner tries to write the given key/value pair on its own data storage and on the replica nodes. If the number of successful write operations is below W, the primary owner returns ErrWriteQuorum. The read flow is the same: if R = 2 and the owner can only access one of the replicas, it returns ErrReadQuorum.
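A hedged configuration sketch for the replication and quorum settings follows. The field names below (ReplicaCount, WriteQuorum, ReadQuorum, ReadRepair, MemberCountQuorum) are assumed to exist on config.Config; check the config package for the authoritative names.

c := config.New("lan")

// Keep one backup copy of every partition (primary + 1 replica).
c.ReplicaCount = 2

// Require two successful owners for writes and reads; otherwise the
// operation fails with ErrWriteQuorum or ErrReadQuorum.
c.WriteQuorum = 2
c.ReadQuorum = 2

// Fix stale replicas at query time (read-repair is disabled by default).
c.ReadRepair = true

// Simple split-brain protection (see the next section): stop serving
// requests when fewer than two members are reachable.
c.MemberCountQuorum = 2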

Simple Split-Brain Protection

Olric implements a technique called majority quorum to manage split-brain conditions. If a network partitioning occurs and some members lose their connection to the rest of the cluster, they immediately stop functioning and return an error to incoming requests. This behaviour is controlled by the MemberCountQuorum parameter. Its default is 1.

When the network heals, the stopped nodes rejoin the cluster, and fragmented partitions are merged by their primary owners in accordance with the LWW policy. Olric also implements an ownership report mechanism to fix inconsistencies in partition distribution after a partitioning event.

Eviction

Olric supports different policies to evict keys from distributed maps.

Expire with TTL

Olric implements TTL eviction policy. It shares the same algorithm with Redis:

Periodically Redis tests a few keys at random among keys with an expire set. All the keys that are already expired are deleted from the keyspace.

Specifically this is what Redis does 10 times per second:

  • Test 20 random keys from the set of keys with an associated expire.
  • Delete all the keys found expired.
  • If more than 25% of keys were expired, start again from step 1.

This is a trivial probabilistic algorithm, basically the assumption is that our sample is representative of the whole key space, and we continue to expire until the percentage of keys that are likely to be expired is under 25%

When a client tries to access a key, Olric returns ErrKeyNotFound if the key is found to be timed out. A background task evicts keys with the algorithm described above.
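For example, here is a minimal sketch of writing a key with a TTL through the embedded DMap client from the Samples section. olric.EX as the PutOption that sets a relative expiry is an assumption based on the v0.5-style API, and error handling is trimmed.

dm, _ := e.NewDMap("sessions")

// Put a key with a 100-second TTL. olric.EX is assumed to be the PutOption
// that sets a relative expiry.
err := dm.Put(ctx, "session-id", "payload", olric.EX(100*time.Second))
if err != nil {
	log.Fatalf("Failed to call Put: %v", err)
}

// Once the TTL elapses, Get returns ErrKeyNotFound.
_, err = dm.Get(ctx, "session-id")
if errors.Is(err, olric.ErrKeyNotFound) {
	fmt.Println("session-id has expired")
}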

Expire with MaxIdleDuration

MaxIdleDuration is the maximum time for each entry to stay idle in the DMap. It limits the lifetime of the entries relative to the time of the last read or write access performed on them. The entries whose idle period exceeds this limit are expired and evicted automatically. An entry is idle if no Get, Put, PutEx, Expire, PutIf, or PutIfEx operation is performed on it. Configuration of the MaxIdleDuration feature varies by preferred deployment method.

Expire with LRU

Olric implements the LRU eviction method on DMaps. The approximated LRU algorithm is borrowed from Redis. The Redis authors propose the following algorithm:

It is important to understand that the eviction process works like this:

  • A client runs a new command, resulting in more data added.
  • Redis checks the memory usage, and if it is greater than the maxmemory limit, it evicts keys according to the policy.
  • A new command is executed, and so forth.

So we continuously cross the boundaries of the memory limit, by going over it, and then by evicting keys to return back under the limits.

If a command results in a lot of memory being used (like a big set intersection stored into a new key) for some time the memory limit can be surpassed by a noticeable amount.

Approximated LRU algorithm

The Redis LRU algorithm is not an exact implementation. This means that Redis is not able to pick the best candidate for eviction, that is, the key that was accessed the furthest in the past. Instead it will try to run an approximation of the LRU algorithm, by sampling a small number of keys, and evicting the one that is the best (with the oldest access time) among the sampled keys.

Olric tracks access time for every DMap instance. Then it picks and sorts a configurable number of keys to select keys for eviction. Every node runs this algorithm independently. The access log is moved along with the partition when a network partitioning occurs.

Configuration of eviction mechanisms

Here is a simple configuration block for olricd.yaml:

cache:
  numEvictionWorkers: 1
  maxIdleDuration: ""
  ttlDuration: "100s"
  maxKeys: 100000
  maxInuse: 1000000 # in bytes
  lRUSamples: 10
  evictionPolicy: "LRU" # NONE/LRU

You can also set cache configuration per DMap. Here is a simple configuration for a DMap named foobar:

dmaps:
  foobar:
    maxIdleDuration: "60s"
    ttlDuration: "300s"
    maxKeys: 500000
    lRUSamples: 20
    evictionPolicy: "NONE" # NONE/LRU

If you prefer embedded-member deployment scenario, please take a look at config#CacheConfig and config#DMapCacheConfig for the configuration.

Lock Implementation

The DMap implementation is already thread-safe to meet your thread safety requirements. When you want to have more control on the concurrency, you can use LockWithTimeout and Lock methods. Olric borrows the locking algorithm from Redis. Redis authors propose the following algorithm:

The command is a simple way to implement a locking system with Redis.

A client can acquire the lock if the above command returns OK (or retry after some time if the command returns Nil), and remove the lock just using DEL.

The lock will be auto-released after the expire time is reached.

It is possible to make this system more robust modifying the unlock schema as follows:

Instead of setting a fixed string, set a non-guessable large random string, called a token. Instead of releasing the lock with DEL, send a script that only removes the key if the value matches. This avoids a case where a client tries to release the lock after the expire time and deletes a key that was created by another client that acquired the lock later.

The equivalent of the SETNX command in Olric is PutIf(key, value, IfNotFound). The Lock and LockWithTimeout methods properly implement the algorithm proposed above.

You should know that this implementation is subject to the clustering algorithm, so there is no guarantee about reliability in the case of network partitioning. I recommend using the lock implementation for efficiency purposes in general, instead of correctness.
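A minimal usage sketch follows, based on the DMap API documented below (Lock returns a LockContext with Unlock and Lease). The DMap comes from an EmbeddedClient as in the Samples section, and error handling is trimmed.

dm, _ := e.NewDMap("locks")

// Try to acquire the lock, waiting up to 5 seconds for it.
// LockWithTimeout would additionally auto-release the lock after a timeout.
lock, err := dm.Lock(ctx, "resource-name", 5*time.Second)
if err != nil {
	log.Fatalf("Failed to acquire lock: %v", err)
}

// ... do the work that should not run concurrently ...

// Release the lock when done.
if err := lock.Unlock(ctx); err != nil {
	log.Printf("Failed to release lock: %v", err)
}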

Important note about consistency:

You should know that Olric is a PA/EC (see Consistency and Replication Model) product. So if your network is stable, all the operations on key/value pairs are performed by a single cluster member. It means that you can be sure about consistency when the cluster is stable. It's important to know that computer networks fail occasionally, processes crash and random GC pauses may happen. Many factors can lead to network partitioning. If you cannot tolerate losing strong consistency under network partitioning, you need to use a different tool for locking.

See Hazelcast and the Mythical PA/EC System and Jepsen Analysis on Hazelcast 3.8.3 for more insight on this topic.

Storage Engine

Olric implements a GC-friendly storage engine to store large amounts of data on RAM. Basically, it applies an append-only log file approach with indexes. Olric inserts key/value pairs into pre-allocated byte slices (called tables in Olric terminology) and indexes that memory region by using Golang's built-in map. The data type of this map is map[uint64]uint64. When a pre-allocated byte slice is full, Olric allocates a new one and continues inserting the new data into it. This design greatly reduces the write latency.

When you want to read a key/value pair from the Olric cluster, it scans the related DMap fragment by iterating over the indexes (implemented by the built-in map). The number of allocated byte slices should be small, so Olric finds the key almost immediately. Technically, the read performance depends on the number of keys in the fragment, but the effect of this design on read performance is negligible.

The size of the pre-allocated byte slices is configurable.
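The design can be illustrated with a purely educational sketch of an append-only table indexed by a built-in map. This is not Olric's storage engine code, just a compact model of the idea described above.

import "encoding/binary"

type table struct {
	buf    []byte            // pre-allocated, append-only memory region
	index  map[uint64]uint64 // key hash -> offset of the entry in buf
	offset uint64
}

func newTable(size int) *table {
	return &table{buf: make([]byte, size), index: make(map[uint64]uint64)}
}

// put appends a length-prefixed entry and records its offset in the index.
// It reports false when the table is full, so the caller can allocate a new
// table and keep inserting there.
func (t *table) put(hkey uint64, entry []byte) bool {
	need := uint64(4 + len(entry))
	if t.offset+need > uint64(len(t.buf)) {
		return false
	}
	binary.BigEndian.PutUint32(t.buf[t.offset:], uint32(len(entry)))
	copy(t.buf[t.offset+4:], entry)
	t.index[hkey] = t.offset
	t.offset += need
	return true
}

// get resolves the offset through the index in O(1) and decodes the entry.
func (t *table) get(hkey uint64) ([]byte, bool) {
	off, ok := t.index[hkey]
	if !ok {
		return nil, false
	}
	n := binary.BigEndian.Uint32(t.buf[off:])
	return t.buf[off+4 : off+4+uint64(n)], true
}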

Samples

In this section, you can find code snippets for various scenarios.

Embedded-member scenario
Distributed map
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/tochemey/olric"
	"github.com/tochemey/olric/config"
)

func main() {
	// Sample for Olric v0.5.x

	// Deployment scenario: embedded-member
	// This creates a single-node Olric cluster. It's good enough for experimenting.

	// config.New returns a new config.Config with sane defaults. Available values for env:
	// local, lan, wan
	c := config.New("local")

	// Callback function. It's called when this node is ready to accept connections.
	ctx, cancel := context.WithCancel(context.Background())
	c.Started = func() {
		defer cancel()
		log.Println("[INFO] Olric is ready to accept connections")
	}

	// Create a new Olric instance.
	db, err := olric.New(c)
	if err != nil {
		log.Fatalf("Failed to create Olric instance: %v", err)
	}

	// Start the instance. It will form a single-node cluster.
	go func() {
		// Call Start in the background. It's a blocking call.
		err = db.Start()
		if err != nil {
			log.Fatalf("olric.Start returned an error: %v", err)
		}
	}()

	<-ctx.Done()

	// In embedded-member scenario, you can use the EmbeddedClient. It implements
	// the Client interface.
	e := db.NewEmbeddedClient()

	dm, err := e.NewDMap("bucket-of-arbitrary-items")
	if err != nil {
		log.Fatalf("olric.NewDMap returned an error: %v", err)
	}

	ctx, cancel = context.WithCancel(context.Background())

	// Magic starts here!
	fmt.Println("##")
	fmt.Println("Simple Put/Get on a DMap instance:")
	err = dm.Put(ctx, "my-key", "Olric Rocks!")
	if err != nil {
		log.Fatalf("Failed to call Put: %v", err)
	}

	gr, err := dm.Get(ctx, "my-key")
	if err != nil {
		log.Fatalf("Failed to call Get: %v", err)
	}

	// Olric uses the Redis serialization format.
	value, err := gr.String()
	if err != nil {
		log.Fatalf("Failed to read Get response: %v", err)
	}

	fmt.Println("Response for my-key:", value)
	fmt.Println("##")

	// Don't forget to call Shutdown when you want to leave the cluster.
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = db.Shutdown(ctx)
	if err != nil {
		log.Printf("Failed to shutdown Olric: %v", err)
	}
}
Publish-Subscribe
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/tochemey/olric"
	"github.com/tochemey/olric/config"
)

func main() {
	// Sample for Olric v0.5.x

	// Deployment scenario: embedded-member
	// This creates a single-node Olric cluster. It's good enough for experimenting.

	// config.New returns a new config.Config with sane defaults. Available values for env:
	// local, lan, wan
	c := config.New("local")

	// Callback function. It's called when this node is ready to accept connections.
	ctx, cancel := context.WithCancel(context.Background())
	c.Started = func() {
		defer cancel()
		log.Println("[INFO] Olric is ready to accept connections")
	}

	// Create a new Olric instance.
	db, err := olric.New(c)
	if err != nil {
		log.Fatalf("Failed to create Olric instance: %v", err)
	}

	// Start the instance. It will form a single-node cluster.
	go func() {
		// Call Start in the background. It's a blocking call.
		err = db.Start()
		if err != nil {
			log.Fatalf("olric.Start returned an error: %v", err)
		}
	}()

	<-ctx.Done()

	// In embedded-member scenario, you can use the EmbeddedClient. It implements
	// the Client interface.
	e := db.NewEmbeddedClient()

	ps, err := e.NewPubSub()
	if err != nil {
		log.Fatalf("olric.NewPubSub returned an error: %v", err)
	}

	ctx, cancel = context.WithCancel(context.Background())

	// Olric implements a drop-in replacement of Redis Publish-Subscribe messaging
	// system. PubSub client is just a thin layer around go-redis/redis.
	rps := ps.Subscribe(ctx, "my-channel")

	// Get a channel to read messages from my-channel
	msg := rps.Channel()

	go func() {
		// Publish a message here.
		_, err := ps.Publish(ctx, "my-channel", "Olric Rocks!")
		if err != nil {
			log.Fatalf("PubSub.Publish returned an error: %v", err)
		}
	}()

	// Consume messages
	rm := <-msg

	fmt.Printf("Received message: \"%s\" from \"%s\"\n", rm.Payload, rm.Channel)

	// Don't forget to call Shutdown when you want to leave the cluster.
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = e.Close(ctx)
	if err != nil {
		log.Printf("Failed to close EmbeddedClient: %v", err)
	}
}
SCAN on DMaps
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/tochemey/olric"
	"github.com/tochemey/olric/config"
)

func main() {
	// Sample for Olric v0.5.x

	// Deployment scenario: embedded-member
	// This creates a single-node Olric cluster. It's good enough for experimenting.

	// config.New returns a new config.Config with sane defaults. Available values for env:
	// local, lan, wan
	c := config.New("local")

	// Callback function. It's called when this node is ready to accept connections.
	ctx, cancel := context.WithCancel(context.Background())
	c.Started = func() {
		defer cancel()
		log.Println("[INFO] Olric is ready to accept connections")
	}

	// Create a new Olric instance.
	db, err := olric.New(c)
	if err != nil {
		log.Fatalf("Failed to create Olric instance: %v", err)
	}

	// Start the instance. It will form a single-node cluster.
	go func() {
		// Call Start in the background. It's a blocking call.
		err = db.Start()
		if err != nil {
			log.Fatalf("olric.Start returned an error: %v", err)
		}
	}()

	<-ctx.Done()

	// In embedded-member scenario, you can use the EmbeddedClient. It implements
	// the Client interface.
	e := db.NewEmbeddedClient()

	dm, err := e.NewDMap("bucket-of-arbitrary-items")
	if err != nil {
		log.Fatalf("olric.NewDMap returned an error: %v", err)
	}

	ctx, cancel = context.WithCancel(context.Background())

	// Magic starts here!
	fmt.Println("##")
	fmt.Println("Insert 10 keys")
	var key string
	for i := 0; i < 10; i++ {
		if i%2 == 0 {
			key = fmt.Sprintf("even:%d", i)
		} else {
			key = fmt.Sprintf("odd:%d", i)
		}
		err = dm.Put(ctx, key, nil)
		if err != nil {
			log.Fatalf("Failed to call Put: %v", err)
		}
	}

	i, err := dm.Scan(ctx)
	if err != nil {
		log.Fatalf("Failed to call Scan: %v", err)
	}

	fmt.Println("Iterate over all the keys")
	for i.Next() {
		fmt.Println(">> Key", i.Key())
	}

	i.Close()

	i, err = dm.Scan(ctx, olric.Match("^even:"))
	if err != nil {
		log.Fatalf("Failed to call Scan: %v", err)
	}

	fmt.Println("\n\nScan with regex: ^even:")
	for i.Next() {
		fmt.Println(">> Key", i.Key())
	}

	i.Close()

	// Don't forget to call Shutdown when you want to leave the cluster.
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = db.Shutdown(ctx)
	if err != nil {
		log.Printf("Failed to shutdown Olric: %v", err)
	}
}
Publish-Subscribe (client-server)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/tochemey/olric"
)

func main() {
	// Sample for Olric v0.5.x

	// Deployment scenario: client-server

	// NewClusterClient takes a list of the nodes. This list may only contain a
	// load balancer address. Please note that Olric nodes will calculate the partition owner
	// and proxy the incoming requests.
	c, err := olric.NewClusterClient([]string{"localhost:3320"})
	if err != nil {
		log.Fatalf("olric.NewClusterClient returned an error: %v", err)
	}

	// In client-server scenario, you can use the ClusterClient. It implements
	// the Client interface.
	ps, err := c.NewPubSub()
	if err != nil {
		log.Fatalf("olric.NewPubSub returned an error: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Olric implements a drop-in replacement of Redis Publish-Subscribe messaging
	// system. PubSub client is just a thin layer around go-redis/redis.
	rps := ps.Subscribe(ctx, "my-channel")

	// Get a channel to read messages from my-channel
	msg := rps.Channel()

	go func() {
		// Publish a message here.
		_, err := ps.Publish(ctx, "my-channel", "Olric Rocks!")
		if err != nil {
			log.Fatalf("PubSub.Publish returned an error: %v", err)
		}
	}()

	// Consume messages
	rm := <-msg

	fmt.Printf("Received message: \"%s\" from \"%s\"\n", rm.Payload, rm.Channel)

	// Don't forget to call Shutdown when you want to leave the cluster.
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = c.Close(ctx)
	if err != nil {
		log.Printf("Failed to close ClusterClient: %v", err)
	}
}

Contributions

Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.

License

The Apache License, Version 2.0 - see LICENSE for more details.

About the name

The inner voice of Turgut Özben, the main character of Oğuz Atay's masterpiece, The Disconnected.

Documentation

Overview

Package olric provides a distributed cache and in-memory key/value data store. It can be used both as an embedded Go library and as a language-independent service.

With Olric, you can instantly create a fast, scalable, shared pool of RAM across a cluster of computers.

Olric is designed to be a distributed cache. But it also provides Publish/Subscribe, data replication, failure detection and simple anti-entropy services. So it can be used as an ordinary key/value data store to scale your cloud application.

Index

Constants

const DefaultPingResponse = "PONG"
const DefaultRoutingTableFetchInterval = time.Minute

DefaultRoutingTableFetchInterval is the default value of RoutingTableFetchInterval. ClusterClient implementation fetches the routing table from the cluster to route requests to the right partition.

const DefaultScanCount = 10
const ReleaseVersion string = "0.2.0-alpha"

ReleaseVersion is the current stable version of Olric

Variables

var (
	// ErrOperationTimeout is returned when an operation times out.
	ErrOperationTimeout = errors.New("operation timeout")

	// ErrServerGone means that a cluster member is closed unexpectedly.
	ErrServerGone = errors.New("server is gone")

	// ErrKeyNotFound is returned when a key could not be found.
	ErrKeyNotFound = errors.New("key not found")

	// ErrKeyFound means that the requested key was found in the cluster.
	ErrKeyFound = errors.New("key found")

	// ErrWriteQuorum means that write quorum cannot be reached to operate.
	ErrWriteQuorum = errors.New("write quorum cannot be reached")

	// ErrReadQuorum means that read quorum cannot be reached to operate.
	ErrReadQuorum = errors.New("read quorum cannot be reached")

	// ErrLockNotAcquired is returned when the requested lock could not be acquired
	ErrLockNotAcquired = errors.New("lock not acquired")

	// ErrNoSuchLock is returned when the requested lock does not exist
	ErrNoSuchLock = errors.New("no such lock")

	// ErrClusterQuorum means that the cluster could not reach a healthy number of members to operate.
	ErrClusterQuorum = errors.New("failed to find enough peers to create quorum")

	// ErrKeyTooLarge means that the given key is too large to process.
	// Maximum length of a key is 256 bytes.
	ErrKeyTooLarge = errors.New("key too large")

	// ErrEntryTooLarge is returned if the required space for an entry is bigger than the table size.
	ErrEntryTooLarge = errors.New("entry too large for the configured table size")

	// ErrConnRefused is returned if the target node refused a connection request.
	// It is good to call RefreshMetadata to update the underlying data structures.
	ErrConnRefused = errors.New("connection refused")
)
var (
	// ErrNotReady denotes that the Future instance you hold is not ready to read the response yet.
	ErrNotReady = errors.New("not ready yet")

	// ErrPipelineClosed denotes that the underlying pipeline is closed, and it's impossible to operate.
	ErrPipelineClosed = errors.New("pipeline is closed")

	// ErrPipelineExecuted denotes that Exec was already called on the underlying pipeline.
	ErrPipelineExecuted = errors.New("pipeline already executed")
)
var ErrNilResponse = errors.New("storage entry is nil")

Functions

This section is empty.

Types

type Client

type Client interface {
	// NewDMap returns a new DMap client with the given options.
	NewDMap(name string, options ...DMapOption) (DMap, error)

	// NewPubSub returns a new PubSub client with the given options.
	NewPubSub(options ...PubSubOption) (*PubSub, error)

	// Stats returns stats.Stats with the given options.
	Stats(ctx context.Context, address string, options ...StatsOption) (stats.Stats, error)

	// Ping sends a ping message to an Olric node. Returns PONG if the message is empty,
	// otherwise returns a copy of the message as a bulk. This command is often used to test
	// if a connection is still alive, or to measure latency.
	Ping(ctx context.Context, address, message string) (string, error)

	// RoutingTable returns the latest version of the routing table.
	RoutingTable(ctx context.Context) (RoutingTable, error)

	// Members returns a thread-safe list of cluster members.
	Members(ctx context.Context) ([]Member, error)

	// RefreshMetadata fetches a list of available members and the latest routing
	// table version. It also closes stale clients, if there are any.
	RefreshMetadata(ctx context.Context) error

	// Close stops background routines and frees allocated resources.
	Close(ctx context.Context) error
}

Client is an interface that denotes an Olric client.

type ClusterClient

type ClusterClient struct {
	// contains filtered or unexported fields
}

func NewClusterClient

func NewClusterClient(addresses []string, options ...ClusterClientOption) (*ClusterClient, error)

NewClusterClient creates a new Client instance. It needs at least one node address to discover the whole cluster.

func (*ClusterClient) Close

func (cl *ClusterClient) Close(ctx context.Context) error

Close stops background routines and frees allocated resources.

func (*ClusterClient) Members

func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error)

Members returns a thread-safe list of cluster members.

func (*ClusterClient) NewDMap

func (cl *ClusterClient) NewDMap(name string, options ...DMapOption) (DMap, error)

NewDMap returns a new DMap client with the given options.

func (*ClusterClient) NewPubSub

func (cl *ClusterClient) NewPubSub(options ...PubSubOption) (*PubSub, error)

NewPubSub returns a new PubSub client with the given options.

func (*ClusterClient) Ping

func (cl *ClusterClient) Ping(ctx context.Context, addr, message string) (string, error)

Ping sends a ping message to an Olric node. Returns PONG if the message is empty, otherwise returns a copy of the message as a bulk. This command is often used to test if a connection is still alive, or to measure latency.

func (*ClusterClient) RefreshMetadata

func (cl *ClusterClient) RefreshMetadata(ctx context.Context) error

RefreshMetadata fetches a list of available members and the latest routing table version. It also closes stale clients, if there are any.

func (*ClusterClient) RoutingTable

func (cl *ClusterClient) RoutingTable(ctx context.Context) (RoutingTable, error)

RoutingTable returns the latest version of the routing table.

func (*ClusterClient) Stats

func (cl *ClusterClient) Stats(ctx context.Context, address string, options ...StatsOption) (stats.Stats, error)

Stats returns stats.Stats with the given options.

type ClusterClientOption

type ClusterClientOption func(c *clusterClientConfig)

func WithConfig

func WithConfig(c *config.Client) ClusterClientOption

func WithHasher

func WithHasher(h hasher.Hasher) ClusterClientOption

func WithLogger

func WithLogger(l *log.Logger) ClusterClientOption

func WithRoutingTableFetchInterval

func WithRoutingTableFetchInterval(interval time.Duration) ClusterClientOption

WithRoutingTableFetchInterval is used to set a custom value to routingTableFetchInterval. ClusterClient implementation retrieves the routing table from the cluster to route requests to the partition owners.

type ClusterDMap

type ClusterDMap struct {
	// contains filtered or unexported fields
}

ClusterDMap implements a client for DMaps.

func (*ClusterDMap) Decr

func (dm *ClusterDMap) Decr(ctx context.Context, key string, delta int) (int, error)

Decr atomically decrements the key by delta. The return value is the new value after being decremented or an error.

func (*ClusterDMap) Delete

func (dm *ClusterDMap) Delete(ctx context.Context, keys ...string) (int, error)

Delete deletes values for the given keys. Delete will not return error if key doesn't exist. It's thread-safe. It is safe to modify the contents of the argument after Delete returns.

func (*ClusterDMap) Destroy

func (dm *ClusterDMap) Destroy(ctx context.Context) error

Destroy flushes the given DMap on the cluster. You should know that there is no global lock on DMaps. So if you call Put/PutEx and Destroy methods concurrently on the cluster, Put call may set new values to the DMap.

func (*ClusterDMap) Expire

func (dm *ClusterDMap) Expire(ctx context.Context, key string, timeout time.Duration) error

Expire updates the expiry for the given key. It returns ErrKeyNotFound if the DB does not contain the key. It's thread-safe.

func (*ClusterDMap) Get

func (dm *ClusterDMap) Get(ctx context.Context, key string) (*GetResponse, error)

Get gets the value for the given key. It returns ErrKeyNotFound if the DB does not contain the key. It's thread-safe. It is safe to modify the contents of the returned value. See GetResponse for the details.

func (*ClusterDMap) GetPut

func (dm *ClusterDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)

GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no previous value.

func (*ClusterDMap) Incr

func (dm *ClusterDMap) Incr(ctx context.Context, key string, delta int) (int, error)

Incr atomically increments the key by delta. The return value is the new value after being incremented or an error.

func (*ClusterDMap) IncrByFloat

func (dm *ClusterDMap) IncrByFloat(ctx context.Context, key string, delta float64) (float64, error)

IncrByFloat atomically increments the key by delta. The return value is the new value after being incremented or an error.

func (*ClusterDMap) Lock

func (dm *ClusterDMap) Lock(ctx context.Context, key string, deadline time.Duration) (LockContext, error)

Lock sets a lock for the given key. Acquired lock is only for the key in this dmap.

It returns immediately if it acquires the lock for the given key. Otherwise, it waits until deadline.

You should know that the locks are approximate, and only to be used for non-critical purposes.

func (*ClusterDMap) LockWithTimeout

func (dm *ClusterDMap) LockWithTimeout(ctx context.Context, key string, timeout, deadline time.Duration) (LockContext, error)

LockWithTimeout sets a lock for the given key. If the lock is still unreleased at the end of the given period of time, it automatically releases the lock. Acquired lock is only for the key in this DMap.

It returns immediately if it acquires the lock for the given key. Otherwise, it waits until deadline.

You should know that the locks are approximate, and only to be used for non-critical purposes.

func (*ClusterDMap) Name

func (dm *ClusterDMap) Name() string

Name exposes name of the DMap.

func (*ClusterDMap) Pipeline

func (dm *ClusterDMap) Pipeline(opts ...PipelineOption) (*DMapPipeline, error)

Pipeline is a mechanism to realise Redis Pipeline technique.

Pipelining is a technique to extremely speed up processing by packing operations into batches, sending them at once to Redis and reading the replies in a single step. See https://redis.io/topics/pipelining

Pay attention, that Pipeline is not a transaction, so you can get unexpected results in case of big pipelines and small read/write timeouts. Redis client has retransmission logic in case of timeouts, pipeline can be retransmitted and commands can be executed more than once.
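A minimal usage sketch based on the pipeline methods documented here (Put, Get, Exec, Result); dm is a DMap client and error handling is trimmed.

pipe, err := dm.Pipeline()
if err != nil {
	log.Fatalf("Failed to create pipeline: %v", err)
}
defer pipe.Close()

// Queue commands; nothing is sent until Exec is called.
futPut, _ := pipe.Put(ctx, "key-1", "value-1")
futGet := pipe.Get(ctx, "key-1")

// Exec flushes the queued commands with one roundtrip per partition.
if err := pipe.Exec(ctx); err != nil {
	log.Fatalf("Failed to execute pipeline: %v", err)
}

// Results are readable only after Exec; before that, Result returns ErrNotReady.
if err := futPut.Result(); err != nil {
	log.Fatalf("Put failed: %v", err)
}
gr, err := futGet.Result()
if err != nil {
	log.Fatalf("Get failed: %v", err)
}
value, _ := gr.String()
fmt.Println("key-1:", value)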

func (*ClusterDMap) Put

func (dm *ClusterDMap) Put(ctx context.Context, key string, value interface{}, options ...PutOption) error

Put sets the value for the given key. It overwrites any previous value for that key, and it's thread-safe. The key has to be a string. value type is arbitrary. It is safe to modify the contents of the arguments after Put returns but not before.

func (*ClusterDMap) Scan

func (dm *ClusterDMap) Scan(ctx context.Context, options ...ScanOption) (Iterator, error)

Scan returns an iterator to loop over the keys.

Available scan options:

* Count * Match

type ClusterIterator

type ClusterIterator struct {
	// contains filtered or unexported fields
}

ClusterIterator implements distributed query on DMaps.

func (*ClusterIterator) Close

func (i *ClusterIterator) Close()

Close stops the iteration and releases allocated resources.

func (*ClusterIterator) Key

func (i *ClusterIterator) Key() string

Key returns a key name from the distributed map.

func (*ClusterIterator) Next

func (i *ClusterIterator) Next() bool

Next returns true if there are more keys in the iterator implementation. Otherwise, it returns false.

type ClusterLockContext

type ClusterLockContext struct {
	// contains filtered or unexported fields
}

func (*ClusterLockContext) Lease

func (c *ClusterLockContext) Lease(ctx context.Context, duration time.Duration) error

func (*ClusterLockContext) Unlock

func (c *ClusterLockContext) Unlock(ctx context.Context) error

type DMap

type DMap interface {
	// Name exposes name of the DMap.
	Name() string

	// Put sets the value for the given key. It overwrites any previous value for
	// that key, and it's thread-safe. The key has to be a string. value type is arbitrary.
	// It is safe to modify the contents of the arguments after Put returns but not before.
	Put(ctx context.Context, key string, value interface{}, options ...PutOption) error

	// Get gets the value for the given key. It returns ErrKeyNotFound if the DB
	// does not contain the key. It's thread-safe. It is safe to modify the contents
	// of the returned value. See GetResponse for the details.
	Get(ctx context.Context, key string) (*GetResponse, error)

	// Delete deletes values for the given keys. Delete will not return error
	// if key doesn't exist. It's thread-safe. It is safe to modify the contents
	// of the argument after Delete returns.
	Delete(ctx context.Context, keys ...string) (int, error)

	// Incr atomically increments the key by delta. The return value is the new value
	// after being incremented or an error.
	Incr(ctx context.Context, key string, delta int) (int, error)

	// Decr atomically decrements the key by delta. The return value is the new value
	// after being decremented or an error.
	Decr(ctx context.Context, key string, delta int) (int, error)

	// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
	// previous value.
	GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)

	// IncrByFloat atomically increments the key by delta. The return value is the new value
	// after being incremented or an error.
	IncrByFloat(ctx context.Context, key string, delta float64) (float64, error)

	// Expire updates the expiry for the given key. It returns ErrKeyNotFound if
	// the DB does not contain the key. It's thread-safe.
	Expire(ctx context.Context, key string, timeout time.Duration) error

	// Lock sets a lock for the given key. Acquired lock is only for the key in
	// this dmap.
	//
	// It returns immediately if it acquires the lock for the given key. Otherwise,
	// it waits until deadline.
	//
	// You should know that the locks are approximate, and only to be used for
	// non-critical purposes.
	Lock(ctx context.Context, key string, deadline time.Duration) (LockContext, error)

	// LockWithTimeout sets a lock for the given key. If the lock is still unreleased
	// at the end of the given period of time,
	// it automatically releases the lock. Acquired lock is only for the key in
	// this dmap.
	//
	// It returns immediately if it acquires the lock for the given key. Otherwise,
	// it waits until deadline.
	//
	// You should know that the locks are approximate, and only to be used for
	// non-critical purposes.
	LockWithTimeout(ctx context.Context, key string, timeout, deadline time.Duration) (LockContext, error)

	// Scan returns an iterator to loop over the keys.
	//
	// Available scan options:
	//
	// * Count
	// * Match
	Scan(ctx context.Context, options ...ScanOption) (Iterator, error)

	// Destroy flushes the given DMap on the cluster. You should know that there
	// is no global lock on DMaps. So if you call Put/PutEx and Destroy methods
	// concurrently on the cluster, Put call may set new values to the DMap.
	Destroy(ctx context.Context) error

	// Pipeline is a mechanism to realise Redis Pipeline technique.
	//
	// Pipelining is a technique to extremely speed up processing by packing
	// operations into batches, sending them at once to Redis and reading the
	// replies in a single step.
	// See https://redis.io/topics/pipelining
	//
	// Pay attention, that Pipeline is not a transaction, so you can get unexpected
	// results in case of big pipelines and small read/write timeouts.
	// Redis client has retransmission logic in case of timeouts, pipeline
	// can be retransmitted and commands can be executed more than once.
	Pipeline(opts ...PipelineOption) (*DMapPipeline, error)
}

DMap defines methods to access and manipulate distributed maps.

type DMapOption

type DMapOption func(*dmapConfig)

DMapOption is a function for defining options to control behavior of distributed map instances.

func StorageEntryImplementation

func StorageEntryImplementation(e func() storage.Entry) DMapOption

StorageEntryImplementation sets an encoder/decoder implementation for your choice of storage engine.

type DMapPipeline

type DMapPipeline struct {
	// contains filtered or unexported fields
}

DMapPipeline implements a pipeline for the following methods of the DMap API:

* Put * Get * Delete * Incr * Decr * GetPut * IncrByFloat

DMapPipeline enables batch operations on DMap data.

func (*DMapPipeline) Close

func (dp *DMapPipeline) Close()

Close closes the pipeline and frees the allocated resources. You shouldn't try to reuse a closed pipeline.

func (*DMapPipeline) Decr

func (dp *DMapPipeline) Decr(ctx context.Context, key string, delta int) (*FutureDecr, error)

Decr queues a Decr command. The parameters are identical to the DMap.Decr, but it returns FutureDecr to read the batched response.

func (*DMapPipeline) Delete

func (dp *DMapPipeline) Delete(ctx context.Context, key string) *FutureDelete

Delete queues a Delete command. The parameters are identical to the DMap.Delete, but it returns FutureDelete to read the batched response.

func (*DMapPipeline) Discard

func (dp *DMapPipeline) Discard() error

Discard discards the pipelined commands and resets all internal states. A pipeline can be reused after calling Discard.

func (*DMapPipeline) Exec

func (dp *DMapPipeline) Exec(ctx context.Context) error

Exec executes all queued commands using one client-server roundtrip per partition.

func (*DMapPipeline) Expire

func (dp *DMapPipeline) Expire(ctx context.Context, key string, timeout time.Duration) (*FutureExpire, error)

Expire queues an Expire command. The parameters are identical to the DMap.Expire, but it returns FutureExpire to read the batched response.

func (*DMapPipeline) Get

func (dp *DMapPipeline) Get(ctx context.Context, key string) *FutureGet

Get queues a Get command. The parameters are identical to the DMap.Get, but it returns FutureGet to read the batched response.

func (*DMapPipeline) GetPut

func (dp *DMapPipeline) GetPut(ctx context.Context, key string, value interface{}) (*FutureGetPut, error)

GetPut queues a GetPut command. The parameters are identical to the DMap.GetPut, but it returns FutureGetPut to read the batched response.

func (*DMapPipeline) Incr

func (dp *DMapPipeline) Incr(ctx context.Context, key string, delta int) (*FutureIncr, error)

Incr queues an Incr command. The parameters are identical to the DMap.Incr, but it returns FutureIncr to read the batched response.

func (*DMapPipeline) IncrByFloat

func (dp *DMapPipeline) IncrByFloat(ctx context.Context, key string, delta float64) (*FutureIncrByFloat, error)

IncrByFloat queues an IncrByFloat command. The parameters are identical to the DMap.IncrByFloat, but it returns FutureIncrByFloat to read the batched response.

func (*DMapPipeline) Put

func (dp *DMapPipeline) Put(ctx context.Context, key string, value interface{}, options ...PutOption) (*FuturePut, error)

Put queues a Put command. The parameters are identical to the DMap.Put, but it returns FuturePut to read the batched response.

type EmbeddedClient

type EmbeddedClient struct {
	// contains filtered or unexported fields
}

EmbeddedClient is an Olric client implementation for embedded-member scenario.

func (*EmbeddedClient) Close

func (e *EmbeddedClient) Close(_ context.Context) error

Close stops background routines and frees allocated resources.

func (*EmbeddedClient) Members

func (e *EmbeddedClient) Members(_ context.Context) ([]Member, error)

Members returns a thread-safe list of cluster members.

func (*EmbeddedClient) NewDMap

func (e *EmbeddedClient) NewDMap(name string, options ...DMapOption) (DMap, error)

func (*EmbeddedClient) NewPubSub

func (e *EmbeddedClient) NewPubSub(options ...PubSubOption) (*PubSub, error)

NewPubSub returns a new PubSub client with the given options.

func (*EmbeddedClient) Ping

func (e *EmbeddedClient) Ping(ctx context.Context, addr, message string) (string, error)

Ping sends a ping message to an Olric node. Returns PONG if the message is empty, otherwise returns a copy of the message as a bulk. This command is often used to test if a connection is still alive, or to measure latency.

func (*EmbeddedClient) RefreshMetadata

func (e *EmbeddedClient) RefreshMetadata(_ context.Context) error

RefreshMetadata fetches a list of available members and the latest routing table version. It also closes stale clients, if there are any. EmbeddedClient has this method to implement the Client interface. It doesn't need to refresh metadata manually.

func (*EmbeddedClient) RoutingTable

func (e *EmbeddedClient) RoutingTable(ctx context.Context) (RoutingTable, error)

RoutingTable returns the latest version of the routing table.

func (*EmbeddedClient) Stats

func (e *EmbeddedClient) Stats(ctx context.Context, address string, options ...StatsOption) (stats.Stats, error)

Stats exposes some useful metrics to monitor an Olric node.

type EmbeddedDMap

type EmbeddedDMap struct {
	// contains filtered or unexported fields
}

EmbeddedDMap is a DMap client implementation for the embedded-member scenario.

func (*EmbeddedDMap) Decr

func (dm *EmbeddedDMap) Decr(ctx context.Context, key string, delta int) (int, error)

Decr atomically decrements the key by delta. The return value is the new value after being decremented or an error.

func (*EmbeddedDMap) Delete

func (dm *EmbeddedDMap) Delete(ctx context.Context, keys ...string) (int, error)

Delete deletes values for the given keys. Delete will not return error if key doesn't exist. It's thread-safe. It is safe to modify the contents of the argument after Delete returns.

func (*EmbeddedDMap) Destroy

func (dm *EmbeddedDMap) Destroy(ctx context.Context) error

Destroy flushes the given DMap on the cluster. You should know that there is no global lock on DMaps. So if you call Put/PutEx and Destroy methods concurrently on the cluster, Put call may set new values to the DMap.

func (*EmbeddedDMap) Expire

func (dm *EmbeddedDMap) Expire(ctx context.Context, key string, timeout time.Duration) error

Expire updates the expiry for the given key. It returns ErrKeyNotFound if the DB does not contain the key. It's thread-safe.

func (*EmbeddedDMap) Get

func (dm *EmbeddedDMap) Get(ctx context.Context, key string) (*GetResponse, error)

Get gets the value for the given key. It returns ErrKeyNotFound if the DB does not contain the key. It's thread-safe. It is safe to modify the contents of the returned value. See GetResponse for the details.

func (*EmbeddedDMap) GetPut

func (dm *EmbeddedDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)

GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no previous value.

func (*EmbeddedDMap) Incr

func (dm *EmbeddedDMap) Incr(ctx context.Context, key string, delta int) (int, error)

Incr atomically increments the key by delta. The return value is the new value after being incremented or an error.

func (*EmbeddedDMap) IncrByFloat

func (dm *EmbeddedDMap) IncrByFloat(ctx context.Context, key string, delta float64) (float64, error)

IncrByFloat atomically increments the key by delta. The return value is the new value after being incremented or an error.

func (*EmbeddedDMap) Lock

func (dm *EmbeddedDMap) Lock(ctx context.Context, key string, deadline time.Duration) (LockContext, error)

Lock sets a lock for the given key. Acquired lock is only for the key in this dmap.

It returns immediately if it acquires the lock for the given key. Otherwise, it waits until deadline.

You should know that the locks are approximate, and only to be used for non-critical purposes.

func (*EmbeddedDMap) LockWithTimeout

func (dm *EmbeddedDMap) LockWithTimeout(ctx context.Context, key string, timeout, deadline time.Duration) (LockContext, error)

LockWithTimeout sets a lock for the given key. If the lock is still unreleased at the end of the given period of time, it automatically releases the lock. Acquired lock is only for the key in this dmap.

It returns immediately if it acquires the lock for the given key. Otherwise, it waits until deadline.

You should know that the locks are approximate, and only to be used for non-critical purposes.

func (*EmbeddedDMap) Name

func (dm *EmbeddedDMap) Name() string

Name exposes name of the DMap.

func (*EmbeddedDMap) Pipeline

func (dm *EmbeddedDMap) Pipeline(opts ...PipelineOption) (*DMapPipeline, error)

Pipeline is a mechanism to realise Redis Pipeline technique.

Pipelining is a technique to extremely speed up processing by packing operations into batches, sending them at once to Redis and reading the replies in a single step. See https://redis.io/topics/pipelining

Pay attention, that Pipeline is not a transaction, so you can get unexpected results in case of big pipelines and small read/write timeouts. Redis client has retransmission logic in case of timeouts, pipeline can be retransmitted and commands can be executed more than once.

func (*EmbeddedDMap) Put

func (dm *EmbeddedDMap) Put(ctx context.Context, key string, value interface{}, options ...PutOption) error

Put sets the value for the given key. It overwrites any previous value for that key, and it's thread-safe. The key has to be a string. value type is arbitrary. It is safe to modify the contents of the arguments after Put returns but not before.

func (*EmbeddedDMap) Scan

func (dm *EmbeddedDMap) Scan(ctx context.Context, options ...ScanOption) (Iterator, error)

Scan returns an iterator to loop over the keys.

Available scan options:

* Count * Match

type EmbeddedIterator

type EmbeddedIterator struct {
	// contains filtered or unexported fields
}

EmbeddedIterator implements distributed query on DMaps.

func (*EmbeddedIterator) Close

func (e *EmbeddedIterator) Close()

Close stops the iteration and releases allocated resources.

func (*EmbeddedIterator) Key

func (e *EmbeddedIterator) Key() string

Key returns a key name from the distributed map.

func (*EmbeddedIterator) Next

func (e *EmbeddedIterator) Next() bool

Next returns true if there are more keys in the iterator implementation. Otherwise, it returns false.

type EmbeddedLockContext

type EmbeddedLockContext struct {
	// contains filtered or unexported fields
}

EmbeddedLockContext is returned by the Lock and LockWithTimeout methods. The caller must keep it in order to release the lock later.

func (*EmbeddedLockContext) Lease

func (l *EmbeddedLockContext) Lease(ctx context.Context, duration time.Duration) error

Lease updates the expiry of the acquired lock with the given duration.

func (*EmbeddedLockContext) Unlock

func (l *EmbeddedLockContext) Unlock(ctx context.Context) error

Unlock releases the lock.

type FutureDecr

type FutureDecr struct {
	// contains filtered or unexported fields
}

FutureDecr is used to read the result of a pipelined Decr command.

func (*FutureDecr) Result

func (f *FutureDecr) Result() (int, error)

Result returns a response for the pipelined Decr command.

type FutureDelete

type FutureDelete struct {
	// contains filtered or unexported fields
}

FutureDelete is used to read the result of a pipelined Delete command.

func (*FutureDelete) Result

func (f *FutureDelete) Result() (int, error)

Result returns a response for the pipelined Delete command.

type FutureExpire

type FutureExpire struct {
	// contains filtered or unexported fields
}

FutureExpire is used to read the result of a pipelined Expire command.

func (*FutureExpire) Result

func (f *FutureExpire) Result() error

Result returns a response for the pipelined Expire command.

type FutureGet

type FutureGet struct {
	// contains filtered or unexported fields
}

FutureGet is used to read the result of a pipelined Get command.

func (*FutureGet) Result

func (f *FutureGet) Result() (*GetResponse, error)

Result returns a response for the pipelined Get command.

type FutureGetPut

type FutureGetPut struct {
	// contains filtered or unexported fields
}

FutureGetPut is used to read the result of a pipelined GetPut command.

func (*FutureGetPut) Result

func (f *FutureGetPut) Result() (*GetResponse, error)

Result returns a response for the pipelined GetPut command.

type FutureIncr

type FutureIncr struct {
	// contains filtered or unexported fields
}

FutureIncr is used to read the result of a pipelined Incr command.

func (*FutureIncr) Result

func (f *FutureIncr) Result() (int, error)

Result returns a response for the pipelined Incr command.

type FutureIncrByFloat

type FutureIncrByFloat struct {
	// contains filtered or unexported fields
}

FutureIncrByFloat is used to read the result of a pipelined IncrByFloat command.

func (*FutureIncrByFloat) Result

func (f *FutureIncrByFloat) Result() (float64, error)

Result returns a response for the pipelined IncrByFloat command.

type FuturePut

type FuturePut struct {
	// contains filtered or unexported fields
}

FuturePut is used to read the result of a pipelined Put command.

func (*FuturePut) Result

func (f *FuturePut) Result() error

Result returns a response for the pipelined Put command.

type GetResponse

type GetResponse struct {
	// contains filtered or unexported fields
}

func (*GetResponse) Bool

func (g *GetResponse) Bool() (bool, error)

func (*GetResponse) Byte

func (g *GetResponse) Byte() ([]byte, error)

func (*GetResponse) Duration

func (g *GetResponse) Duration() (time.Duration, error)

func (*GetResponse) Float32

func (g *GetResponse) Float32() (float32, error)

func (*GetResponse) Float64

func (g *GetResponse) Float64() (float64, error)

func (*GetResponse) Int

func (g *GetResponse) Int() (int, error)

func (*GetResponse) Int16

func (g *GetResponse) Int16() (int16, error)

func (*GetResponse) Int32

func (g *GetResponse) Int32() (int32, error)

func (*GetResponse) Int64

func (g *GetResponse) Int64() (int64, error)

func (*GetResponse) Int8

func (g *GetResponse) Int8() (int8, error)

func (*GetResponse) Scan

func (g *GetResponse) Scan(v interface{}) error

func (*GetResponse) String

func (g *GetResponse) String() (string, error)

func (*GetResponse) TTL

func (g *GetResponse) TTL() int64

func (*GetResponse) Time

func (g *GetResponse) Time() (time.Time, error)

func (*GetResponse) Timestamp

func (g *GetResponse) Timestamp() int64

func (*GetResponse) Uint

func (g *GetResponse) Uint() (uint, error)

func (*GetResponse) Uint16

func (g *GetResponse) Uint16() (uint16, error)

func (*GetResponse) Uint32

func (g *GetResponse) Uint32() (uint32, error)

func (*GetResponse) Uint64

func (g *GetResponse) Uint64() (uint64, error)

func (*GetResponse) Uint8

func (g *GetResponse) Uint8() (uint8, error)
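
GetResponse wraps the value returned by Get, GetPut and the pipelined Get command; the accessors above decode it into concrete types. A short sketch (dm as above):

func exampleGetResponse(ctx context.Context, dm *olric.EmbeddedDMap) error {
	if err := dm.Put(ctx, "counter", 42); err != nil {
		return err
	}

	resp, err := dm.Get(ctx, "counter")
	if err != nil {
		return err
	}

	// Decode with a typed accessor...
	n, err := resp.Int()
	if err != nil {
		return err
	}
	fmt.Println(n) // 42

	// ...or let Scan decode the raw value into an arbitrary variable.
	var m int
	return resp.Scan(&m)
}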

type Iterator

type Iterator interface {
	// Next returns true if there are more keys in the iterator implementation.
	// Otherwise, it returns false.
	Next() bool

	// Key returns a key name from the distributed map.
	Key() string

	// Close stops the iteration and releases allocated resources.
	Close()
}

Iterator defines an interface to implement iterators on the distributed maps.

type LockContext

type LockContext interface {
	// Unlock releases an acquired lock for the given key. It returns ErrNoSuchLock
	// if there is no lock for the given key.
	Unlock(ctx context.Context) error

	// Lease sets or updates the timeout of the acquired lock for the given key.
	// It returns ErrNoSuchLock if there is no lock for the given key.
	Lease(ctx context.Context, duration time.Duration) error
}

LockContext interface defines methods to manage locks on distributed maps.

type Member

type Member struct {
	// Member name in the cluster. It's also host:port of the node.
	Name string

	// ID of the Member in the cluster. It is a hash of the member's Name and Birthdate.
	ID uint64

	// Birthdate of the member in nanoseconds.
	Birthdate int64

	// Role of the member in the cluster. There is only one coordinator member
	// in a healthy cluster.
	Coordinator bool
}

Member denotes a member of the Olric cluster.

type Olric

type Olric struct {
	// contains filtered or unexported fields
}

Olric implements a distributed cache and in-memory key/value data store. It can be used both as an embedded Go library and as a language-independent service.

func New

func New(config *config.Config) (*Olric, error)

New creates a new Olric instance or returns an error.

func (*Olric) NewEmbeddedClient

func (db *Olric) NewEmbeddedClient() *EmbeddedClient

NewEmbeddedClient creates and returns a new EmbeddedClient instance.

func (*Olric) Shutdown

func (db *Olric) Shutdown(ctx context.Context) error

Shutdown stops background servers and leaves the cluster.

func (*Olric) Start

func (db *Olric) Start() error

Start starts background servers and joins the cluster. You must still call the Shutdown method if Start returns an early error.
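
A minimal embedded bootstrap sketch. The config.New helper and the Started callback follow upstream Olric's config package, and the import paths below assume the upstream layout; adjust them for this fork's renamed module:

package main

import (
	"context"
	"log"
	"time"

	"github.com/buraksezer/olric"        // adjust to this fork's module path
	"github.com/buraksezer/olric/config" // adjust to this fork's module path
)

func main() {
	// "local" is a preset for single-node, loopback deployments in upstream Olric.
	c := config.New("local")

	// The Started callback fires once the node is ready to serve.
	ctx, cancel := context.WithCancel(context.Background())
	c.Started = func() {
		defer cancel()
		log.Println("Olric node is ready")
	}

	db, err := olric.New(c)
	if err != nil {
		log.Fatalf("olric.New failed: %v", err)
	}

	// In upstream Olric, Start blocks until Shutdown is called, so run it in its own goroutine.
	go func() {
		if err := db.Start(); err != nil {
			log.Fatalf("olric.Start failed: %v", err)
		}
	}()
	<-ctx.Done() // wait for the Started callback

	// Use the embedded client, then shut the node down cleanly.
	e := db.NewEmbeddedClient()
	_ = e // create DMaps, PubSub clients, etc. from here

	shutdownCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()
	if err := db.Shutdown(shutdownCtx); err != nil {
		log.Printf("shutdown returned an error: %v", err)
	}
}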

type PipelineOption

type PipelineOption func(pipeline *DMapPipeline)

PipelineOption is a function for defining options to control behavior of the Pipeline command.

func PipelineConcurrency

func PipelineConcurrency(concurrency int) PipelineOption

PipelineConcurrency is a PipelineOption controlling the number of concurrent goroutines.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func (*PubSub) PSubscribe

func (ps *PubSub) PSubscribe(ctx context.Context, channels ...string) *redis.PubSub

func (*PubSub) PubSubChannels

func (ps *PubSub) PubSubChannels(ctx context.Context, pattern string) ([]string, error)

func (*PubSub) PubSubNumPat

func (ps *PubSub) PubSubNumPat(ctx context.Context) (int64, error)

func (*PubSub) PubSubNumSub

func (ps *PubSub) PubSubNumSub(ctx context.Context, channels ...string) (map[string]int64, error)

func (*PubSub) Publish

func (ps *PubSub) Publish(ctx context.Context, channel string, message interface{}) (int64, error)

func (*PubSub) Subscribe

func (ps *PubSub) Subscribe(ctx context.Context, channels ...string) *redis.PubSub
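
A publish/subscribe sketch, assuming e is an *EmbeddedClient that exposes a NewPubSub constructor as in upstream Olric; Subscribe returns a go-redis *redis.PubSub, whose Channel and Close methods are used here:

func examplePubSub(ctx context.Context, e *olric.EmbeddedClient) error {
	// NewPubSub is assumed to follow upstream Olric's EmbeddedClient API.
	ps, err := e.NewPubSub()
	if err != nil {
		return err
	}

	sub := ps.Subscribe(ctx, "events")
	defer sub.Close()

	// Consume messages from the go-redis subscription channel.
	go func() {
		for msg := range sub.Channel() {
			fmt.Printf("received %q on channel %s\n", msg.Payload, msg.Channel)
		}
	}()

	// Publish mirrors Redis PUBLISH; the int64 result is presumably the receiver count.
	if _, err := ps.Publish(ctx, "events", "hello"); err != nil {
		return err
	}
	return nil
}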

type PubSubOption

type PubSubOption func(option *pubsubConfig)

PubSubOption is a function for defining options to control behavior of the Publish-Subscribe service.

func ToAddress

func ToAddress(addr string) PubSubOption

ToAddress is a PubSubOption for using a specific cluster member to publish messages to a channel.

type PutOption

type PutOption func(*dmap.PutConfig)

PutOption is a function for defining options to control the behavior of the Put command.

func EX

func EX(ex time.Duration) PutOption

EX sets the specified expire time, in seconds.

func EXAT

func EXAT(exat time.Duration) PutOption

EXAT sets the specified Unix time at which the key will expire, in seconds.

func NX

func NX() PutOption

NX only sets the key if it does not already exist.

func PX

func PX(px time.Duration) PutOption

PX sets the specified expire time, in milliseconds.

func PXAT

func PXAT(pxat time.Duration) PutOption

PXAT sets the specified Unix time at which the key will expire, in milliseconds.

func XX

func XX() PutOption

XX only sets the key if it already exists.

type Route

type Route struct {
	PrimaryOwners []string
	ReplicaOwners []string
}

type RoutingTable

type RoutingTable map[uint64]Route

type ScanOption

type ScanOption func(*dmap.ScanConfig)

ScanOption is a function for defining options to control behavior of the SCAN command.

func Count

func Count(c int) ScanOption

Count sets the user-specified amount of work that should be done at every call in order to retrieve elements from the distributed map. This is only a hint for the implementation; however, generally speaking, this is what you can expect most of the time. The default value is 10.

func Match

func Match(s string) ScanOption

Match filters keys with a regular expression. See https://pkg.go.dev/regexp

type StatsOption

type StatsOption func(*statsConfig)

StatsOption is a function for defining options to control behavior of the STATS command.

func CollectRuntime

func CollectRuntime() StatsOption

CollectRuntime is a StatsOption for collecting Go runtime statistics from a cluster member.

Directories

Path Synopsis
internal
discovery
Package discovery provides a basic memberlist integration.
kvstore
Package kvstore implements a GC friendly in-memory storage engine by using built-in maps and byte slices.
locker
Package locker provides a mechanism for creating finer-grained locking to help free up more global locks to handle other tasks.
pkg
flog
Package flog is a simple wrapper around Golang's log package which adds verbosity support.
service_discovery
Package service_discovery provides ServiceDiscovery interface for plugins
stats
Package stats exposes internal data structures for Stat command
