rangedbws

package
v0.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2021 License: BSD-3-Clause Imports: 15 Imported by: 0

Documentation

Overview

Example (StreamAggregateTypeEvents)
// Given
shortuuid.SetRand(100)
inMemoryStore := inmemorystore.New(inmemorystore.WithClock(sequentialclock.New()))
api, err := rangedbws.New(rangedbws.WithStore(inMemoryStore))
PrintError(err)
defer api.Stop()

server := httptest.NewServer(api)
defer server.Close()

serverAddress := strings.TrimPrefix(server.URL, "http://")
websocketUrl := fmt.Sprintf("ws://%s/events/thing,that", serverAddress)
socket, _, err := websocket.DefaultDialer.Dial(websocketUrl, nil)
PrintError(err)

var wg sync.WaitGroup
wg.Add(1)

go func() {
	defer Close(socket)

	for i := 0; i < 2; i++ {
		_, message, err := socket.ReadMessage()
		PrintError(err)
		fmt.Println(jsontools.PrettyJSON(message))
	}

	wg.Done()
}()

// When
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "dce275e43137467b92c9f4eb6c9c77a3", Number: 100}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "594c68cfa7944f9b94afc83505ff99e9"}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThatWasDone{ID: "075d37ae85894093aa818b391442df9b"}},
)))

wg.Wait()
Output:

{
  "aggregateType": "thing",
  "aggregateID": "dce275e43137467b92c9f4eb6c9c77a3",
  "globalSequenceNumber": 0,
  "streamSequenceNumber": 0,
  "insertTimestamp": 0,
  "eventID": "d2ba8e70072943388203c438d4e94bf3",
  "eventType": "ThingWasDone",
  "data": {
    "id": "dce275e43137467b92c9f4eb6c9c77a3",
    "number": 100
  },
  "metadata": null
}
{
  "aggregateType": "that",
  "aggregateID": "075d37ae85894093aa818b391442df9b",
  "globalSequenceNumber": 2,
  "streamSequenceNumber": 0,
  "insertTimestamp": 2,
  "eventID": "2e9e6918af10498cb7349c89a351fdb7",
  "eventType": "ThatWasDone",
  "data": {
    "ID": "075d37ae85894093aa818b391442df9b"
  },
  "metadata": null
}
Example (StreamAllEvents)
// Given
shortuuid.SetRand(100)
inMemoryStore := inmemorystore.New(inmemorystore.WithClock(sequentialclock.New()))
websocketApi, err := rangedbws.New(rangedbws.WithStore(inMemoryStore))
PrintError(err)
defer websocketApi.Stop()

server := httptest.NewServer(websocketApi)
defer server.Close()

serverAddress := strings.TrimPrefix(server.URL, "http://")
websocketUrl := fmt.Sprintf("ws://%s/events", serverAddress)
socket, _, err := websocket.DefaultDialer.Dial(websocketUrl, nil)
PrintError(err)

var wg sync.WaitGroup
wg.Add(1)

go func() {
	defer Close(socket)

	for i := 0; i < 2; i++ {
		_, message, err := socket.ReadMessage()
		PrintError(err)
		fmt.Println(jsontools.PrettyJSON(message))
	}

	wg.Done()
}()

// When
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "52e247a7c0a54a65906e006dac9be108", Number: 100}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a3d9faa7614a46b388c6dce9984b6620"}},
)))

wg.Wait()
Output:

{
  "aggregateType": "thing",
  "aggregateID": "52e247a7c0a54a65906e006dac9be108",
  "globalSequenceNumber": 0,
  "streamSequenceNumber": 0,
  "insertTimestamp": 0,
  "eventID": "d2ba8e70072943388203c438d4e94bf3",
  "eventType": "ThingWasDone",
  "data": {
    "id": "52e247a7c0a54a65906e006dac9be108",
    "number": 100
  },
  "metadata": null
}
{
  "aggregateType": "another",
  "aggregateID": "a3d9faa7614a46b388c6dce9984b6620",
  "globalSequenceNumber": 1,
  "streamSequenceNumber": 0,
  "insertTimestamp": 1,
  "eventID": "99cbd88bbcaf482ba1cc96ed12541707",
  "eventType": "AnotherWasComplete",
  "data": {
    "id": "a3d9faa7614a46b388c6dce9984b6620"
  },
  "metadata": null
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(options ...Option) (*websocketAPI, error)

New constructs a websocketAPI.

Types

type MessageWriter

type MessageWriter interface {
	WriteMessage(messageType int, data []byte) error
}

MessageWriter is the interface for writing a message to a connection

type Option

type Option func(*websocketAPI)

Option defines functional option parameters for websocketAPI.

func WithLogger

func WithLogger(logger *log.Logger) Option

WithLogger is a functional option to inject a Logger.

func WithStore

func WithStore(store rangedb.Store) Option

WithStore is a functional option to inject a Store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL