Debezium Server with PostgreSQL and Redis Stream
Debezium is a great tool for capturing the row-level changes that happen in a database and streaming those changes to a broker of our choice.
Since this functionality stays within the boundaries of the database, it helps keep our applications simple. For example, there is no need for an application to emit events on every database interaction: Debezium monitors the row changes and emits the events. Depending on the broker used with Debezium, a consumer can subscribe to the broker and receive the changes.
PostgreSQL, being a popular SQL database, is supported by Debezium.
Our goal is to listen to PostgreSQL changes and stream them to a Redis stream through Debezium Server. It is common to use Debezium with Kafka; where Kafka is not present in a team’s tech stack, we can use other brokers.
In our case we will keep things lightweight by using Redis Streams.
Redis will be set up without any extra configuration.
In order to use PostgreSQL with Debezium, it is essential to alter the PostgreSQL configuration.
The configuration we shall use for PostgreSQL is the following:
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3
As we can see, wal_level is set to logical, which enables PostgreSQL's logical decoding.
From the documentation:
Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.
In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
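A quick preflight check can catch a missing setting before Debezium starts. The sketch below parses postgresql.conf-style text and reports whether wal_level is set to logical. The helper names parse_pg_conf and logical_decoding_enabled are hypothetical, and the parser only understands simple key = value lines, so treat it as an illustration rather than a full configuration reader.

```python
def parse_pg_conf(text):
    """Parse simple `key = value` lines from postgresql.conf-style text.

    Assumption: no include directives and no '#' inside quoted values.
    """
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # skip lines this simple parser does not understand
        settings[key.strip().lower()] = value.strip().strip("'")
    return settings


def logical_decoding_enabled(settings):
    """Return True when wal_level is 'logical' (PostgreSQL defaults to 'replica')."""
    return settings.get("wal_level", "replica") == "logical"
```

Feeding it the contents of the postgresql.conf shown earlier should report that logical decoding is enabled.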
We will also create a schema (namespace) and a table in PostgreSQL. Debezium will listen for changes on this schema and table.
#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;

  create table test_schema.employee(
    id SERIAL PRIMARY KEY,
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    email TEXT not null,
    age INT NOT NULL,
    salary real,
    unique(email)
  );
EOSQL
This is the table we used in a previous PostgreSQL example.
Debezium must be able to interact with both the PostgreSQL server and the Redis server.
The configuration should be the following:
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput
Examining the configuration, we can see that it contains the information Debezium needs to communicate with the PostgreSQL database. We also specify the schema we created previously, so only changes from that schema will be streamed. We can make things even more restrictive, for example by whitelisting individual tables.
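To get a feel for what the schema filter does, here is a minimal sketch of the matching logic. It assumes the filter value is a comma-separated list of regular expressions, each matched against the whole schema name, which is how we understand Debezium to treat it. The function schema_is_captured is hypothetical and is not part of Debezium.

```python
import re


def schema_is_captured(schema, include_list):
    """Return True if `schema` matches any expression in the include list.

    Assumption: entries are regular expressions anchored to the full name,
    and an empty list means no filtering at all.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    if not patterns:
        return True
    return any(re.fullmatch(p, schema) for p in patterns)
```

With the value used above, test_schema passes the filter, while a schema such as public does not.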
Since this demo involves three different software components, Docker Compose will come in handy.
version: '3.1'

services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis
By using Compose we were able to spin up three different software components on the same network. This lets the components interact with each other using the DNS names of the services as specified in the Compose file. The configuration files we created previously are also mounted into the Docker containers. Docker Compose V2 is out with many good features; you can find more about it in the book I authored, A Developer’s Essential Guide to Docker Compose.
In order to get the stack running, we shall execute the following command:
$ docker compose up
Once the stack is up and running, we can start listening for events.
We shall log in to Redis and start listening for any database updates.
$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $
This blocks for up to 1,000,000 milliseconds until an event arrives on the stream. The trailing $ is the special ID that asks only for entries added after the command was issued.
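An application would typically run the same read in a loop. The sketch below shows one way to do that in Python, assuming a client object whose xread method behaves like the one in the redis-py library. The consume function and its parameters are our own illustration, not something shipped with Debezium or Redis. After the first read it resumes from the last entry ID it saw instead of $, since repeating $ on every call could skip entries that arrive between calls.

```python
def consume(client, stream, handle, last_id="$", block_ms=1000000, max_batches=None):
    """Read entries from a Redis stream, resuming after the last seen ID.

    `client` is assumed to expose xread(streams, block=...) like redis-py.
    `handle` is called with (entry_id, fields) for every entry received.
    `max_batches` limits the number of reads; None means loop forever.
    """
    batches = 0
    while max_batches is None or batches < max_batches:
        reply = client.xread({stream: last_id}, block=block_ms)
        batches += 1
        for _name, entries in reply or []:  # empty reply when the block times out
            for entry_id, fields in entries:
                handle(entry_id, fields)
                last_id = entry_id  # next read continues after this entry
    return last_id


# Possible usage, assuming redis-py is installed and Redis is reachable:
#   import redis
#   client = redis.Redis(host="localhost", port=6379, decode_responses=True)
#   consume(client, "tutorial.test_schema.employee", print)
```

Passing the client in as an argument keeps the loop easy to exercise with a stub, without a running Redis instance.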
If we examine the stream name, we can see the pattern {server-name}.{schema}.{table}. This allows consumers to subscribe only to the changes of interest.
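A consumer that watches several tables may want to build or take apart these names. The following sketch does both, assuming none of the three parts contains a dot. StreamName, build_stream_name and parse_stream_name are hypothetical helpers introduced only for this example.

```python
from typing import NamedTuple


class StreamName(NamedTuple):
    server: str
    schema: str
    table: str


def build_stream_name(server, schema, table):
    """Join the parts following the {server-name}.{schema}.{table} pattern."""
    return ".".join((server, schema, table))


def parse_stream_name(name):
    """Split a stream name into its parts; assumes no part contains a dot."""
    parts = name.split(".")
    if len(parts) != 3:
        raise ValueError(f"unexpected stream name: {name!r}")
    return StreamName(*parts)
```

For the stream used in this demo, parsing yields the server tutorial, the schema test_schema and the table employee.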
Next, we will insert a row.
$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q
If we check the Redis session, we should see that we received an event from the Redis stream:
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379>
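The event is verbose because it carries its own schema, but the interesting part sits under payload. Judging by the output above, each stream entry holds the key JSON as the field name and the change event JSON as the field value. The sketch below pulls the operation, table and row images out of the value. The summarize_change function is hypothetical. The event above only demonstrates the c operation; the other codes in the lookup table are the ones we know from the Debezium documentation.

```python
import json

# "c" appears in the event above; the remaining codes are taken from the
# Debezium documentation and are not demonstrated in this article.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change(value_json):
    """Extract the essentials from a Debezium change event value."""
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["schema"]}.{source["table"]}',
        "before": payload["before"],  # None for inserts
        "after": payload["after"],    # new row state for inserts and updates
    }
```

For the insert above, this would report a create operation on test_schema.employee with no before image and the new row under after.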
How cool is that? We can now start streaming our database changes to the broker of our choice.
You can find the source code on GitHub.
Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: Debezium Server with PostgreSQL and Redis Stream
Opinions expressed by Java Code Geeks contributors are their own.