thesis / pgmq
A non-blocking PHP client for Postgres Message Queue (PGMQ).
pkg:composer/thesis/pgmq
Requires
- php: ^8.3
- amphp/amp: ^3.1
- amphp/pipeline: ^1.2
- amphp/postgres: ^2.1
- revolt/event-loop: ^1.0.7
- thesis/time-span: ^0.2.3
Requires (Dev)
- bamarni/composer-bin-plugin: ^1.8.2
- phpunit/phpunit: ^12.4.3
- symfony/var-dumper: ^6.4.15 || ^7.3.5
This package is auto-updated.
Last update: 2025-11-17 21:07:30 UTC
README
Non-blocking PHP client for pgmq.
Installation
composer require thesis/pgmq
Contents
- Create queue
- Create unlogged queue
- Create partitioned queue
- List queues
- List queue metrics
- List queue metadata
- Drop queue
- Purge queue
- Send message
- Send message with relative delay
- Send message with absolute delay
- Send batch
- Send batch with relative delay
- Send batch with absolute delay
- Read message
- Read batch
- Pop message
- Read batch with poll
- Set visibility timeout
- Archive message
- Archive batch
- Delete message
- Delete batch
- Enable notify insert
- Disable notify insert
- Consume messages
Create queue
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
Create unlogged queue
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createUnloggedQueue($pg, 'events');
Create partitioned queue
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);
List queues
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}
List queue metrics
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}
List queue metadata
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}
Drop queue
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();
Purge queue
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());
Send message
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));
Send message with relative delay
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);
Send message with absolute delay
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);
Send batch
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);
Send batch with relative delay
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);
Send batch with absolute delay
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);
Read message
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));
Read batch
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readBatch(10, TimeSpan::fromSeconds(20));
Pop message
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();
Read batch with poll
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);
Set visibility timeout
<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    // handle the message
    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}
Archive message
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->archive($message->id);
}
Archive batch
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    // Collect the ids of the messages that were read and archive them in one call.
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}
Delete message
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->delete($message->id);
}
Delete batch
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    // Collect the ids of the messages that were read and delete them in one call.
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}
Enable notify insert
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // postgres channel to listen is returned
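The returned channel can then be listened to with amphp/postgres. The following is a minimal sketch, not part of this library: `waitForInserts` is a hypothetical helper, and it assumes that `enableNotifyInsert()` returns the channel name as a string.

```php
<?php

declare(strict_types=1);

use Amp\Postgres\PostgresConnectionPool;

/**
 * Hypothetical helper: suspends the current fiber until $limit insert
 * notifications have arrived on $channel, then stops listening.
 */
function waitForInserts(PostgresConnectionPool $pg, string $channel, int $limit): int
{
    $listener = $pg->listen($channel);
    $seen = 0;

    try {
        // The listener yields one notification per NOTIFY on the channel.
        foreach ($listener as $notification) {
            if (++$seen >= $limit) {
                break;
            }
        }
    } finally {
        $listener->unlisten();
    }

    return $seen;
}

// Usage (assumes $pg and $queue from the example above):
// $seen = waitForInserts($pg, $queue->enableNotifyInsert(), 1);
```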
Disable notify insert
<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();
Consume messages
This functionality is not a standard feature of the pgmq extension. The library provides it as an add-on for reliable and correct processing of message batches from the queue, with the ability to ack, nack (with a delay), and archive (term) messages.
- First of all, create the extension if it doesn't exist yet:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
- Then create a queue:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
- Next, create the consumer object:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);
- Now we can proceed to configure the queue consumer handler:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);
Through Pgmq\ConsumeConfig you can:
- set the batch size of received messages;
- set the message visibility timeout;
- enable monitoring for queue inserts via the LISTEN/NOTIFY mechanism;
- set the polling interval.
At least one of these settings, listenForInserts or pollTimeout, must be specified.
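As an illustration, a configuration that sets both might look like the sketch below. The parameter names `listenForInserts` and `pollTimeout` come from the text above; their types (a bool flag and a `TimeSpan`) are assumptions, so check the `ConsumeConfig` constructor before relying on them.

```php
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

$config = new Pgmq\ConsumeConfig(
    queue: 'events',
    // Assumption: a boolean flag that turns on LISTEN/NOTIFY monitoring.
    listenForInserts: true,
    // Assumption: the polling interval is given as a TimeSpan.
    pollTimeout: TimeSpan::fromSeconds(1),
);
```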
Through the Pgmq\ConsumeController, you can:
- ack messages, causing them to be deleted from the queue;
- nack messages with a delay, setting a visibility timeout for them;
- terminate messages (when they can no longer be retried), causing them to be archived;
- stop the consumer.
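The sketch below shows one way a handler could combine these operations: it processes each message, acks the ones that succeeded, and nacks the rest with a delay. `splitByOutcome` is a hypothetical helper, not part of the library. `ack()` is used as in the examples in this README, while the `nack()` method name and its `(messages, delay)` signature are assumptions.

```php
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

/**
 * Hypothetical helper: runs $process on every message and splits the batch
 * into [succeeded, failed], treating any thrown exception as a failure.
 *
 * @param list<mixed> $messages
 * @return array{0: list<mixed>, 1: list<mixed>}
 */
function splitByOutcome(array $messages, callable $process): array
{
    $succeeded = [];
    $failed = [];

    foreach ($messages as $message) {
        try {
            $process($message);
            $succeeded[] = $message;
        } catch (\Throwable) {
            $failed[] = $message;
        }
    }

    return [$succeeded, $failed];
}

$handler = static function (array $messages, Pgmq\ConsumeController $ctrl): void {
    [$succeeded, $failed] = splitByOutcome(
        $messages,
        static function (Pgmq\Message $message): void {
            // Placeholder for business logic; throw to mark the message as failed.
        },
    );

    if ($succeeded !== []) {
        $ctrl->ack($succeeded);
    }

    if ($failed !== []) {
        // Assumption: nack() accepts the messages and a delay as a TimeSpan.
        $ctrl->nack($failed, TimeSpan::fromSeconds(30));
    }
};
```

The handler would be passed to `$consumer->consume()` as the first argument, in place of the inline closure used in the other examples.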
Receiving messages and acking/nacking them occur within the same transaction, so you must run your own database queries through the ConsumeController::$tx object to ensure exactly-once semantics for message processing.
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);
Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

// Wait until the process receives SIGINT or SIGTERM.
trapSignal([\SIGINT, \SIGTERM]);

$context->stop();
$context->awaitCompletion();
Or stop all current consumers using $consumer->stop():
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(...);

// Wait until the process receives SIGINT or SIGTERM.
trapSignal([\SIGINT, \SIGTERM]);

$consumer->stop();
$context->awaitCompletion();