Jak ponownie zgłosić wiadomość do kolejki Back of a Rabbit MQ przez Spring

Nov 20 2020

Piszę konsumenta SpringBoot RabbitMQ i muszę od czasu do czasu ponownie umieścić wiadomość w kolejce do TYŁU kolejki

Pomyślałem, że tak właśnie działa negatywne potwierdzenie, ale basicReject(deliveryTag, true)po prostu umieszcza wiadomość z powrotem tak blisko jej pierwotnej pozycji w kolejce, jak to tylko możliwe, co w moim przypadku pojedynczym znajduje się z powrotem na PRZÓD kolejki.

Moją pierwszą myślą było użycie kolejki utraconych wiadomości przesyłającej z powrotem do kolejki wiadomości w pewnym przedziale czasu ( podobnym do podejścia opisanego w tej odpowiedzi ), ale wolałbym nie tworzyć dodatkowej kolejki, jeśli istnieje sposób, aby po prostu ponownie ustawić kolejkę do TYŁ początkowej kolejki

Moja poniższa struktura po prostu zużywa wiadomość i nie dodaje jej ponownie do kolejki.

Jak można to osiągnąć bez DLQ?

@ServiceActivator(inputChannel = "amqpInputChannel")
    public void handle(@Payload String message,
                       @Header(AmqpHeaders.CHANNEL) Channel channel,
                       @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag){

    try{

        methodThatThrowsRequeueError();
        methodThatThrowsMoveToErrorQueueError();

    } catch (RequeueError re) {

        channel.basicAck(deliveryTag, false);
        sendMessageToBackOfQueue(message);
        return;

    } catch (MoveToErrorQueueError me) {
        //Structured the same as sendMessageToBackOfQueue, works fine
        moveMessageToErrorQueue(message);
    }
    
    channel.basicAck(deliveryTag, false);
}

private void sendMessageToBackOfQueue(String message) {
        try {
            rabbitTemplate.convertAndSend(
                exchangeName,
                routingKeyRequeueMessage,
                message,
                message -> {
                    message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
                    return message;
                }
            );
        } catch (AmqpException amqpEx) {
            //error handling which is not triggered...
        }
    }

Odpowiedzi

DapperDan Nov 25 2020 at 01:48

TL; DR : Nie ma sposobu, aby przesłać wiadomość z usługi nasłuchującej z powrotem do kolejki źródłowej bez pośrednika.

Istnieje kilka opcji związanych z kolejkami utraconych wiadomości / wymianami utraconych wiadomości, ale znalezione przez nas rozwiązanie inne niż DLQ / DLX było czasową wymianą, pseudo DLX, jeśli wolisz. Głównie:

Wiadomość wchodzi do MessageExchange (MsgX), która jest propagowana do kolejki usług (SvcQ). Usługa (Svc) Pobiera komunikat z SvcQ.

Po ustaleniu, że wiadomość powinna zostać wysłana na tył SvcQ, Svc powinien:

  1. Wyślij potwierdzenie do SvcQ.
  2. Wyślij wiadomość na inną giełdę, naszą czasową psuedo-DLX
  3. Pseudo-DLX można skonfigurować tak, aby wysyłał komunikaty do (BACK OF !!) SvcQ w pewnym przedziale czasowym