thesis/pgmq

A non-blocking PHP client for Postgres Message Queue (PGMQ).

Fund package maintenance!
www.tinkoff.ru/cf/5MqZQas2dk7

Installs: 0

Dependents: 0

Suggesters: 0

Security: 0

Stars: 1

Watchers: 0

Forks: 0

Open Issues: 0

pkg:composer/thesis/pgmq

0.1.x-dev 2025-11-17 16:15 UTC

This package is auto-updated.

Last update: 2025-11-17 21:07:30 UTC


README

Non-blocking PHP client for pgmq.

Installation

composer require thesis/pgmq

Contents

Create queue

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');

Create unlogged queue

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createUnloggedQueue($pg, 'events');

Create partitioned queue

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);

List queues

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}

List queue metrics

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}

List queue metadata

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}

Drop queue

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();

Purge queue

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());

Send message

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));

Send message with relative delay

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);

Send message with absolute delay

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);

Send batch

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);

Send batch with relative delay

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);

Send batch with absolute delay

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);

Read message

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));

Read batch

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readBatch(10, TimeSpan::fromSeconds(20));

Pop message

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();

Read batch with poll

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Set visibility timeout

<?php

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    // handle the message

    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}

Archive message

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->archive($message->id);
}

Archive batch

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Delete message

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->delete($message->id);
}

Delete batch

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Enable notify insert

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // returns the Postgres channel to listen on

Disable notify insert

<?php

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();

Consume messages

This functionality is not a standard feature of the pgmq extension, but is provided by the library as an add-on for reliable and correct processing of message batches from the queue, with the ability to ack, nack (with delay) and archive (term) messages from the queue.

  1. First of all, create the extension if it doesn't exist yet:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
  2. Then create a queue:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
  3. Next, create the consumer object:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);
  4. Now we can proceed to configure the queue consumer handler:
<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Through Pgmq\ConsumeConfig you can configure:

  • the batch size of received messages;
  • the message visibility timeout;
  • enable monitoring for queue inserts via the LISTEN/NOTIFY mechanism;
  • and set the polling interval.

At least one of these settings — listenForInserts or pollTimeout — must be specified.

Through the Pgmq\ConsumeController, you can:

  • ack messages, causing them to be deleted from the queue;
  • nack messages with a delay, setting a visibility timeout for them;
  • terminate processing (when a message can no longer be retried), resulting in them being archived;
  • stop the consumer.

Since receiving messages and acking/nacking them occur within the same transaction, for your own database queries you must use the ConsumeController::$tx object to ensure exactly-once semantics for message processing.

<?php

declare(strict_types=1);

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.

<?php

declare(strict_types=1);

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

trapSignal([\SIGINT, \SIGTERM]);

$context->stop();
$context->awaitCompletion();

Or stop all current consumers using $consumer->stop():

<?php

declare(strict_types=1);

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(...); // placeholder: pass the handler and Pgmq\ConsumeConfig as shown above

trapSignal([\SIGINT, \SIGTERM]);

$consumer->stop();
$context->awaitCompletion();