kanthorq

package module
v0.0.0-...-81e983d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: MIT Imports: 7 Imported by: 2

README

@kanthorlabs/kanthorq

Messaging System backed by PostgreSQL

KanthorQ is a robust high-performance messaging system for Go and Postgres. Checkout documentation to discover more.

Getting Started

Prerequisites

Before diving into KanthorQ, you’ll need a PostgreSQL database. This can be a PostgreSQL instance running locally or in the cloud. Alternatively, you can use any database that supports the PostgreSQL wire protocol, such as CockroachDB or Amazon Aurora (PostgreSQL-compatible edition).

If you don’t have a PostgreSQL instance running, you can quickly start one locally using Docker:

docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=changemenow -d postgres:16
Installation

To install KanthorQ, make sure you're in a Go project directory (one that contains a go.mod file). Then run the following command:

go get github.com/kanthorlabs/kanthorq
Running migrations

KanthorQ relies on PostgreSQL to manage its events and tasks. To set up the necessary database schema, you’ll need to run some migrations. First, install the KanthorQ command-line tool:

go install github.com/kanthorlabs/kanthorq/cmd/kanthorq@latest

Next, run the migrations to set up KanthorQ’s database schema:

kanthorq migrate up -s 'github://kanthorlabs/kanthorq/migration#main' -d 'postgres://postgres:changemenow@localhost:5432/postgres?sslmode=disable'

Make sure to replace the -d option with the URI of your PostgreSQL instance if you're using a different database setup.

Sending Events with a Publisher

Once the migration is complete, you’re ready to start sending events using the publisher. Here’s an example of how to publish events in Go:

import (
	"context"

	"github.com/kanthorlabs/kanthorq"
	"github.com/kanthorlabs/kanthorq/entities"
	"github.com/kanthorlabs/kanthorq/publisher"
)

func main() {
	ctx := context.Background()

	// Initialize a publisher
	pub, cleanup := kanthorq.Pub(ctx, &publisher.Options{
		// Replace the connection string with your database URI
		Connection: "postgres://postgres:changemenow@localhost:5432/postgres?sslmode=disable",
		// Using the default stream for demo purposes
		StreamName: entities.DefaultStreamName,
	})
	// Clean up the publisher after use
	defer cleanup()

	subject := "system.say_hello"
	body := []byte("{\"msg\": \"Hello World!\"}")

	// Define your first event
	event := entities.NewEvent(subject, body)

	events := []*entities.Event{
		event,
		// Another  event
		entities.NewEvent("system.say_hello", []byte("{\"msg\": \"I'm comming!\"}")),
		// And yet another event
		entities.NewEvent("system.say_goodbye", []byte("{\"msg\": \"See you!!\"}")),
	}

	if err := pub.Send(ctx, events); err != nil {
		// Handle error
	}
}

In this example, you initialize a publisher that sends three events with different messages. The publisher handles event sending and interacts with the PostgreSQL database to persist those events.

Handling Events with a Subscriber

Once you’ve sent some events, you’ll want to handle them using a subscriber. Here’s a basic example of how to subscribe to events:

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/kanthorlabs/kanthorq"
	"github.com/kanthorlabs/kanthorq/entities"
	"github.com/kanthorlabs/kanthorq/puller"
	"github.com/kanthorlabs/kanthorq/subscriber"
)

func main() {
	// Listen for SIGTERM, so pressing Ctrl-C stops the program
	ctx, stop := signal.NotifyContext(context.TODO(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	var options = &subscriber.Options{
		// Replace the connection string with your database URI
		Connection: "postgres://postgres:changemenow@localhost:5432/postgres?sslmode=disable",
		// Use the default stream for demo purposes
		StreamName: entities.DefaultStreamName,
		// Use the default consumer for demo purposes
		ConsumerName: entities.DefaultConsumerName,
		// Receive only events matching the filter,
		// so that both system.say_hello and system.say_goodbye will be processed
		ConsumerSubjectIncludes: []string{"system.>"},
		// Retry the task if it fails this many times
		ConsumerAttemptMax: entities.DefaultConsumerAttemptMax,
		// Reprocess stuck tasks after this duration
		ConsumerVisibilityTimeout: entities.DefaultConsumerVisibilityTimeout,
		Puller: puller.PullerIn{
			// Number of events to pull in one batch
			Size: 100,
			// Wait time before completing the batch if Size isn’t reached
			WaitingTime: 1000,
		},
	}

	// Handle events; this goroutine will block until Ctrl-C is pressed
	err := kanthorq.Sub(ctx, options, func(ctx context.Context, msg *subscriber.Message) error {
		ts := time.UnixMilli(msg.Event.CreatedAt).Format(time.RFC3339)
		// Print the received event
		fmt.Printf("RECEIVED: %s | %s | %s\n", msg.Event.Id, msg.Event.Subject, ts)
		return nil
	})

	// Print any errors, if applicable
	if err != nil && !errors.Is(err, context.Canceled) {
		log.Fatal(err)
	}

	fmt.Println("----- END OF EXAMPLE ------")
}

This example shows a subscriber listening for events matching the subject filter system.>. The subscriber processes all events with subjects such as system.say_hello or system.say_goodbye.

After running the above example, you should see output similar to the following:

2024/08/30 09:18:43 RECEIVED: event_01j6gh7t5v6j2q9ma0n78hw9fe | system.say_hello | 2024-08-30T09:18:42+07:00
2024/08/30 09:18:43 RECEIVED: event_01j6gh7t5s973x2sby12j9pwkc | system.say_hello | 2024-08-30T09:18:42+07:00
2024/08/30 09:18:45 RECEIVED: event_01j6gh7x3pvmk6demx3cq27j1q | system.say_goodbye | 2024-08-30T09:18:45+07:00

See the Defaule Subscriber example for the complete code.

Other features

Thank you

I want to send many thanks to these wonderful libraries, framework, and communities

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Pub

func Pub(ctx context.Context, options *publisher.Options) (publisher.Publisher, func())

func Sub

func Sub(ctx context.Context, options *subscriber.Options, handler subscriber.Handler) error

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL