Pub/Sub es un gran mecanismo para establecer la comunicación de microservicios. Al centrarse en definir topics buenos y fiables, los microservicios pueden existir en un contexto completamente desacoplado. AWS ofrece dos excelentes servicios que ayudan a construir sistemas basados en pub/sub que a su vez son altamente escalables, confiables y recuperables: Simple Notification Service y Simple Queue Service.

Introducción a la comunicación de microservicios

Cuando el servicio A se comunica con B a través de una llamada directa, podemos inferir que hay una dependencia en A acoplada a B. Si B no está disponible temporalmente o tiene una carga pesada, podría interrumpir la estabilidad de A.

Escribir microservicios acoplados generalmente significa lidiar con lo peor de ambos mundos. Si, en cambio, el sistema se construyera como un monolito, aún estaría acoplado entre sí, pero sin los problemas y beneficios de un sistema distribuido. En un monolito, un componente nunca está disponible o inalcanzable y la comunicación está a solo una llamada de función o instanciación de objeto de distancia. Al dividir dos servicios, se vuelven independientes entre sí a costa de crear un sistema distribuido.

Con un sistema de publicador / suscriptor, existe la posibilidad de que el sistema B rastree (o se suscriba) a los eventos que suceden en A sin un acoplamiento directo. El sistema Pub / Sub se convierte en un broker y sirve como un contrato entre ambas partes. Si el sistema A está empezando a deteriorarse, puede ser reemplazado por una versión más nueva, más inteligente y mejor sin afectar a B si los mensajes que publica A aún pueden respetarse o mantenerse compatibles con versiones anteriores.

Por qué SQS?

AWS Simple Notification Service es donde los editores notificarán los eventos que sucedan. Esa notificación ocurre sobre un topic. A partir de este momento, SNS puede notificar a los suscriptores por http, https, correo electrónico, sms, sqs y lambda. Pero con el protocolo HTTP (S), dos servicios podrían comunicarse entre sí a través de SNS sin la necesidad de SQS en absoluto. Entonces, ¿por qué incorporar SQS a la mezcla?

Cuando se utiliza HTTP (S) como protocolo de suscripción para SNS, AWS ofrecerá reintentos en caso de que no se pueda localizar al suscriptor o responda con un error. Sin embargo, existen algunas limitaciones sobre lo que se puede lograr con este modelo. Especial atención si el suscriptor tiene un error y no puede procesar la notificación en absoluto. Es posible que se pierdan muchas notificaciones hasta que se publique un parche. Sin embargo, al suscribir un SQS a un topic de SNS, AWS garantizará la entrega a SQS. Quizás si el trabajador / consumidor de SQS tiene un error, no podrá procesar el mensaje, pero el mensaje puede permanecer en la cola hasta 14 días y aún puede enviarse a una cola de mensajes no entregados. Este mecanismo ofrece una opción mucho más segura para no perder nunca mensajes. Algunos microservicios funcionan con la premisa de que los mensajes siempre se pueden reproducir, pero ¿qué pasa si algunos suscriptores ya procesaron esos eventos? Por supuesto, los sistemas de reproducción son una opción válida siempre que los suscriptores puedan ser idempotentes. En este artículo, me centraré principalmente en evitar la pérdida de mensajes.

Laravel Worker

Laravel tiene un sistema de cola listo para usar y ofrece un controlador nativo que puede comunicarse con Amazon SQS. Al enviar un mensaje a SQS, Laravel almacenará suficiente información dentro del mensaje que le permitirá al worker comprender y procesar el job (mensaje) correctamente. Sin embargo, al obtener un mensaje de SQS que fue insertado por SNS, el mensaje carecerá del atributo de job que utiliza el sistema del worker para saber qué trabajo procesar. Si solo dos sistemas se comunican a través de SNS, se podría argumentar que el sistema del editor podría especificar la clave del job para que el suscriptor de Laravel sepa qué hacer. Desafortunadamente, esa es una fuerte violación del sistema Pub / Sub porque el publisher debería conocer un detalle de implementación del suscriptor. Tampoco funciona con dos o más suscriptores, a menos que queramos que todos nuestros suscriptores tengan el mismo espacio de nombres de trabajo para procesar un evento del topic.

Para evitar esto, una opción interesante es vincular el Topic ARN de SNS a una clase de job específico. Siempre que se envíe un nuevo mensaje del tema autenticado por el usuario a SQS, el sistema de workers de Laravel debe usar la clase ProcessUserAuthenticatedJob. Para lograr esto, es necesario crear un controlador de cola Laravel personalizado. Decidí llamar a este controlador personalizado sns para dar la sensación de que funcionan los mensajes de AWS SNS (a través de SQS).

Implementación

En un proveedor de servicios, podemos conectar un evento afterResolving en el contenedor de Laravel. De esa manera, siempre que la clase QueueManager se resuelva fuera del contenedor por primera vez, podemos asegurarnos de agregarle nuestro controlador personalizado.

La clase JobMap es responsable de asignar un ARN de tema a un administrador de jobs.

La devolución de llamada dada al QueueManager con el nombre de sns debería devolver una implementación de Illuminate\Queue\Connectors\ConnectorInterface. Aquí decidí extenderme desde SqsConnector para aprovechar el método getDefaultConfiguration() ya proporcionado por el sistema Laravel Worker. No hay ninguna necesidad real de esta herencia, excepto para tomar prestado ese método.

<?php

namespace App\Components\Queue;

use Illuminate\Config\Repository;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class SnsQueueServiceProvider extends ServiceProvider
{
    public function register()
    {
        $this->app->afterResolving(QueueManager::class, function (QueueManager $manager) {
            $config = $this->app->make(Repository::class);

            $manager->addConnector('sns', function () use ($config) {
                $map = new JobMap($config->get('queue.map'));

                return new SnsConnector($map);
            });
        });
    }
}

Según la interfaz de Laravel, un conector de cola debería devolver un objeto de cola. Como deseamos trabajar con mensajes de Sqs, es mucho más fácil simplemente heredar del objeto SqsQueue de Laravel y anular el método pop.

<?php

namespace App\Components\Queue;

use Exception;

class JobMap
{
    private $map;

    public function __construct(array $map)
    {
        $this->map = $map;
    }

    public function fromTopic(string $topic): string
    {
        $job = array_search($topic, $this->map);

        if (! $job) {
            throw new Exception("Topic $topic no fue mapeado a ningún job");
        }

        return $job;
    }
}

Todo lo demás para que el objeto Queue funcione ya viene de la caja de la clase SqsQueue. Tenga en cuenta que hemos cargado el queue.map desde el proveedor de servicios, lo hemos enviado a SnsConnector, SnsQueue y ahora lo estamos inyectando en SnsJob.

Nota: podríamos haber utilizado fácilmente una estrategia de Fachada / Localizador de servicios para resolver automáticamente el mapeo desde el interior del job y, personalmente, no tengo ningún problema con eso. Sin embargo, si está trabajando en un equipo diverso al que no le gusta Facade, la inyección de dependencia explícita permite que la clase Job no dependa de un recurso externo ni esté acoplado a él.

Por último, con la clase SnsJob podemos extender desde SqsJob proporcionado por Laravel al flujo de trabajo normal de los mensajes de trabajo de Sqs al mismo tiempo que anulamos el método de carga útil para inyectar el atributo del trabajo. Ahí es donde entra en juego la configuración de queue.map. Siempre que un tema de Sns específico inyecta un mensaje en la cola, podemos indicarle a Laravel qué trabajo debe ejecutar.

<?php

namespace App\Components\Queue;

use Aws\Sqs\SqsClient;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Support\Arr;

class SnsConnector extends SqsConnector implements ConnectorInterface
{
    private $map;

    public function __construct(JobMap $map)
    {
        $this->map = $map;
    }

    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

