Implement a concurrent job queue in PostgreSQL.

Motivation

Message brokers such as RabbitMQ provide first-in, first-out (FIFO) queues, which means that jobs are executed in the order in which they are placed on the queue.

With Postgres, we can implement more complex job ordering. For example, we can add columns that store job data and change the query that fetches the next job so that it orders by those columns. We can also disable and enable jobs in the database at any time.

A drawback is that a Postgres-backed queue does not scale as well to millions of messages per second, since each consumer must query and update the database. If the message rate is not an issue and we want more control over job execution order, Postgres is a good candidate for implementing a job queue.

Schema

Let’s consider a simple example where the order of importance of the jobs is the same as the order of their chunk_idx values.

create table jobs (
  chunk_idx integer not null primary key,
  is_complete boolean default false
);
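
To try the queries that follow, the table needs some rows. A minimal sketch, assuming ten jobs numbered 1 to 10 (the count is arbitrary and not taken from the original talk):

```sql
-- Seed ten hypothetical jobs; is_complete falls back to its default (false).
insert into jobs (chunk_idx)
select generate_series(1, 10);

-- Inspect the queue in the order the jobs should be handed out.
select chunk_idx, is_complete
from jobs
order by chunk_idx;
```

To see the concurrent behavior described below, each query should be run in its own database session, for example in two separate psql windows.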

Get Job

Here is an example query to get a job and mark it as complete. See if you can figure out what is wrong or inefficient before scrolling to the next section.

-- Query 1
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  limit 1
)
returning chunk_idx;
commit;

Example

Usually we won’t be able to complete a job instantaneously, so let’s add a sleep before committing the update to simulate a slow job. Assuming we have jobs in the database, what happens if we run this new query (Query 1 with sleep) in one session and then, before it completes, run our original query (Query 1) in another?

-- Query 1 with sleep
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  limit 1
)
returning chunk_idx;
select pg_sleep(10); -- This is where we would do work
commit;

Query 1 waits until Query 1 with sleep commits, and both queries return the same chunk_idx. This means that jobs will be duplicated if we have multiple consumers.

Fixing Duplication

Let’s see if we can fix the job duplication so that we don’t have consumers doing the same job.

The issue is that the update made by Query 1 with sleep is not visible to other transactions until it commits. When we run Query 1, its subquery does not block, so it selects the same row as Query 1 with sleep. It only blocks when it tries to update that row. After the Query 1 with sleep transaction commits, Query 1 performs its update on the same row [1].

In Postgres, select ... for update locks the selected rows so that other select ... for update statements on those rows have to wait [2].

-- Query 2
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  for update
  limit 1
)
returning chunk_idx;
commit;

-- Query 2 with sleep
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  for update
  limit 1
)
returning chunk_idx;
select pg_sleep(10); -- This is where we would do work
commit;

Again we can run Query 2 with sleep and then Query 2. This time, the two queries complete different jobs. In addition, if the first transaction aborts before it commits, its job stays incomplete and the second query picks up that same job.

Although we have stopped multiple consumers from doing the same job, the second transaction has to wait for the first transaction to finish before it can claim a job, which means that consumers are still not doing jobs concurrently.
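
The waiting can be observed directly. The following is a sketch only, assuming PostgreSQL 9.6 or later (where pg_blocking_pids and the wait_event_type column are available) and a third session opened while Query 2 is waiting on Query 2 with sleep:

```sql
-- List sessions that are currently blocked by another session's lock.
select pid,
       pg_blocking_pids(pid) as blocked_by,
       wait_event_type,
       state,
       query
from pg_stat_activity
where cardinality(pg_blocking_pids(pid)) > 0;
```

While the sleep is running, the session executing Query 2 should appear in the result, with the process ID of the Query 2 with sleep session in blocked_by.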

Concurrency

The SKIP LOCKED option was added in Postgres 9.5. It gives us exactly what we want: a job whose row is currently locked by another transaction is skipped instead of waited on.

-- Query 3
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  for update skip locked
  limit 1
)
returning chunk_idx;
commit;

-- Query 3 with sleep
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  for update skip locked
  limit 1
)
returning chunk_idx;
select pg_sleep(10); -- This is where we would do work
commit;

This doesn’t block. If we run Query 3 with sleep, which locks job 1, and then Query 3, Query 3 completes job 2 without waiting for the other transaction to finish. If Query 3 with sleep fails, its update is rolled back and the next consumer to ask for work picks up job 1.
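
The failure case can be sketched in a single session by rolling back instead of committing. This assumes no other consumer is running at the same time, and the explicit rollback stands in for a job that fails partway through:

```sql
begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
  order by chunk_idx
  for update skip locked
  limit 1
)
returning chunk_idx;
-- Suppose the work fails here: undo the update and release the row lock.
rollback;

-- The job is still incomplete, so it is the next one to be handed out.
select chunk_idx
from jobs
where not is_complete
order by chunk_idx
limit 1;
```

Because the rollback discards the update, the final select returns the chunk_idx that the aborted transaction had claimed.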

Takeaways

By using SELECT ... FOR UPDATE SKIP LOCKED, we can implement a job queue in Postgres that always assigns the most important unclaimed job and can be used by many consumers concurrently. By using Postgres for our job queue, we can implement more complex logic for calculating job priority than a message broker allows.
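
As an illustration of that extra control, here is a sketch that extends Query 3. The priority and is_enabled columns are hypothetical additions for this example and are not part of the schema above or of the original talk:

```sql
-- Hypothetical columns: a higher priority runs first, disabled jobs are ignored.
alter table jobs
  add column priority integer not null default 0,
  add column is_enabled boolean not null default true;

begin;
update jobs
set is_complete = true
where chunk_idx = (
  select chunk_idx
  from jobs
  where not is_complete
    and is_enabled
  order by priority desc, chunk_idx
  for update skip locked
  limit 1
)
returning chunk_idx;
commit;
```

Setting is_enabled to false on a row takes that job out of rotation without deleting it, and raising priority moves a job ahead of others that have not yet been claimed.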

Acknowledgements

The idea and the queries in this post are from a company technical talk by Nibir Bora and Tim Higgins. The resources used to develop the talk and original code can be found at https://github.com/nbir/pg-queue-talk


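  1. The UPDATE command acquires a row-level lock on each row it modifies (FOR NO KEY UPDATE here, since no key column is changed), which conflicts with another UPDATE of the same row. The ROW EXCLUSIVE lock that UPDATE also takes is a table-level lock and does not conflict with itself. A plain SELECT takes no row-level lock, only a table-level ACCESS SHARE lock, so it is not blocked. See the PostgreSQL documentation on explicit locking.

  2. SELECT ... FOR UPDATE acquires a row-level FOR UPDATE lock, which prevents other transactions from acquiring a FOR UPDATE lock on that row. If two queries run SELECT ... FOR UPDATE on the same row, the second query has to wait until the first transaction ends and releases the lock. See the PostgreSQL documentation on row-level locks.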