Очереди сообщений в PostgreSQL с использованием PgQ | | ДОСТУПНЫЙ ОТДЫХ
Интересное

Очереди сообщений в PostgreSQL с использованием PgQ

Очереди сообщений в PostgreSQL с использованием PgQ

Очереди сообщений используются для выполнения: отложенных операций, взаимодействия сервисов между собой, «batch processing» и т.д. Для организации подобных очередей существуют специализированные решения, такие как: RabbitMQ, ActiveMQ, ZeroMQ и тд, но часто бывает, что в них нет большой необходимости, а их установка и поддержка причинит больше боли и страданий, чем принесет пользы. Допустим, у вас есть сервис, при регистрации в котором пользователю отправляется email для подтверждения, и, если вы используете Postgres, то вам повезло — в Postgres, почти из коробки, есть расширение PgQ, которое сделает всю грязную работу за вас.

В этой статье я расскажу об организации очередей сообщений (задач) в PostgreSQL с использованием расширения PgQ. Эта статья будет полезна, если вы еще не использовали PgQ или используете самописные очереди поверх Postgres.

Зачем вообще нужен PgQ, если можно просто создать табличку и записывать туда задачи? Казалось бы, можно, но вам придется учесть паралельный доступ к задачам, возможные ошибки (что будет, если процесс обрабатывающий задачу, упадет?), а также производительность (PgQ очень быстрый, а самописные решения, как правило, нет, особенно если транзакция в базе не закрывается во время всего выполнения задачи), но самое главное, почему на мой взгляд надо использовать PgQ, это то, что PgQ уже написан и работает, а самописное решение еще надо написать.
(UPD: т.к. PgQ работает поверх Postgres, все прелести транзакций можно использовать и в нем)

Но у PgQ есть один огромный минус — отсутствие документации, этот недостаток я и пытаюсь компенсировать этой статьей.

Устройство
PgQ состоит из частей (как минимум 2-х): 1 — расширение pgq для postgres, 2 — демон pgqd (об их установке чуть позже).

Все взаимодействие с очередью осуществляется с помощью функций внутри Postgres.

Например, чтобы создать очередь, надо выполнить

select * from pgq.create_queue({имя очереди} text);
После того как очередь создана, в нее можно добавлять сообщения

select * from pgq.insert_event({имя очереди} text, {тип события} text, {информация о событии} text);
Теперь надо научиться получать записанные сообщения. Для этого существует такая сущность, как «consumer» (я буду писать консьюмер), который получает не сообщения (события), а «бачи» (batch). Бач — это группа подряд идущих сообщений, бачи создаются с помощью pgqd. Периодически (параметр «ticker_period» в конфигурационном файле) pgqd берет все накопленные сообщения и записывает в новый бач. Важно, если pgqd не работает, то новые бачи не создаются, а значит, консьюмерам нечего читать, также, если pgqd долго не работал, а потом включился, то он создаст один большой бач из сообщений, накопленных за это время, поэтому pgqd не следует просто так отключать.

Читать  Интернет-пользователи делятся фотографиями, изображающими жизненные циклы ягод и растений (11 фото)

Регистрация консьюмера (Важно! консьюмер будет получать события, записанные только после его регистрации, поэтому следует вначале создать консьюмер, а только потом уже писать события):

select * from pgq.register_consumer({имя очереди} text, {имя консьюмера} text);
(аналогично pgq.unregister_consumer)
Каждый консьюмер получит абсолютно все события, произошедшие после его создания (даже уже обработанные другим консьюмером), это значит, что скорее всего вам нужен всего один консьюмер на одну очередь. Далее я расскажу, как при этом разделить нагрузку на несколько серверов.

Для получения бача нужно вначале узнать его ID:

select * from pgq.next_batch({имя очереди} text, {имя консьюмера} text);
Функция может вернуть NULL, если консьюмер обработал все бачи. В этом случае нужно просто подождать, пока pgqd создаст новый бач.

При этом, пока бач не обработан, эта функция будет возвращать одно и то же значение.
Получить все события в баче можно с помощью:

select * from pgq.get_batch_events({id бача} bigint);
(Бач может быть пустым.)

Если при обработке какого-то из них возникла ошибка, то можно попробовать обработать это событие позже:

select * from pgq.event_retry({id бача} bigint, {id события} bigint, {количество секунд до ретрая} integer);
Чтобы сообщить об окончании бача и получить возможность приступить к новому, используется

select * from pgq.finish_batch({id бача} bigint);
Разумеется, это не все функции в расширении, рекомендую почитать pgq.github.io/extension/pgq/files/external-sql.html и github.com/pgq/pgq/tree/master/functions (в каждом файле содержится дефиниция и описание соответствующей функции).

Распределение нагрузки
Для того чтобы обрабатывать события одновременно несколькими обработчиками, существует расширение pgq_coop, которое работает повех pgq и добавляет новую сущность «субконсьюмер» (sub consumer), который будет получать все события с момента регистрации родительского консьюмера, разумеется, кроме уже обработанных.

select * from pgq_coop.register_subconsumer({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);
select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);
select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text, {если другой субконсьюмер был неактивен в течение этого интервала, текущий может забрать у него бач} interval);
select * from pgq_coop.finish_batch({id бача} bigint);
Прочитать про все функции можно здесь.

Читать  Реалистичные акварельные картины Маркоса Беккари (11 фото)

Установка
Расширение pgq и демон pgqd входят в репозитории PGDG и в большинстве дитрибутивов устанавливаются очень просто, например, в Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX — номер версии).

pgqd — это небольшая программка, узнать про использование которой можно с помощью pgqd —help, не забудьте добавить ее в автозапуск (sudo systemctl enable pgqd.service, а конфиг по умолчанию — /etc/pgqd.ini).

Чтобы начать использовать PgQ, в базе надо просто подключить расширение:

create extension if not exists pgq;

C pgq_coop все немного сложнее, его нет в репозитории, но собрать его из исходников не составляет труда (пример для Debian):

sudo apt install postgresql-server-dev-XX
git clone https://github.com/pgq/pgq-coop.git
cd pgq-coop
sudo make install

и подключить расширение с помощью

create extension if not exists pgq_coop;
Полезные ссылки
Документация pgq
Функции pgq
Функции pgq_coop
Исходный код pgqd
github аккаунт со всеми связанными проектами
Wiki postgres-а
Источник

I heart FeedBurner