        return new SnsQueue(
            new SqsClient($config), $config['queue'], $this->map
        );
    }
}

Para asegurarnos de que todo funcione según lo previsto, podemos aprovechar localstack con un contenedor SQS localmente. La configuración de Localstack está fuera del alcance de este artículo y no se tratará. La prueba que me gustaría ejecutar se parece un poco a la siguiente clase.

<?php

namespace App\Components\Queue;

use Aws\Sqs\SqsClient;
use Illuminate\Queue\SqsQueue;

class SnsQueue extends SqsQueue
{
    private $map;

    public function __construct(SqsClient $sqs, string $default, JobMap $map)
    {
        parent::__construct($sqs, $default);

        $this->map = $map;
    }

    public function pop($queue = null)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue = $this->getQueue($queue),
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
            return new SnsJob(
                $this->container, $this->sqs, $response['Messages'][0],
                $this->connectionName, $queue, $this->map
            );
        }
    }
}

El método setUp envía un mensaje al contenedor localstack SQS. El plano de prueba se puede traducir como:

  • Organizar: agregue el tema SNS al queue.map en el sistema de configuración de Laravel
  • Actuar: Invoca el sistema Trabajador de cola de Laravel.
  • Afirmar: Verifique que el mensaje se haya obtenido correctamente.

El contenido de message.json se puede ver a continuación.

<?php

namespace App\Components\Queue;

use Exception;
use Aws\Sqs\SqsClient;
use Illuminate\Container\Container;
use Illuminate\Queue\Jobs\SqsJob;

class SnsJob extends SqsJob
{
    private $map;

    public function __construct(Container $container, SqsClient $sqs, array $job, string $connectionName, string $queue, JobMap $map)
    {
        parent::__construct($container, $sqs, $job, $connectionName, $queue);
        $this->map = $map;
    }

    public function payload()
    {
        $payload = parent::payload();

        if (! isset($payload['TopicArn'])) {
            throw new Exception(sprintf('Message with id [%s] does not have Topic ARN', $this->getJobId()));
        }

        $payload['job'] = $this->map->fromTopic($payload['TopicArn']);

        $payload['data'] = $payload['Message'];

        return $payload;
    }
}

La última pieza que falta es el rasgo ProvidesSqs que ayuda a configurar la integración localstack para tantas clases de prueba como necesitemos.

<?php

namespace Tests\Unit\SnsQueue;

use Illuminate\Config\Repository;
use Illuminate\Contracts\Queue\Job;
use Tests\TestCase;
use Tests\Utils\ProvidesSqs;

class SnsQueueTest extends TestCase
{
    use ProvidesSqs;

    protected function setUp(): void
    {
        parent::setUp();

        FakeHandler::$data = null;

        $this->sendMessageToSqs(file_get_contents(__DIR__ . '/message.json'));
    }

    /**
     * @test
     */
    public function work_sqs_message_published_by_sns()
    {
        $config = $this->app->make(Repository::class);

        $config->set('queue.map', [
            FakeHandler::class => 'arn:aws:sns:eu-west-1:111111111111:my-topic-arn',
        ]);

        $this->artisan('queue:work', ['--once' => true]);

        self::assertEquals(
            'Laravel can work SQS messages published by SNS!',
            FakeHandler::$data->content
        );
    }
}

class FakeHandler
{
    public static $data;

    public function fire(Job $job, $data)
    {
        $data = json_decode($data);

        self::$data = $data;
    }
}

Cuando se invoca sendMessageToSqs, hará lo siguiente:

  • Cree un cliente de AWS Sqs.
  • Crea un nuevo Sq en localstack.
  • Purgue la cola en caso de que haya mensajes de pruebas anteriores.
  • Enviar un mensaje a la cola
  • Configure el sistema Laravel Queue para usar el controlador sns de forma predeterminada.
  • Configure el sistema Laravel Queue para trabajar con los mensajes de esta cola.

Eso es todo.

Categorized in:

Tagged in:

, , ,