postgresworker

package
v1.18.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

README

Example

This is example for create Postgres Event Listener (CDC), inspired by Hasura Event Triggers

Create delivery handler

package workerhandler

import (
	"context"
	"encoding/json"
	
	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/tracer"
)

// PostgresListenerHandler struct
type PostgresListenerHandler struct {
	uc        usecase.Usecase
	validator interfaces.Validator
}

// NewPostgresListenerHandler constructor
func NewPostgresListenerHandler(uc usecase.Usecase, validator interfaces.Validator) *PostgresListenerHandler {
	return &PostgresListenerHandler{
		uc:        uc,
		validator: validator,
	}
}

// MountHandlers mount handler group
func (h *PostgresListenerHandler) MountHandlers(group *types.WorkerHandlerGroup) {
	group.Add("table-names", h.handleDataChange) // listen change data capture on table "table-names"
}

func (h *PostgresListenerHandler) handleDataChange(eventContext *candishared.EventContext) error {
	trace := tracer.StartTrace(eventContext.Context(), "DeliveryPostgresListener:HandleDataChange")
	defer trace.Finish()

	fmt.Printf("data change on table 'table-names' detected: %s\n", eventContext.Message())
	// call usecase
	return nil
}


Register in module

package examplemodule

import (
	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type Module struct {
	// ...another delivery handler
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

func NewModules(deps dependency.Dependency) *Module {
	return &Module{
		workerHandlers: map[types.Worker]interfaces.WorkerHandler{
			// ...another worker handler
			// ...
			types.PostgresListener: workerhandler.NewPostgresListenerHandler(usecaseUOW.User(), deps.GetValidator()),
		},
	}
}

// ...another method

Message in JSON Payload

{
	"event_id": "<md5-hash-unique-event-id>",
	"table": "<table-name>",
	"action": "<operation-name>", // INSERT, UPDATE, or DELETE
	"data": {
		"old": <old-column-values-object>,
		"new": <new-column-values-object>
	}
}

Documentation

Index

Constants

View Source
const (
	// ActionInsert const
	ActionInsert = "INSERT"
	// ActionUpdate const
	ActionUpdate = "UPDATE"
	// ActionDelete const
	ActionDelete = "DELETE"
)

Variables

This section is empty.

Functions

func CreateHandlerRoute added in v1.13.11

func CreateHandlerRoute(sourceName, tableName string) string

CreateHandlerRoute creating key pattern for handler

func NewWorker

func NewWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewWorker create new postgres event listener

func ParseHandlerRoute added in v1.13.11

func ParseHandlerRoute(str string) (sourceName, tableName string)

ParseHandlerRoute helper

Types

type EventPayload

type EventPayload struct {
	EventID string           `json:"event_id"`
	Table   string           `json:"table"`
	Action  string           `json:"action"`
	Data    EventPayloadData `json:"data"`
}

EventPayload event model

func (EventPayload) GetID added in v1.12.4

func (e EventPayload) GetID() string

GetID get id if old/new data is empty, cause from long payload limitation

type EventPayloadData added in v1.9.0

type EventPayloadData struct {
	IsTooLongPayload bool   `json:"is_too_long_payload,omitempty"`
	OldID            string `json:"old_id"`
	NewID            string `json:"new_id"`
	Old              any    `json:"old"`
	New              any    `json:"new"`
}

EventPayloadData event data

type OptionFunc added in v1.7.4

type OptionFunc func(*option)

OptionFunc type

func AddPostgresDSN added in v1.13.11

func AddPostgresDSN(sourceName, dsn string) OptionFunc

AddPostgresDSN option func for add multple postgres source to be listen

func SetDBOption added in v1.17.2

func SetDBOption(dbOption func(*sql.DB)) OptionFunc

SetDBOption option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetLocker added in v1.8.8

func SetLocker(locker interfaces.Locker) OptionFunc

SetLocker option func

func SetMaxGoroutines added in v1.7.4

func SetMaxGoroutines(maxGoroutines int) OptionFunc

SetMaxGoroutines option func

func SetMaxReconnectInterval added in v1.16.1

func SetMaxReconnectInterval(maxReconnectInterval time.Duration) OptionFunc

SetMaxReconnectInterval option func

func SetMinReconnectInterval added in v1.16.1

func SetMinReconnectInterval(minReconnectInterval time.Duration) OptionFunc

SetMinReconnectInterval option func

func SetOnErrorConnectionCallback added in v1.17.0

func SetOnErrorConnectionCallback(callback func(error)) OptionFunc

SetOnErrorConnectionCallback option func for add error connection callback

func SetPostgresDSN added in v1.7.4

func SetPostgresDSN(dsn string) OptionFunc

SetPostgresDSN option func

func SetWorkerType added in v1.17.5

func SetWorkerType(wt types.Worker) OptionFunc

SetWorkerType option func

type PostgresHandlerRouteKey added in v1.13.11

type PostgresHandlerRouteKey struct {
	SourceName string `json:"sourceName"`
	TableName  string `json:"tableName"`
}

PostgresHandlerRouteKey key model

func (PostgresHandlerRouteKey) String added in v1.13.11

func (p PostgresHandlerRouteKey) String() string

String implement stringer

type PostgresSource added in v1.13.11

type PostgresSource struct {
	// contains filtered or unexported fields
}

PostgresSource model

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL