Replicate Postgres database to Elasticsearch (pgtoes)
Description
pgtoes is a service for replicating data from Postgres to Elasticsearch.
How it works
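pgtoes leverages the logical decoding feature of Postgres: changes are read from a replication slot through the wal2json output plugin and indexed into Elasticsearch.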
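To illustrate what logical decoding delivers, here is a minimal Go sketch that turns one wal2json message into index-ready documents. It assumes wal2json format-version 1 (a top-level "change" array with "columnnames" and "columnvalues"). The source does not say which format version pgtoes consumes, and the names change, message and toDocuments are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// change mirrors one entry of the "change" array in wal2json
// format-version 1 output (assumed format, see the text above).
type change struct {
	Kind         string        `json:"kind"`
	Schema       string        `json:"schema"`
	Table        string        `json:"table"`
	ColumnNames  []string      `json:"columnnames"`
	ColumnValues []interface{} `json:"columnvalues"`
}

// message is the top-level wal2json object for one transaction.
type message struct {
	Change []change `json:"change"`
}

// toDocuments converts every change that carries column data into a
// column-name -> value map. Changes without column data (for example
// deletes, which carry "oldkeys" instead) are skipped.
func toDocuments(raw []byte) ([]map[string]interface{}, error) {
	var msg message
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	docs := make([]map[string]interface{}, 0, len(msg.Change))
	for _, c := range msg.Change {
		if len(c.ColumnNames) == 0 {
			continue
		}
		if len(c.ColumnNames) != len(c.ColumnValues) {
			return nil, fmt.Errorf("%s.%s: %d names but %d values",
				c.Schema, c.Table, len(c.ColumnNames), len(c.ColumnValues))
		}
		doc := make(map[string]interface{}, len(c.ColumnNames))
		for i, name := range c.ColumnNames {
			doc[name] = c.ColumnValues[i]
		}
		docs = append(docs, doc)
	}
	return docs, nil
}

func main() {
	raw := []byte(`{"change":[{"kind":"insert","schema":"public","table":"t",` +
		`"columnnames":["id","name"],"columnvalues":[1,"alice"]}]}`)
	docs, err := toDocuments(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(docs)
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is interface{}, so integer keys may need explicit conversion before indexing.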
What you need
- PostgreSQL version 10 or later
- Logical decoding enabled, with the following settings in postgresql.conf:
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
- The wal2json plugin installed on the Postgres server, from github.com/eulerto/wal2json
apt install git build-essential postgresql-server-dev-<VERSION>
git clone https://github.com/eulerto/wal2json.git wal2json
cd wal2json
USE_PGXS=1 make
USE_PGXS=1 make install
- Replication user
CREATE ROLE pgtoes WITH SUPERUSER LOGIN REPLICATION PASSWORD 'secret';
Synopsis
pgtoes --help
pgtoes is a service for streaming data from Postgres to Elasticsearch

Usage:
  pgtoes [command]

Available Commands:
  help        Help about any command
  init        Initialize database and index initial data
  remove      Remove created publication and replication slots
  run         Start streaming changes

Flags:
      --config string   config file
  -h, --help            help for pgtoes

Use "pgtoes [command] --help" for more information about a command.
Configuration
NOTE: dateformat uses Go's time layout format, so for daily indexes use dateformat: 2006.01.02
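Go layouts are written in terms of a fixed reference date (year 2006, month 01, day 02) rather than placeholders like YYYY-MM. The following sketch shows what the three layouts listed in this document produce for one sample date. The helper indexSuffix and the variable exampleTime are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// exampleTime is an arbitrary sample timestamp used for the demonstration.
var exampleTime = time.Date(2020, time.October, 7, 12, 0, 0, 0, time.UTC)

// indexSuffix renders the date part of an index name from a Go layout
// string. An empty layout yields an empty suffix.
func indexSuffix(layout string, t time.Time) string {
	return t.Format(layout)
}

func main() {
	for _, layout := range []string{"2006.01.02", "2006.01", "2006"} {
		// Daily, monthly and yearly suffixes for the same timestamp.
		fmt.Printf("%-10s -> %s\n", layout, indexSuffix(layout, exampleTime))
	}
}
```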
Using YAML file
Example 1: Only one table with monthly indexes and insert only
Indexes are created like pgtoes_demo-t-2020.10
index:
  name: pgtoes_demo
  format: true
  separator: "-"
  dateformat: 2006.01
  for: public.t
  with: insert
  source: postgres://pgtoes:secret@127.0.0.1/pgtoes
  target: http://127.0.0.1:9200/
Example 2: Multiple tables, all operations and corresponding table indexes
Indexes are created like demo.table1, demo.table2, etc., without schema information
index:
  name: demo
  format: true
  separator: "."
  dateformat: ""
  for:
    - public.table1
    - public.table2
    - public.table3
  with:
    - insert
    - update
    - delete
  source: postgres://pgtoes:secret@127.0.0.1/pgtoes
  target: http://127.0.0.1:9200/
log:
  type: stdout
  format: json
metric:
  enabled: true
  port: 9698
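The index names given for Examples 1 and 2 (pgtoes_demo-t-2020.10 and demo.table1) are consistent with joining the configured name, the table name without its schema, and an optional date suffix. The sketch below reproduces those two names under that assumption. It is inferred from the examples only, not taken from the pgtoes source, it ignores the format flag (whose exact effect is not described here), and indexName is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// exampleTime is an arbitrary timestamp in October 2020.
var exampleTime = time.Date(2020, time.October, 7, 12, 0, 0, 0, time.UTC)

// indexName joins the configured name, the table name without its schema,
// and an optional date suffix, using the configured separator.
// This naming rule is an assumption inferred from the README examples.
func indexName(name, separator, table, dateformat string, t time.Time) string {
	// Drop the schema: "public.table1" -> "table1".
	if i := strings.LastIndex(table, "."); i >= 0 {
		table = table[i+1:]
	}
	parts := []string{name, table}
	// An empty dateformat means no date suffix at all.
	if dateformat != "" {
		parts = append(parts, t.Format(dateformat))
	}
	return strings.Join(parts, separator)
}

func main() {
	fmt.Println(indexName("pgtoes_demo", "-", "public.t", "2006.01", exampleTime))
	fmt.Println(indexName("demo", ".", "public.table1", "", exampleTime))
}
```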
Example 3: Endpoint in Cloud ID format
index:
  name: iam
  format: true
  separator: "."
  dateformat: ""
  for: public.access
  with: insert
  source: postgres://pgtoes:secret@127.0.0.1/iam
  target:
    cloudid: |-
      verylongstring
    username: myusername
    password: secretpassword
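For orientation, an Elastic Cloud ID is a label followed by a colon and a base64 string that decodes to host$elasticsearch-id$kibana-id. The sketch below shows how such a value maps to an HTTPS endpoint. Whether pgtoes decodes the Cloud ID itself or delegates to an Elasticsearch client library is not stated here, and cloudIDToURL and makeCloudID are hypothetical helpers working on made-up values.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// cloudIDToURL decodes an Elastic Cloud ID of the form
// "<label>:<base64(host$es-id$kibana-id)>" into the Elasticsearch URL.
func cloudIDToURL(cloudID string) (string, error) {
	// Everything after the last colon is the base64 payload.
	encoded := cloudID[strings.LastIndex(cloudID, ":")+1:]
	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", fmt.Errorf("cloud id: %w", err)
	}
	parts := strings.Split(string(decoded), "$")
	if len(parts) < 2 || parts[0] == "" || parts[1] == "" {
		return "", fmt.Errorf("cloud id: expected host$es-id, got %q", decoded)
	}
	return "https://" + parts[1] + "." + parts[0], nil
}

// makeCloudID builds a Cloud ID from made-up parts so the example does
// not depend on a real deployment.
func makeCloudID(label, host, esID, kibanaID string) string {
	payload := host + "$" + esID + "$" + kibanaID
	return label + ":" + base64.StdEncoding.EncodeToString([]byte(payload))
}

func main() {
	id := makeCloudID("my-deployment", "example.invalid", "abc123", "def456")
	url, err := cloudIDToURL(id)
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
	fmt.Println(url)
}
```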
Environment variables
| Name | Default value | Description |
|---|---|---|
| INDEX_NAME | pgtoes_demo | string |
| INDEX_FORMAT | true | true, false |
| INDEX_SEPARATOR | - | string |
| INDEX_DATEFORMAT | 2006.01 | 2006.01.02, 2006.01 or 2006 |
| INDEX_FOR | public.t | schema.table |
| INDEX_WITH | insert, update, delete | insert, update, delete or truncate, separated by commas |
| INDEX_SOURCE | postgres://pgtoes:secret@127.0.0.1/pgtoes | postgres://[user[:password]@][host][:port][/dbname] |
| INDEX_TARGET | http://127.0.0.1:9200/ | HTTP endpoint to Elasticsearch |
| INDEX_TARGET_URL | | HTTP endpoint to Elasticsearch |
| INDEX_TARGET_CLOUDID | | Endpoint in Cloud ID format (takes precedence over the URL definition) |
| INDEX_TARGET_USERNAME | | string |
| INDEX_TARGET_PASSWORD | | string |
| LOG_FORMAT | text | text, json |
| LOG_OUTPUT | stdout | stdout |
| LOG_LEVEL | info | debug, info, warn, error, fatal |
| METRIC_ENABLED | true | true, false |
| METRIC_PORT | 9698 | integer |
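As a rough illustration of how variables like these are typically consumed, the sketch below reads a variable with a fallback to the documented default and splits the comma-separated INDEX_WITH value. It is not the pgtoes implementation, and getenv and splitList are hypothetical helpers.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// getenv returns the value of key, or fallback when the variable is unset.
func getenv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}

// splitList parses a comma-separated value such as INDEX_WITH into
// trimmed, non-empty items.
func splitList(value string) []string {
	var out []string
	for _, item := range strings.Split(value, ",") {
		if item = strings.TrimSpace(item); item != "" {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	// Defaults are copied from the table above.
	fmt.Println("index name:", getenv("INDEX_NAME", "pgtoes_demo"))
	fmt.Println("operations:", splitList(getenv("INDEX_WITH", "insert, update, delete")))
}
```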
Deployment
Kubernetes
1. Create namespace
kubectl create namespace pgtoes
kubectl config set-context --current --namespace pgtoes
2. Create configmap
configmap.yaml:
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: your-deploy-name
  namespace: pgtoes
data:
  config.yml: |-
    ---
    index:
      name: pgtoes_demo
      format: true
      separator: "-"
      dateformat: 2006.01
      for: public.t
      with: insert
      source: postgres://pgtoes:secret@127.0.0.1/pgtoes
      target: http://127.0.0.1:9200/
Apply
kubectl apply -f configmap.yaml
3. Create imagePullSecrets
From existing Docker config.json
kubectl create secret generic regcred --from-file=.dockerconfigjson=$HOME/.docker/config.json --type=kubernetes.io/dockerconfigjson
4. Deploy testpod
testpod.yaml:
---
apiVersion: v1
kind: Pod
metadata:
  name: testpod
spec:
  containers:
    - name: testpod-container
      image: registry.gitlab.viidakko.fi/tojo/pgtoes:latest
      command: ["bash", "-c", "sleep 3600"]
      volumeMounts:
        - name: config-volume
          mountPath: /etc/pgtoes
  imagePullSecrets:
    - name: regcred
  restartPolicy: Never
  volumes:
    - name: config-volume
      configMap:
        name: "your-deploy-name"
Apply
kubectl apply -f testpod.yaml
5. Use testpod to check connections
Execute bash in testpod
kubectl exec testpod -it -- bash
Inside testpod you can check various things
ping your-postgres-host
ping your-elasticsearch-host
psql -h your-postgres-host -U pgtoes your-database-name
netcat your-elasticsearch-host 9200
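If ping or netcat are not available in the image, the same reachability check can be sketched in Go. The host names are the placeholders used above, port 5432 is the Postgres default (an assumption, since no port is given here), and reachable is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// dialTimeout bounds how long each connection attempt may take.
const dialTimeout = 3 * time.Second

// reachable returns nil when a TCP connection to addr ("host:port")
// can be opened within dialTimeout, and the dial error otherwise.
func reachable(addr string) error {
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	// Replace the placeholders with your own hosts before running.
	for _, addr := range []string{"your-postgres-host:5432", "your-elasticsearch-host:9200"} {
		if err := reachable(addr); err != nil {
			fmt.Printf("%s: unreachable: %v\n", addr, err)
			continue
		}
		fmt.Printf("%s: ok\n", addr)
	}
}
```

This only confirms that the port accepts TCP connections. Authentication against Postgres still has to be verified with psql as shown above.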
6. Initialize environment
Run inside testpod
pgtoes init --config /etc/pgtoes/config.yml
7. Deploy with helm
This starts streaming changes from the Postgres database to Elasticsearch. It is equivalent to running pgtoes run --config /etc/pgtoes/config.yml, which you can use e.g. to test from the testpod.
Install from inside the repository folder. For default values, see helm/values.yaml.
helm install your-deploy-name ./helm --values your-config.yaml
Code quality and security reports
Sonarqube
Development
Running tests
Requires a Docker environment up and running
cd test/
make test
EXPERIMENTAL: Using Waypoint
Quick setup
1. Establish services
Running the tests launches Elasticsearch and PostgreSQL services in the background, which Waypoint can then use
cd test/
make test
2. Launch Kubernetes with minikube
You will need docker installed
minikube start --vm-driver docker --kubernetes-version 1.16.15
2.1 Set image pull secret
kubectl create secret generic regcred --from-file=.dockerconfigjson=$HOME/.docker/config.json --type=kubernetes.io/dockerconfigjson
3. Launch Waypoint server
waypoint install -accept-tos -platform=docker
4. Run waypoint
waypoint init
waypoint config set -app run INDEX_TARGET="http://<HOST_IP>:19200/"
waypoint config set -app run INDEX_SOURCE="postgres://pgtoes:secret@<HOST_IP>:15432/pgtoes"
waypoint up