Background
PostgreSQL introduced a feature called logical decoding in version 9.4. This poorly-named feature allows us to do some very cool things, like building a stream of events corresponding to changes made to tables. Imagine something like this:
You have an ORM layer that connects to the DB and inserts, updates, and deletes rows in a DB transaction. Now imagine you are able to run an event processor that receives these changes once the DB transaction is committed, i.e. once the changes are final. You’ll receive the complete row data, including the old and new values in the case of an update to an existing row.
Before you let out a yawn, let me elaborate on why this is super-interesting.
Why is this interesting?
Asynchronous work is a very critical part of any SaaS application. Some examples are:
Send a welcome email to a new user
Generate a CSV output
Save audit logs for debugging and compliance. This is a very common ask of SaaS products.
Historically, most ORM frameworks allow you to hook into the DB transaction lifecycle to trigger async work. Examples include Django’s pre_save and post_save signals. Here’s a nice article that shows how to link Django’s signals to Celery and do asynchronous work.
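To make this concrete, the signal-based approach typically looks something like the sketch below (model and task names are hypothetical, not taken from that article):

from django.db.models.signals import post_save
from django.dispatch import receiver

from myapp.models import User                 # hypothetical model
from myapp.tasks import send_welcome_email    # hypothetical Celery task

@receiver(post_save, sender=User)
def queue_welcome_email(sender, instance, created, **kwargs):
    # Queue async work from the ORM hook; note this only fires for changes
    # that actually go through Django's ORM.
    if created:
        send_welcome_email.delay(instance.pk)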
This is a bit restrictive. Imagine that you have 10 microservices. If these microservices are written in different languages and frameworks, the developers have to figure out how to do this in 10 different codebases. Furthermore, if a row is modified by a SQL script, those changes don’t go through the ORM at all and are completely missed.
Using PostgreSQL to do the heavy-lifting decouples the generation of the events from the ORM layer completely and is more robust.
Let’s give it a spin
To try this out, we’ll need a PostgreSQL DB with a couple of things:
Logical replication must be enabled
wal2json output plug-in must be installed
What is this output plug-in, you ask? An output plug-in is code that converts change events from Postgres’ internal format into a standard external format. There are a few different output plug-ins available, but we’ll use wal2json in this exercise: it generates a JSON document for every change.
We’ll use the Debezium PostgreSQL Docker image, which has both of these configured. In terminal 1, run the following command:
docker run --name pg -e POSTGRES_PASSWORD=dummy -p 5432:5432 debezium/postgres
You should see the DB coming up in a minute or so.
In terminal 2, run the following command to confirm that you’re able to connect to the DB:
docker run -it -e PGPASSWORD=dummy -e PGUSER=postgres -e PGDATABASE=postgres -e PGHOST=host.docker.internal debezium/postgres psql
You should see the familiar psql prompt. Note that the PGHOST value host.docker.internal works with Docker Desktop on macOS; you may need to figure out the right hostname for your Docker environment.
Now in terminal 2 inside the psql prompt, run the following statements:
CREATE TABLE foo (id int primary key, name text);
alter table foo replica identity full;
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
We created a table and set its replica identity to full, which tells PostgreSQL to log the complete old row for updates and deletes rather than just the primary key. We also created a replication slot called test_slot with the wal2json output plug-in registered. Now we just need a receiver that will connect to this slot and dump out the events. We’ll run a simple receiver in terminal 3.
docker run -it -e PGPASSWORD=dummy -e PGUSER=postgres -e PGHOST=host.docker.internal debezium/postgres pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -
The data flow in this little experiment is: psql (terminal 2) writes to the table, PostgreSQL records the change in the WAL, wal2json decodes it at the test_slot slot, and pg_recvlogical (terminal 3) streams and prints it.
Now in terminal 2, run a few insert, update and delete statements like:
insert into foo values(1, 'a');
insert into foo values(2, 'b');
update foo set name = 'c' where id = 1;
delete from foo where name = 'b';
In terminal 3, you should see the corresponding change events show up.
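The exact output depends on your wal2json version, but with the default format-version 1 the first insert produces a document shaped roughly like this (illustrative, not a verbatim capture):

{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "foo",
      "columnnames": ["id", "name"],
      "columntypes": ["integer", "text"],
      "columnvalues": [1, "a"]
    }
  ]
}

Updates and deletes additionally carry an oldkeys object with the previous row values, which is what setting the replica identity to full buys us.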
Yay! We’ve got a toy version of an event stream going.
Getting it into production
As we all know, running stuff in production is a whole different ballgame. You’re likely using AWS or some other cloud provider, so you’ll need to look up their documentation on how to get the DB ready, i.e. enable logical replication and make the wal2json plug-in available.
I recommend you use format-version 2 of the wal2json plug-in. Format-version 1 combines all changes in a single DB transaction into one large JSON document, which can crash your event processor when a big transaction comes along; format-version 2 emits a separate document per change.
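If your wal2json build is recent enough to support it (2.x), you can request it as a plug-in option when starting the receiver; for example, the toy command from earlier becomes something like:

docker run -it -e PGPASSWORD=dummy -e PGUSER=postgres -e PGHOST=host.docker.internal debezium/postgres pg_recvlogical -d postgres --slot test_slot --start -o format-version=2 -f -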
Also, note that a slot with unread entries prevents PostgreSQL from recycling WAL segments, which causes disk usage to grow and grow. So make sure you read all the events, or drop the slot if you’re not using it. In AWS, you can set up alarms for this so that you don’t accidentally kill your DB.
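On PostgreSQL 10 and newer, a query along these lines (a sketch; adjust for your version) shows how much WAL each slot is holding back, and dropping an unused slot releases it:

SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- only if you really no longer need the slot
SELECT pg_drop_replication_slot('test_slot');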
You’d likely want your event processor to normalize the events and put them into a message broker like RabbitMQ or Kafka. You can then have multiple downstream consumers, e.g. an email sender, a PDF writer, etc.
Writing the event processor is a little tricky. We wrote a battle-tested one in Python on top of psycopg2 that writes to RabbitMQ. We’re looking for others who may be interested in this functionality so we can work together on open-sourcing it under the MIT license. If you’re interested in working with us on this, do leave a comment or ping me on Twitter at @k2_181.
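That processor isn’t shown here, but a minimal sketch of the core loop using psycopg2’s logical replication support looks roughly like this (connection details are placeholders, and printing stands in for publishing to a broker):

import json
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    "host=localhost dbname=postgres user=postgres password=dummy",
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()

# Attach to the slot created earlier; wal2json options are passed here.
cur.start_replication(slot_name="test_slot", decode=True,
                      options={"format-version": "2"})

def consume(msg):
    event = json.loads(msg.payload)
    print(event)  # a real processor would publish to RabbitMQ/Kafka here
    # Acknowledge the position so PostgreSQL can recycle the WAL behind it.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)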
In conclusion
Using logical decoding, it is possible to generate an event stream directly from the WAL without requiring any application-level code.