leontrolski – postgres as queue

The team I have been working in for the last year or so has had great success using Postgres-as-queue. We have managed to avoid the following:
- Infrastructure/configuration – I would estimate each line of Terraform to be an order of magnitude more risk/maintenance/faff than each line of Python.
- Slow/clunky multi-container testing.
- The need for expertise in anything beyond Python + Postgres.
- Elaborate retry/dead-letter-queue mechanisms.
- Thinking about data serialisation over the wire.
In a nutshell, it is simpler – there are just far fewer moving parts.
As we're using a monolithic codebase with a reasonable ORM, we also have none of the CMD-clickability issues that plague ad-hoc SNS/PubSub/Kafka architectures.
The main objection to Postgres-as-queue is a performance one, along the lines of "don't put unnecessary extra load on the db / don't increase row churn". Let's construct a reasonable example demonstrating that queue usage shouldn't introduce much extra load in many cases. As always, before following anybody's advice on this kind of thing, profile, profile, profile!
In the (fairly unusual) case that you're doing many tasks, none of which touch the db (say constructing and sending emails from static data), you can ignore this blog post and get on with life. In another case, you may be operating at some crazy scale where these downsides start to apply; again, run the numbers and profile.
Let's imagine the db load introduced by a hypothetical task – I currently work in the energy industry, so the example might be: a customer submits a meter reading, we queue a task to write the reading and update some account balance. The load looks like:
- Receive the message from the broker.
- Make 3 primary key SELECTs totalling 0.3ms db time.
- Make 2 slightly hairier SELECTs with some JOINs/GROUP BYs totalling 4ms db time.
- Perform 2 UPDATEs totalling 2ms db time (and some row churn).
- ACK the message.
In the new Postgres-as-queue world, this looks like:
- Poll for a message that needs processing; on finding one, UPDATE its status, totalling 1ms db time.
- Make 3 primary key SELECTs totalling 0.3ms db time.
- Make 2 slightly hairier SELECTs with some JOINs/GROUP BYs totalling 4ms db time.
- Perform 2 UPDATEs totalling 2ms db time (and some row churn).
- ACK the message by UPDATEing its status, totalling 0.5ms db time (and some row churn).
In this example, our db time has gone up from 6.3ms per task to 7.8ms. These figures are entirely fictional, but we've demonstrated a reasonable way of thinking about the overhead.
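Totting up those (made-up) figures:

```python
# Per-task db time from the fictional example above, in milliseconds.
broker_ms = 0.3 + 4 + 2              # selects + hairier selects + updates
pg_queue_ms = 1 + 0.3 + 4 + 2 + 0.5  # plus claiming and ACKing via UPDATE

overhead = pg_queue_ms / broker_ms - 1
print(f"{broker_ms:.1f}ms -> {pg_queue_ms:.1f}ms ({overhead:.0%} more db time)")
```

About a quarter more db time per task – whether that matters depends entirely on your workload, hence: profile.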
If we had just one worker polling for tasks, we could ignore locking and transactions, but we want many workers, so we have to use FOR UPDATE SKIP LOCKED. This atomically locks the row at the point where it selects it – there's discussion of the ins and outs in this excellent blog post by 2ndQuadrant.
For our example implementation, we have an event table that looks like:
id   | status   | updated_at
------------------------------------------
UUID | SMALLINT | TIMESTAMP WITH TIME ZONE
We have an INDEX on (status, updated_at). In reality we have many tables, one per queue.
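As a sketch, the corresponding DDL might look like the following (assuming ids are generated application-side; the index name is made up):

```sql
CREATE TABLE event_queue (
    id         UUID PRIMARY KEY,
    status     SMALLINT NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE INDEX event_queue_status_updated_at
    ON event_queue (status, updated_at);
```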
Our polling workers run a loop like:

for _ in shutdown_handler.loop():
    event_meta = get_event_to_process(
        where_status_eq=TO_PROCESS,
        set_status_to=PROCESSING,
    )
    if event_meta is None:
        time.sleep(x)
        continue
    try:
        # ... do the actual work for this event here ...
        set_status(event_meta, PROCESSED)
    except Exception:
        set_status(event_meta, ERRORED, ...)
And get_event_to_process(...) performs SQL along the lines of:

WITH ids AS MATERIALIZED (
    SELECT id FROM event_queue
    WHERE status = {where_status_eq}
    ORDER BY updated_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE event_queue
SET status = {set_status_to}
WHERE id = ANY(SELECT id FROM ids)
RETURNING id

Note the use of MATERIALIZED to force the CTE to evaluate eagerly, before the UPDATE (aside: I would love a Postgres expert to confirm that this query is really race-condition free).
set_status(...) just performs an update of status and updated_at for a particular row.
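As a sketch, in the same placeholder style as above:

```sql
UPDATE event_queue
SET status = {new_status}, updated_at = now()
WHERE id = {event_id}
```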
Because you're simply interacting with a persistent table rather than some black-box queue, it's easy to add bells and whistles as your requirements change.
Retrying
Sometimes tasks fail or time out. We have jobs that periodically poll for old tasks with weird statuses and attempt to retry them as appropriate.
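As a sketch (the statuses follow the example above; the 10-minute cutoff is made up), such a job might re-queue tasks stuck in PROCESSING:

```sql
UPDATE event_queue
SET status = {TO_PROCESS}, updated_at = now()
WHERE status = {PROCESSING}
  AND updated_at < now() - INTERVAL '10 minutes'
```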
Ignore before
We have one more timestamp column on our event_queue tables – ignore_before. This is useful in two scenarios:
- We can represent timeouts (eg. "send an email if we haven't received inbound x after 10 days") as regular ol' events.
- We want to batch up certain kinds of outbound event, so we can set their ignore_before to "at the next whole hour" and bundle up a load of events at dispatch-time.
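Assuming ignore_before is nullable, the polling query from earlier then grows a condition along the lines of:

```sql
WHERE status = {where_status_eq}
  AND (ignore_before IS NULL OR ignore_before <= now())
```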
Cruft cleanup
You may want cron jobs that delete queue data older than some cutoff.
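A sketch of such a cleanup (the 90-day horizon and PROCESSED-only filter are arbitrary choices):

```sql
DELETE FROM event_queue
WHERE status = {PROCESSED}
  AND updated_at < now() - INTERVAL '90 days'
```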
Shutdown handler
The following is a nice helper for polling loops that aids with shutdown handling, and times itself out after an hour of no activity.
import os
import signal
import threading
from types import FrameType
from typing import Iterator

INTERRUPT_TIMEOUT = 60 * 60  # an hour of inactivity

work_done: threading.Event

def kill_after_timeout() -> None:
    global work_done
    work_done = threading.Event()
    if work_done.wait(INTERRUPT_TIMEOUT):
        return
    os.kill(os.getpid(), signal.SIGKILL)

class ShutdownHandler:
    def __init__(self, max_loops: int | None = None) -> None:
        self.exit_flag = False
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum: int, frame: FrameType | None) -> None:
        self.exit_flag = True

    def loop(self) -> Iterator[None]:
        global work_done
        while True:
            if self.exit_flag:
                work_done.set()
                return
            threading.Thread(target=kill_after_timeout, daemon=True).start()
            yield None
            work_done.set()