distr_ep

v0.2.0
Published: Jan 14, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

go-distr-ep

This project provides a distributed event processor framework for Go. It is useful when events that share a key must be processed sequentially and in order.

go-distr-ep uses Redis to persist events (for fault tolerance) and to distribute them among all the participating processors.

Features

  • Events are processed in order, for a given key
  • Events for a given key are processed sequentially (i.e. not concurrently)
  • Events can be scheduled to be executed after a delay
  • Fault tolerance (events are persisted to an external cache, Redis)

Dependencies

  • go-redis (Redis client operations)
  • redsync (distributed locks)

Example

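package main

import (
	"time"

	// Assumed import paths: use the go-redis version pinned in your go.mod,
	// and also import this module's distr_ep package.
	"github.com/go-redis/redis/v8"
	log "github.com/sirupsen/logrus"
)

type TestCallbackImpl struct {
	callbackName string
}

func (t *TestCallbackImpl) StartProcessing(key string) {
	// start key processing - initialize anything you need, such as a cache
}

func (t *TestCallbackImpl) ProcessEvent(key string, val interface{}) bool {
	// process the event here
	// return true once processing is complete for this key;
	// return false to keep waiting for further events
	return false
}

func main() {
	// DistributedEventProcessor.RedisClient is a *redis.ClusterClient,
	// so create a cluster client rather than a single-node client
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"localhost:6379"},
	})
	defer client.Close()

	callbackImpl := &TestCallbackImpl{callbackName: "callback1"}
	dep := &distr_ep.DistributedEventProcessor{
		RedisClient: client,
		Namespace:   "test1",
		// LockTTL:     time.Second * 1000,
		CleanupDur:  time.Second * 1,
		Callback:    callbackImpl,
		LogLevel:    log.InfoLevel,
		// AtLeastOnce: true,
		Scheduling:  true,
	}
	if err := dep.Init(); err != nil {
		log.Errorf("error initializing.. %v", err)
		return
	}

	// Add an event for processing
	// Start: true marks this event as the trigger that starts processing for the key
	val := "payload1" // placeholder payload
	evt := &distr_ep.DistrEvent{Key: "key1", Val: val, Start: true}
	if err := dep.AddEvent(evt); err != nil {
		log.Errorf("error adding event: %v", err)
	}
	// Schedule an event for processing after 5 seconds
	if err := dep.ScheduleEvent(evt, time.Second*5); err != nil {
		log.Errorf("error scheduling event: %v", err)
	}
}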

Local Test

You can run a local Redis cluster using the grokzen/redis-cluster Docker image:

docker run -d --name redis-cluster -e "IP=0.0.0.0" -p 7000-7005:7000-7005 grokzen/redis-cluster:latest

Note: On macOS, port 7000 is already bound by the AirPlay Receiver.

Design

The Distributed Event Processor (DEP) framework's goals are:

  • Distributed: Events can be submitted by multiple clients
  • Sequential: Events should be processed in-order, for a given key
  • Resilient: Event processing should be resilient (fail-over)
  • Sticky: Process events for a given key by a given processor, within the prescribed TTL

Components

DEP is designed to be a simple framework with minimal overhead. It uses Redis to store events and to manage distributed locks.

We use the go-redis and redsync libraries for Redis client operations and distributed locking, respectively.

Event Submission

When a new event is submitted for a given key:

  • If this is a start event, the Key is added to the list of pending Keys
  • The event is pushed to the end of a Redis LIST specific to this Key
  • The TTL of the LIST key is renewed to the configured value
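
The three submission steps can be sketched with an in-memory stand-in for Redis. This is only an illustration under stated assumptions: memStore and submit are hypothetical names, times are plain epoch seconds, and the real library performs these steps against Redis.

```go
package main

import "fmt"

// memStore is a hypothetical in-memory stand-in for Redis.
type memStore struct {
	pending   []string            // stands in for the pending Keys LIST
	events    map[string][]string // stands in for the per-Key event LISTs
	expiresAt map[string]int64    // stands in for each event LIST's expiry (epoch seconds)
}

func newMemStore() *memStore {
	return &memStore{
		events:    map[string][]string{},
		expiresAt: map[string]int64{},
	}
}

// submit mirrors the three submission steps described above.
func (s *memStore) submit(key, val string, start bool, now, listTTL int64) {
	if start {
		s.pending = append(s.pending, key) // start event: announce the Key
	}
	s.events[key] = append(s.events[key], val) // push to the end of the Key's list
	s.expiresAt[key] = now + listTTL           // renew the list's TTL
}

func main() {
	s := newMemStore()
	s.submit("key1", "created", true, 100, 86400)
	s.submit("key1", "updated", false, 160, 86400)
	fmt.Println(s.pending, s.events["key1"], s.expiresAt["key1"])
}
```

Only the start event adds the Key to the pending list, so later events for the same Key simply queue up behind it.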

PendingKey Consumer

The PendingKey Consumer watches for pending keys that need to be processed. It runs on every participating node/processor and, for each pending key, performs these steps:

  • Move a Key from PendingKeys to a consumer specific LIST
  • Kick off the Key Processor for this Key
  • Remove the Key from the consumer specific LIST
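
The consumer steps above can be sketched with plain slices. The queues type and consumeOne are hypothetical names; in Redis the move between lists would normally be a single atomic command, which this single-threaded sketch does not model.

```go
package main

import "fmt"

// queues is a hypothetical in-memory stand-in for the Redis lists involved.
type queues struct {
	pending []string            // shared pending Keys list
	offload map[string][]string // consumer-specific lists, indexed by consumer ID
}

// consumeOne moves one Key from the pending list to the consumer's own list,
// runs process for it, and then removes the Key from the consumer's list.
// It returns false when nothing was pending.
func (q *queues) consumeOne(consumerID string, process func(key string)) bool {
	if len(q.pending) == 0 {
		return false
	}
	key := q.pending[0]
	q.pending = q.pending[1:]
	q.offload[consumerID] = append(q.offload[consumerID], key)

	process(key) // kick off the Key Processor

	list := q.offload[consumerID]
	for i, k := range list {
		if k == key {
			q.offload[consumerID] = append(list[:i], list[i+1:]...)
			break
		}
	}
	return true
}

func main() {
	q := &queues{pending: []string{"key1", "key2"}, offload: map[string][]string{}}
	q.consumeOne("client-a", func(key string) {
		fmt.Println("processing", key, "offloaded:", q.offload["client-a"])
	})
	fmt.Println("still pending:", q.pending)
}
```

Parking the Key in a consumer-specific list while it is being handed over means a Key is never held only in process memory, which is what allows it to be recovered if the consumer dies.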

Key Processor

The Key Processor attempts to acquire the lock for the specified key and, if successful, processes the pending events:

  • Attempt to acquire the Redis lock for this Key
  • If the Key lock is NOT acquired, stop processing
  • If the Key lock is acquired, add this Key to the processor's list of active Keys
  • Start processing events from the Key's event LIST, using a blocking wait of EventPollTimeout (default 1 hour)
  • If no events arrive within EventPollTimeout, stop the processor
  • For each event in the list (at-most-once semantics):
    • Pop the event
    • Invoke Callback.ProcessEvent(key, val) in a goroutine
    • Renew the Redis Key lock while the event is being processed
    • If ProcessEvent() returns true, indicating that this key is now completely processed, stop the processor
  • When processing stops for the Key:
    • Release the Key lock
    • Remove the Key from the processor's list of active keys
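
The processing loop above can be sketched in-process, with a channel standing in for the Key's event LIST and a mutex-guarded map standing in for the Redis lock. All names here (keyLocks, processKey, demoPollTimeout) are hypothetical. The sketch calls the callback synchronously and omits lock renewal and the active-Keys bookkeeping.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoPollTimeout is a short stand-in for EventPollTimeout.
const demoPollTimeout = 20 * time.Millisecond

// keyLocks is an in-process stand-in for the distributed Key lock.
type keyLocks struct {
	mu   sync.Mutex
	held map[string]bool
}

func (l *keyLocks) tryLock(key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held[key] {
		return false
	}
	l.held[key] = true
	return true
}

func (l *keyLocks) unlock(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, key)
}

// processKey handles events for key until processEvent returns true or no
// event arrives within pollTimeout. It returns the number of events handled,
// or -1 if the lock could not be acquired.
func processKey(l *keyLocks, key string, events <-chan string,
	pollTimeout time.Duration, processEvent func(key, val string) bool) int {
	if !l.tryLock(key) {
		return -1 // another processor owns this Key
	}
	defer l.unlock(key) // released whenever processing stops

	handled := 0
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return handled // event source closed
			}
			handled++
			if processEvent(key, ev) {
				return handled // callback reported the Key as complete
			}
		case <-time.After(pollTimeout):
			return handled // no events within the poll timeout
		}
	}
}

func main() {
	locks := &keyLocks{held: map[string]bool{}}
	events := make(chan string, 3)
	events <- "a"
	events <- "b"
	events <- "done"

	n := processKey(locks, "key1", events, demoPollTimeout, func(key, val string) bool {
		fmt.Println("processing", key, val)
		return val == "done"
	})
	fmt.Println("handled:", n)
}
```

Because each event is popped before the callback runs, an event whose callback crashes is not replayed, which is the at-most-once behaviour described above.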

Key Lock

The Redis key lock is central to achieving sequential processing. It prevents concurrent execution for the same key: only one event processor at a time can acquire the lock and thus process events for that key.

We use the open source library redsync for distributed locks.
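
The idea of a lock that expires unless its holder keeps renewing it can be sketched as follows. This is not redsync's API: ttlLocks, acquire, renew and release are hypothetical names, times are plain epoch seconds, and the sketch is single-threaded.

```go
package main

import "fmt"

// lease records who holds a Key lock and when it expires (epoch seconds).
type lease struct {
	owner     string
	expiresAt int64
}

// ttlLocks is a hypothetical in-memory stand-in for the Redis Key locks.
type ttlLocks struct {
	leases map[string]lease
}

// acquire succeeds only if the Key has no unexpired lease.
func (l *ttlLocks) acquire(key, owner string, now, ttl int64) bool {
	if cur, ok := l.leases[key]; ok && cur.expiresAt > now {
		return false
	}
	l.leases[key] = lease{owner: owner, expiresAt: now + ttl}
	return true
}

// renew extends the lease, but only for its current, unexpired owner.
func (l *ttlLocks) renew(key, owner string, now, ttl int64) bool {
	cur, ok := l.leases[key]
	if !ok || cur.owner != owner || cur.expiresAt <= now {
		return false
	}
	l.leases[key] = lease{owner: owner, expiresAt: now + ttl}
	return true
}

// release drops the lease if owner still holds it.
func (l *ttlLocks) release(key, owner string) {
	if cur, ok := l.leases[key]; ok && cur.owner == owner {
		delete(l.leases, key)
	}
}

func main() {
	locks := &ttlLocks{leases: map[string]lease{}}
	fmt.Println(locks.acquire("key1", "p1", 0, 10)) // p1 takes the lock
	fmt.Println(locks.acquire("key1", "p2", 5, 10)) // p2 is refused while p1's lease is live
	fmt.Println(locks.renew("key1", "p1", 8, 10))   // p1 extends its lease
}
```

The TTL is what makes failover possible: if the holder stops renewing, the lease lapses and another processor can take over the Key.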

Client Monitor

Client Monitor is a background process that watches for Clients that have terminated mid-processing.

  • Each Client performs a periodic check-in, updating its TTL in a Redis ZSET
  • If a Client is detected as inactive, all the Keys that it was processing are re-submitted to the Pending Keys list (which is de-queued by Pending Key Consumers)
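
The check-in and recovery behaviour above can be sketched with maps standing in for the ZSET and the per-Client active Key lists. The names monitor, checkin and reap are hypothetical, and storing the last check-in time (in epoch seconds) as the score is an assumption; the library may store an expiry time instead.

```go
package main

import (
	"fmt"
	"sort"
)

// monitor is a hypothetical in-memory stand-in for the Client Monitor state.
type monitor struct {
	lastSeen map[string]int64    // stands in for the ZSET: client ID -> last check-in
	active   map[string][]string // each Client's active Keys
	pending  []string            // shared pending Keys list
}

// checkin records that the Client is alive at time now.
func (m *monitor) checkin(clientID string, now int64) {
	m.lastSeen[clientID] = now
}

// reap re-queues the active Keys of every Client whose last check-in is older
// than ttl and returns the IDs of those Clients in sorted order.
func (m *monitor) reap(now, ttl int64) []string {
	var dead []string
	for id, seen := range m.lastSeen {
		if now-seen > ttl {
			dead = append(dead, id)
		}
	}
	sort.Strings(dead)
	for _, id := range dead {
		m.pending = append(m.pending, m.active[id]...)
		delete(m.active, id)
		delete(m.lastSeen, id)
	}
	return dead
}

func main() {
	m := &monitor{
		lastSeen: map[string]int64{},
		active:   map[string][]string{"c1": {"k1", "k2"}, "c2": {"k3"}},
	}
	m.checkin("c1", 0)
	m.checkin("c2", 8)
	fmt.Println("inactive:", m.reap(10, 5), "re-queued:", m.pending)
}
```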

Scheduled Events (Optional feature)

DEP also supports scheduled events. The Event Scheduler runs a periodic check for scheduled events. Like the Client Monitor, it uses a Redis ZSET, in this case to find events that are expiring.
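
A minimal sketch of the periodic check, assuming each scheduled event is scored by its due time in epoch seconds and its payload is stored separately. The names scheduler, schedule and popDue, and the use of event IDs, are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// scheduler is a hypothetical in-memory stand-in for the scheduler's Redis state.
type scheduler struct {
	due     map[string]int64  // stands in for the ZSET: event ID -> due time
	payload map[string]string // stands in for the HSET: event ID -> payload
}

// schedule registers an event that becomes due delaySec seconds after now.
func (s *scheduler) schedule(id, val string, now, delaySec int64) {
	s.due[id] = now + delaySec
	s.payload[id] = val
}

// popDue removes and returns the payloads of all events due at or before now,
// ordered by due time (ties broken by event ID).
func (s *scheduler) popDue(now int64) []string {
	var ids []string
	for id, at := range s.due {
		if at <= now {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool {
		if s.due[ids[i]] != s.due[ids[j]] {
			return s.due[ids[i]] < s.due[ids[j]]
		}
		return ids[i] < ids[j]
	})
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		out = append(out, s.payload[id])
		delete(s.due, id)
		delete(s.payload, id)
	}
	return out
}

func main() {
	s := &scheduler{due: map[string]int64{}, payload: map[string]string{}}
	s.schedule("e1", "five seconds later", 100, 5)
	s.schedule("e2", "one second later", 100, 1)
	fmt.Println(s.popDue(103)) // only e2 is due at t=103
	fmt.Println(s.popDue(110)) // e1 is due by t=110
}
```

In the real framework, the events returned by a check like popDue would then be submitted for normal processing.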

Redis Keys

The following table lists the Redis keys used:

Item                                | Redis Key                          | Remarks
Key Event List                      | dep:{DEP_NS}:key-el:{KEY}          | Redis LIST to store events by Key
Key Lock                            | dep:{DEP_NS}:key-lk:{KEY}          | Redis lock name by Key
Pending Keys                        | {dep:{DEP_NS}:pk}-pending          | Redis LIST for pending Keys that need to be processed
Client: Pending Keys Offload        | {dep:{DEP_NS}:pk}-ol:{CLIENT_ID}   | Redis LIST for offloaded pending Keys (Client specific)
Client: Active Keys                 | dep:{DEP_NS}:pk-active:{CLIENT_ID} | Redis LIST for Client's Active Keys
Client Monitor: Active Clients ZSet | dep:{DEP_NS}:mon-zset              | Client Monitor: Redis ZSet. Clients update their TTL periodically
Client Monitor: Lock                | dep:{DEP_NS}:mon-zset:lk           | Client Monitor Lock. Required to run the monitoring job
Scheduled Events ZSet               | dep:{DEP_NS}:sch-zset              | Scheduler Events ZSet: Expiry is determined by Epoch
Scheduled Event Payload HSet        | dep:{DEP_NS}:sch-hset              | Scheduler Events Payload
Scheduler Lock                      | dep:{DEP_NS}:sch-zset:lk           | Scheduler Job Lock
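
The naming scheme in the table can be expressed as small formatting helpers. The function names below are hypothetical and not part of the distr_ep API; only the key formats are taken from the table.

```go
package main

import "fmt"

// eventListKey returns the LIST key that holds the events for one Key.
func eventListKey(ns, key string) string {
	return fmt.Sprintf("dep:%s:key-el:%s", ns, key)
}

// keyLockName returns the lock name for one Key.
func keyLockName(ns, key string) string {
	return fmt.Sprintf("dep:%s:key-lk:%s", ns, key)
}

// pendingKeysKey returns the LIST key for pending Keys. The braces form a
// Redis Cluster hash tag, so this key and the offload keys map to the same slot.
func pendingKeysKey(ns string) string {
	return fmt.Sprintf("{dep:%s:pk}-pending", ns)
}

// offloadKey returns the Client-specific offload LIST key.
func offloadKey(ns, clientID string) string {
	return fmt.Sprintf("{dep:%s:pk}-ol:%s", ns, clientID)
}

func main() {
	fmt.Println(eventListKey("test1", "key1"))
	fmt.Println(keyLockName("test1", "key1"))
	fmt.Println(pendingKeysKey("test1"))
	fmt.Println(offloadKey("test1", "client-a"))
}
```

Sharing a hash tag between the pending list and the offload lists matters on Redis Cluster, where commands that touch several keys require all of them to live in the same slot.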

Documentation

Constants

const (
	LOCK_TTL       = time.Hour * 24
	LOCK_RETRY_DUR = time.Millisecond * 100
	CLEANUP_DUR    = time.Second * 10
	LIST_TTL       = time.Hour * 24
	SCHEDULE_DUR   = time.Second * 1
	EVT_POLL_TO    = time.Second * 3600 * 1
	// https://redis.io/docs/reference/cluster-spec/#hash-tags
	PK_HASH_PREFIX = "{dep:%s:pk-}"
)
const (
	REDIS_POS_LEFT  = "LEFT"
	REDIS_POS_RIGHT = "RIGHT"
)

Types

type DistrEvent added in v0.2.0

type DistrEvent struct {
	Key   string
	Val   interface{}
	Start bool
}

type DistributedEventProcessor

type DistributedEventProcessor struct {
	// namespace
	Namespace string
	// redis connection
	RedisClient *redis.ClusterClient
	// key lock duration
	LockTTL time.Duration
	// cleanup delay
	CleanupDur time.Duration
	// Event callback
	Callback EventCallback
	// LogLevel
	LogLevel logrus.Level
	// EventProcessingMode - set to true if retry is required (at-least-once)
	// default is at-most-once
	AtLeastOnce bool
	// Scheduling enabled
	Scheduling bool
	// Event polling timeout. Time to wait for new events for a key
	// This should be less than LockTTL
	EventPollTimeout time.Duration
	// contains filtered or unexported fields
}

func (*DistributedEventProcessor) AddEvent

func (d *DistributedEventProcessor) AddEvent(e *DistrEvent) error

func (*DistributedEventProcessor) Init

func (d *DistributedEventProcessor) Init() error

func (*DistributedEventProcessor) ScheduleEvent added in v0.1.3

func (d *DistributedEventProcessor) ScheduleEvent(e *DistrEvent,
	delay time.Duration) error

func (*DistributedEventProcessor) Shutdown added in v0.2.0

func (d *DistributedEventProcessor) Shutdown()

type EventCallback

type EventCallback interface {
	// invoked when start processing
	StartProcessing(key string)
	// returns true if this key processing is completed
	ProcessEvent(key string, val interface{}) bool
}
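
As an illustration of the contract, here is a hypothetical callback that collects string payloads per key and reports completion when it sees the value "done". The interface is copied from above so the sketch compiles on its own. How Val is encoded on its way through Redis is not documented here, so the string type assertion is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// EventCallback is copied from the package so this sketch is self-contained.
type EventCallback interface {
	StartProcessing(key string)
	ProcessEvent(key string, val interface{}) bool
}

// collectingCallback is a hypothetical implementation. The mutex guards the
// map because different keys may be processed at the same time.
type collectingCallback struct {
	mu   sync.Mutex
	seen map[string][]string
}

// StartProcessing resets any state held for the key.
func (c *collectingCallback) StartProcessing(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seen[key] = nil
}

// ProcessEvent records the payload and returns true once "done" arrives.
func (c *collectingCallback) ProcessEvent(key string, val interface{}) bool {
	s, ok := val.(string)
	if !ok {
		return false // ignore payloads of an unexpected type
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seen[key] = append(c.seen[key], s)
	return s == "done"
}

// Compile-time check that the type satisfies the interface.
var _ EventCallback = (*collectingCallback)(nil)

func main() {
	var cb EventCallback = &collectingCallback{seen: map[string][]string{}}
	cb.StartProcessing("key1")
	for _, v := range []interface{}{"a", "b", "done"} {
		if cb.ProcessEvent("key1", v) {
			fmt.Println("key1 complete")
		}
	}
}
```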
