Flusso a blocchi di Spring Integration con Loop e più SQS HTTP e asincroni

Aug 26 2020

Ho un flusso che

1. Starts with a config map -> MainGateway.start(configMap) -> void
2. Splits map into multiple messages per entry
3. For every config entry do the following using an orchestrator java class:
   BEGIN LOOP (offset and limit)
      Data d = HTTPGateway.getData();
      PublishGateway.sendMessage(d); -> Send to 2 SQS queues    
   END LOOP

Requisito Devo programmare questo flusso tramite cron. Un'opzione è fornire un endpoint HTTP che avvierà il flusso. Ma poi la seconda richiesta HTTP dovrebbe attendere / timeout / errore fino al completamento della prima.

Domanda stavo esaminando la barriera per implementare il blocco per il thread del flusso fino al completamento e avere un solo processore http a thread singolo, quindi in una sola volta viene elaborata solo 1 richiesta e posso sapere quando il flusso è completo. (Il LOOP termina per tutti gli oggetti della voce di configurazione e tutti i messaggi a SQS vengono riconosciuti). Come posso raggiungere questo obiettivo? Ho un ciclo e sto usando il canale pub-sub con esecutori per configurazioni parallele e invio SQS parallelo.

Ho tagliato il XML configsotto per chiarezza.

   <!-- Bring in list of Configs to process -->
    <int:gateway service-interface="Gateway"
                 default-request-channel="configListChannel" />

    <int:chain input-channel="configListChannel" output-channel="configChannel">
        <!-- Split the list to one instance of config per message -->
        <int:splitter/>
        <int:filter expression="payload.enablePolling" />
    </int:chain>

    <!-- Manually orchestrate a loop to query a system as per config and publish messages to SQS -->
    <bean class="Orchestrator" id="orchestrator" />
    <int:service-activator ref="orchestrator" method="getData" input-channel="configChannel" />

    <!-- The flow from this point onwards is triggered inside a loop controlled by the Orchestrator
         The following Gateway calls are inside Orchestrators loop -->

    <!-- Create a Http request from the Orchestrator using a Gateway -->
    <int:gateway service-interface="HttpGateway">
        <int:method name="getData"
                    request-channel="requestChannel"
                    payload-expression="#args[0]">
        </int:method>
    </int:gateway>

    <!-- Transform request object to json and invoke Http endpoint -->
    <int:chain input-channel="requestChannel" id="httpRequestChain">
        <int:object-to-json-transformer />
        <int-http:outbound-gateway url-expression="headers['config'].url"
                                   http-method="POST"
                                   expected-response-type="java.lang.String"
        />
    </int:chain>

    <!-- Publish Messages to Outbound Gateway -->
    <task:executor id="executor" pool-size="5" />
    <int:publish-subscribe-channel id="publishChannel" task-executor="executor" />
    <int:gateway service-interface="PublishGateway" >
        <int:method name="publishToOutbound" payload-expression="#args[0]" request-channel="publishChannel" />
    </int:gateway>


    <!-- Route to System A SQS with transformations (omitted here)-->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-a-queue" success-channel="successChannel" failure-channel="errorChannel"/>

    <!-- Route to System B SQS with transformations (omitted here)-->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-b-queue" success-channel="successChannel" failure-channel="errorChannel"/>

    <int:logging-channel-adapter logger-name="sqsCallbackLogger" log-full-message="true" channel="successChannel" />

Nel frattempo, sto cercando di adattare l'esempio della barriera ABC da spring-integration-samplesal mio caso d'uso.

Risposte

1 ArtemBilan Aug 26 2020 at 17:54

Come hai sottolineato nel tuo commento, un aggregatorapproccio potrebbe essere utilizzato nella tua soluzione.

In questo modo hai aggregato i risultati di quelle richieste SQS parallele e attendi una risposta di aggregazione nel richiedente originale. In questo modo verrà davvero bloccato anche se le parti interne del tuo flusso sono ancora concorrenti. Chiami un gateway e la risposta sarà dall'aggregatore.