postgresworker

package
v1.13.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

README

Example

This is an example of creating a Postgres event listener (CDC), inspired by Hasura Event Triggers.

Create delivery handler

package workerhandler

import (
	"context"
	"encoding/json"
	"fmt"

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/tracer"
)

// PostgresListenerHandler is the delivery handler for Postgres listener
// events. It holds the usecase and the validator that are passed to
// NewPostgresListenerHandler.
type PostgresListenerHandler struct {
	uc        usecase.Usecase
	validator interfaces.Validator
}

// NewPostgresListenerHandler returns a PostgresListenerHandler that stores
// the given usecase and validator.
func NewPostgresListenerHandler(uc usecase.Usecase, validator interfaces.Validator) *PostgresListenerHandler {
	handler := new(PostgresListenerHandler)
	handler.uc = uc
	handler.validator = validator
	return handler
}

// MountHandlers registers this handler's functions on the worker handler
// group. Changes captured on table "table-names" are routed to
// handleDataChange.
func (h *PostgresListenerHandler) MountHandlers(group *types.WorkerHandlerGroup) {
	// Name of the table whose change data capture events are listened to.
	const tableName = "table-names"

	group.Add(tableName, h.handleDataChange)
}

// handleDataChange handles a change event captured on table "table-names".
// It opens a trace for the call, prints the raw event message to stdout and
// returns nil; the usecase call is left as a placeholder.
func (h *PostgresListenerHandler) handleDataChange(eventContext *candishared.EventContext) error {
	const operation = "DeliveryPostgresListener:HandleDataChange"

	trace := tracer.StartTrace(eventContext.Context(), operation)
	defer trace.Finish()

	message := eventContext.Message()
	fmt.Printf("data change on table 'table-names' detected: %s\n", message)
	// call usecase
	return nil
}


Register in module

package examplemodule

import (
	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

// Module holds the delivery handlers of the example module.
type Module struct {
	// ...another delivery handler
	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the Module and registers the Postgres listener handler
// under types.PostgresListener.
func NewModules(deps dependency.Dependency) *Module {
	handlers := map[types.Worker]interfaces.WorkerHandler{
		// ...another worker handler
		// ...
		types.PostgresListener: workerhandler.NewPostgresListenerHandler(usecaseUOW.User(), deps.GetValidator()),
	}

	return &Module{workerHandlers: handlers}
}

// ...another method

Message in JSON Payload

{
	"event_id": "<md5-hash-unique-event-id>",
	"table": "<table-name>",
	"action": "<operation-name>", // INSERT, UPDATE, or DELETE
	"data": {
		"old": <old-column-values-object>,
		"new": <new-column-values-object>
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewWorker

func NewWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewWorker creates a new Postgres event listener.

Types

type EventPayload

// EventPayload is the event model. Its fields map to the JSON keys given in
// the struct tags.
type EventPayload struct {
	// EventID is the unique event ID (an MD5 hash, per the README payload sample).
	EventID string           `json:"event_id"`
	// Table is the name of the table the event belongs to.
	Table   string           `json:"table"`
	// Action is the operation name: INSERT, UPDATE, or DELETE.
	Action  string           `json:"action"`
	// Data carries the old and new values for the event.
	Data    EventPayloadData `json:"data"`
}

EventPayload is the event model.

func (EventPayload) GetID added in v1.12.4

func (e EventPayload) GetID() string

GetID gets the ID when the old/new data is empty, which happens because of the long-payload limitation.

type EventPayloadData added in v1.9.0

// EventPayloadData is the event data: the old and new column values of the
// changed row.
type EventPayloadData struct {
	// NOTE(review): presumably set when the payload hit the length limit and
	// Old/New were left empty — confirm against the implementation.
	IsTooLongPayload bool        `json:"is_too_long_payload,omitempty"`
	// NOTE(review): OldID/NewID are presumably the IDs that EventPayload.GetID
	// falls back to when Old/New are empty — confirm.
	OldID            string      `json:"old_id"`
	NewID            string      `json:"new_id"`
	// Old is the old column values object.
	Old              interface{} `json:"old"`
	// New is the new column values object.
	New              interface{} `json:"new"`
}

EventPayloadData is the event data.

type OptionFunc added in v1.7.4

// OptionFunc is a function that receives a *option. NewWorker accepts a
// variadic list of them, and the Set* functions return values of this type.
type OptionFunc func(*option)

OptionFunc type

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetLocker added in v1.8.8

func SetLocker(locker candiutils.Locker) OptionFunc

SetLocker option func

func SetMaxGoroutines added in v1.7.4

func SetMaxGoroutines(maxGoroutines int) OptionFunc

SetMaxGoroutines option func

func SetPostgresDSN added in v1.7.4

func SetPostgresDSN(dsn string) OptionFunc

SetPostgresDSN option func

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL