rangedbapi

package
Version: v0.12.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2021 | License: BSD-3-Clause | Imports: 18 | Imported by: 1

Documentation

Overview

Example (GetAllEvents)
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
	inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 100}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a095086e52bc4617a1763a62398cd645"}},
)))
url := fmt.Sprintf("%s/events.json", server.URL)

// When
response, err := http.Get(url)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)

fmt.Println(jsontools.PrettyJSON(body))
Output:

[
  {
    "aggregateType": "thing",
    "aggregateID": "605f20348fb940e386c171d51c877bf1",
    "globalSequenceNumber": 1,
    "streamSequenceNumber": 1,
    "insertTimestamp": 0,
    "eventID": "d2ba8e70072943388203c438d4e94bf3",
    "eventType": "ThingWasDone",
    "data": {
      "id": "605f20348fb940e386c171d51c877bf1",
      "number": 100
    },
    "metadata": null
  },
  {
    "aggregateType": "another",
    "aggregateID": "a095086e52bc4617a1763a62398cd645",
    "globalSequenceNumber": 2,
    "streamSequenceNumber": 1,
    "insertTimestamp": 1,
    "eventID": "99cbd88bbcaf482ba1cc96ed12541707",
    "eventType": "AnotherWasComplete",
    "data": {
      "id": "a095086e52bc4617a1763a62398cd645"
    },
    "metadata": null
  }
]
Example (GetEventsByAggregateType)
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
	inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 100}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a095086e52bc4617a1763a62398cd645"}},
)))
url := fmt.Sprintf("%s/events/thing.json", server.URL)

// When
response, err := http.Get(url)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)

fmt.Println(jsontools.PrettyJSON(body))
Output:

[
  {
    "aggregateType": "thing",
    "aggregateID": "605f20348fb940e386c171d51c877bf1",
    "globalSequenceNumber": 1,
    "streamSequenceNumber": 1,
    "insertTimestamp": 0,
    "eventID": "d2ba8e70072943388203c438d4e94bf3",
    "eventType": "ThingWasDone",
    "data": {
      "id": "605f20348fb940e386c171d51c877bf1",
      "number": 100
    },
    "metadata": null
  }
]
Example (GetEventsByAggregateTypes)
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
	inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 100}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a095086e52bc4617a1763a62398cd645"}},
)))
url := fmt.Sprintf("%s/events/thing,another.json", server.URL)

// When
response, err := http.Get(url)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)

fmt.Println(jsontools.PrettyJSON(body))
Output:

[
  {
    "aggregateType": "thing",
    "aggregateID": "605f20348fb940e386c171d51c877bf1",
    "globalSequenceNumber": 1,
    "streamSequenceNumber": 1,
    "insertTimestamp": 0,
    "eventID": "d2ba8e70072943388203c438d4e94bf3",
    "eventType": "ThingWasDone",
    "data": {
      "id": "605f20348fb940e386c171d51c877bf1",
      "number": 100
    },
    "metadata": null
  },
  {
    "aggregateType": "another",
    "aggregateID": "a095086e52bc4617a1763a62398cd645",
    "globalSequenceNumber": 2,
    "streamSequenceNumber": 1,
    "insertTimestamp": 1,
    "eventID": "99cbd88bbcaf482ba1cc96ed12541707",
    "eventType": "AnotherWasComplete",
    "data": {
      "id": "a095086e52bc4617a1763a62398cd645"
    },
    "metadata": null
  }
]
Example (GetEventsByStream)
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
	inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 100}},
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 200}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a095086e52bc4617a1763a62398cd645"}},
)))
url := fmt.Sprintf("%s/events/thing/605f20348fb940e386c171d51c877bf1.json", server.URL)

// When
response, err := http.Get(url)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)

fmt.Println(jsontools.PrettyJSON(body))
Output:

[
  {
    "aggregateType": "thing",
    "aggregateID": "605f20348fb940e386c171d51c877bf1",
    "globalSequenceNumber": 1,
    "streamSequenceNumber": 1,
    "insertTimestamp": 0,
    "eventID": "d2ba8e70072943388203c438d4e94bf3",
    "eventType": "ThingWasDone",
    "data": {
      "id": "605f20348fb940e386c171d51c877bf1",
      "number": 100
    },
    "metadata": null
  },
  {
    "aggregateType": "thing",
    "aggregateID": "605f20348fb940e386c171d51c877bf1",
    "globalSequenceNumber": 2,
    "streamSequenceNumber": 2,
    "insertTimestamp": 1,
    "eventID": "99cbd88bbcaf482ba1cc96ed12541707",
    "eventType": "ThingWasDone",
    "data": {
      "id": "605f20348fb940e386c171d51c877bf1",
      "number": 200
    },
    "metadata": null
  }
]
Example (GetEventsByStreamNdJson)
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
	inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 100}},
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "605f20348fb940e386c171d51c877bf1", Number: 200}},
)))
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a095086e52bc4617a1763a62398cd645"}},
)))
url := fmt.Sprintf("%s/events/thing/605f20348fb940e386c171d51c877bf1.ndjson", server.URL)

// When
response, err := http.Get(url)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)

fmt.Println(string(body))
Output:

{"aggregateType":"thing","aggregateID":"605f20348fb940e386c171d51c877bf1","globalSequenceNumber":1,"streamSequenceNumber":1,"insertTimestamp":0,"eventID":"d2ba8e70072943388203c438d4e94bf3","eventType":"ThingWasDone","data":{"id":"605f20348fb940e386c171d51c877bf1","number":100},"metadata":null}
{"aggregateType":"thing","aggregateID":"605f20348fb940e386c171d51c877bf1","globalSequenceNumber":2,"streamSequenceNumber":2,"insertTimestamp":1,"eventID":"99cbd88bbcaf482ba1cc96ed12541707","eventType":"ThingWasDone","data":{"id":"605f20348fb940e386c171d51c877bf1","number":200},"metadata":null}
Example (OptimisticDeleteStream)
// Given
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "4b9a415c53734b69ac459a7e53eb4c1b", Number: 100}},
)))

server := httptest.NewServer(api)
defer server.Close()

url := fmt.Sprintf("%s/delete-stream/thing/4b9a415c53734b69ac459a7e53eb4c1b", server.URL)
request, err := http.NewRequest(http.MethodPost, url, nil)
PrintError(err)
request.Header.Set("ExpectedStreamSequenceNumber", "1")
client := http.DefaultClient

// When
response, err := client.Do(request)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)
fmt.Println(jsontools.PrettyJSON(body))
Output:

{
  "status": "OK",
  "eventsDeleted": 1
}
Example (OptimisticDeleteStream_failure)
// Given
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx,
	&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "4b9a415c53734b69ac459a7e53eb4c1b", Number: 100}},
)))

server := httptest.NewServer(api)
defer server.Close()

url := fmt.Sprintf("%s/delete-stream/thing/4b9a415c53734b69ac459a7e53eb4c1b", server.URL)
request, err := http.NewRequest(http.MethodPost, url, nil)
PrintError(err)
request.Header.Set("ExpectedStreamSequenceNumber", "2")
client := http.DefaultClient

// When
response, err := client.Do(request)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)
fmt.Println(response.Status)
fmt.Println(jsontools.PrettyJSON(body))
Output:

409 Conflict
{
  "status": "Failed",
  "message": "unexpected sequence number: 2, actual: 1"
}
Example (OptimisticSaveEvents)
// Given
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

const requestBody = `[
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 100
			},
			"metadata":null
		},
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 200
			},
			"metadata":null
		}
	]`
url := fmt.Sprintf("%s/save-events/thing/141b39d2b9854f8093ef79dc47dae6af", server.URL)
request, err := http.NewRequest(http.MethodPost, url, strings.NewReader(requestBody))
PrintError(err)
request.Header.Set("Content-Type", "application/json")
request.Header.Set("ExpectedStreamSequenceNumber", "0")
client := http.DefaultClient

// When
response, err := client.Do(request)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)
fmt.Println(jsontools.PrettyJSON(body))
Output:

{
  "status": "OK",
  "streamSequenceNumber": 2
}
Example (OptimisticSaveEvents_failure)
// Given
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

const requestBody = `[
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 100
			},
			"metadata":null
		},
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 200
			},
			"metadata":null
		}
	]`
url := fmt.Sprintf("%s/save-events/thing/141b39d2b9854f8093ef79dc47dae6af", server.URL)
request, err := http.NewRequest(http.MethodPost, url, strings.NewReader(requestBody))
PrintError(err)
request.Header.Set("Content-Type", "application/json")
request.Header.Set("ExpectedStreamSequenceNumber", "2")
client := http.DefaultClient

// When
response, err := client.Do(request)
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)
fmt.Println(response.Status)
fmt.Println(jsontools.PrettyJSON(body))
Output:

409 Conflict
{
  "status": "Failed",
  "message": "unexpected sequence number: 2, actual: 0"
}
Example (SaveEvent)
// Given
inMemoryStore := inmemorystore.New(
	inmemorystore.WithClock(sequentialclock.New()),
)
api, err := rangedbapi.New(rangedbapi.WithStore(inMemoryStore))
PrintError(err)

server := httptest.NewServer(api)
defer server.Close()

const requestBody = `[
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 100
			},
			"metadata":null
		},
		{
			"eventType": "ThingWasDone",
			"data":{
				"id": "141b39d2b9854f8093ef79dc47dae6af",
				"number": 200
			},
			"metadata":null
		}
	]`
url := fmt.Sprintf("%s/save-events/thing/141b39d2b9854f8093ef79dc47dae6af", server.URL)

// When
response, err := http.Post(url, "application/json", strings.NewReader(requestBody))
PrintError(err)
defer Close(response.Body)

body, err := ioutil.ReadAll(response.Body)
PrintError(err)
fmt.Println(jsontools.PrettyJSON(body))
Output:

{
  "status": "OK",
  "streamSequenceNumber": 2
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(options ...Option) (*api, error)

New constructs an api.

Types

type Option

type Option func(*api)

Option defines functional option parameters for api.

func WithBaseUri

func WithBaseUri(baseUri string) Option

WithBaseUri is a functional option to inject the base URI for use in API links.

func WithLogger (added in v0.6.0)

func WithLogger(logger *log.Logger) Option

WithLogger is a functional option to inject a Logger.

func WithSnapshotStore (added in v0.4.0)

func WithSnapshotStore(snapshotStore projection.SnapshotStore) Option

WithSnapshotStore is a functional option to inject a SnapshotStore.

func WithStore

func WithStore(store rangedb.Store) Option

WithStore is a functional option to inject a Store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL