dmm

package module
v0.0.0-...-f71c5c6 Latest Latest
Published: Nov 23, 2023 License: BSD-2-Clause Imports: 8 Imported by: 0

README

Durable Materialization Machine

A durable execution framework, inspired by Kubernetes.

Imagine k8s reconciliation controllers, but as a generic framework. It is not a separate database layer or API server. Instead, it is built to work alongside existing database persistence and adds nats.io worker tokens. See the blog for the context.

The term "durable execution" isn't well established yet. The original inspiration for this code is the Kubernetes "Controller Pattern". Uber's Cadence (forked by Temporal) is the first example I know of infrastructure ideas leaking into application code. It calls itself a "highly available orchestration engine", but I think it's a much deeper idea than that.

quickstart

The example uses bun with Postgres. This isn't required for dmm.

docker run -d -e POSTGRES_HOST_AUTH_METHOD=trust --net host postgres
nats-server -js

git clone https://github.com/kraudcloud/dmm.git
cd dmm/example
go build
./example worker # run a worker
./example create # create an object picked up by the workers
./example debug  # shows how to debug the workflows

materializers

A materializer is an idempotent function that the controller calls until it succeeds. In k8s terms this is "reconciliation". The input to the function is a cache object, and the function is supposed to ensure the cache is up to date with reality. This may require changing the cache or changing reality.

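type MusicPlayer struct {
	bun.BaseModel `bun:"table:music_players"`
	ID            uuid.UUID
	DesiredSong   string
}

type MusicMaterializer struct {
	MusicPlayerID uuid.UUID
	CurrentSong   string
}

func (cache *MusicMaterializer) Materialize(ctx dmm.Context) error {

	// getMusicPlayer loads the MusicPlayer row from the database (helper not shown)
	player, err := getMusicPlayer(cache.MusicPlayerID)
	if err != nil {
		return err
	}

	// fetchCurrentSong wraps GET http://player/current_song (helper not shown)
	currentSong, err := fetchCurrentSong()
	if err != nil {
		return err
	}

	// materialize the database object into the real world
	if currentSong != player.DesiredSong {
		// playSong wraps POST http://player/play (helper not shown)
		if err := playSong(player.DesiredSong); err != nil {
			return err
		}
	}

	if currentSong != cache.CurrentSong {
		// update the cache. this will become relevant later.
		cache.CurrentSong = currentSong
	}

	return nil
}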

no events, just state

If this sounds like declarative programming, you're thinking too hard. Look, it's just fancy retrying. It's also not "reactive" or event-based programming, since we don't react to ephemeral events. Instead we only "react" to persisted state. Events play a role in changing the cache, but events are not durable. They can and will be lost. Instead, the real-world thing should report its entire absolute state, which we simply put into the cache for the materializer to consider.

In the first example, Materialize could pull the current state over HTTP. Here we rely on the state being pushed continuously:

type MusicPlayer struct {
	bun.BaseModel `bun:"table:music_players"`
	ID            uuid.UUID
	DesiredSong   string
}

type MusicMaterializer struct {
	MusicPlayerID uuid.UUID
	CurrentSong   string
}

func Observe(ctx dmm.Context, report Report) {

	cache := ctx.Matter(report.PlayerID)

	if report.CurrentSong != cache.CurrentSong {

		cache.CurrentSong = report.CurrentSong

		// replace the dmm cache and trigger materialization
		ctx.Notify(cache)
	}
}

func (cache *MusicMaterializer) Materialize(ctx dmm.Context) error {

	// developers need to think of graceful degradation for when the cache is zeroed.
	// here that means the `!=` condition triggers and we send a possibly unnecessary `play` call.
	// this is acceptable because we assume there is no other way to get the current player state.

	// getMusicPlayer loads the MusicPlayer row from the database (helper not shown)
	player, err := getMusicPlayer(cache.MusicPlayerID)
	if err != nil {
		return err
	}

	if cache.CurrentSong != player.DesiredSong {

		// playSong wraps POST http://player/play (helper not shown)
		if err := playSong(player.DesiredSong); err != nil {
			return err
		}

		// avoid calling post immediately again by setting the cache here.
		// it'll be overridden by the next observation report.
		cache.CurrentSong = player.DesiredSong
	}

	return nil
}
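
To see why reporting absolute state tolerates lost messages, here is a small self-contained sketch. It does not use dmm: the `report` and `cache` types are hypothetical stand-ins, and `observe` only mimics the compare-then-store step of `Observe` above.

```go
package main

import "fmt"

// report carries the complete current state of a player, not a delta.
type report struct {
	PlayerID    string
	CurrentSong string
}

// cache maps a player ID to the last observed song (stand-in for the dmm cache).
type cache map[string]string

// observe stores the absolute state from a report. It returns true when the
// cache changed, which is the point where materialization would be triggered.
func (c cache) observe(r report) bool {
	if c[r.PlayerID] == r.CurrentSong {
		return false
	}
	c[r.PlayerID] = r.CurrentSong
	return true
}

func main() {
	c := cache{}
	reports := []report{
		{PlayerID: "p1", CurrentSong: "song-a"},
		{PlayerID: "p1", CurrentSong: "song-b"}, // this one gets lost
		{PlayerID: "p1", CurrentSong: "song-b"}, // the next periodic report
	}
	for i, r := range reports {
		if i == 1 {
			continue // simulate a dropped message
		}
		fmt.Println(r.CurrentSong, c.observe(r))
	}
	fmt.Println("cache:", c["p1"])
}
```

The dropped report costs only latency. The next report carries the same full state, so the cache still converges. A delta-style event ("song changed") lost in the same way would leave the cache wrong until something else corrected it.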

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func Open

func Open(namespace string) (*Bucket, error)

func (*Bucket) Close

func (ctx *Bucket) Close()

func (*Bucket) Notify

func (ctx *Bucket) Notify(m Materializer) error

func (*Bucket) Register

func (ctx *Bucket) Register(t string, tp Materializer) (*WorkStream, error)

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

func (*Context) Notify

func (ctx *Context) Notify(m Materializer) error

type Materializer

type Materializer interface {
	Materialize(Context) error
	Identity() string
}

type WorkStream

type WorkStream struct {
	// contains filtered or unexported fields
}

func (*WorkStream) Work

func (mat *WorkStream) Work()
