File di recupero Spring Sftp utilizzando il gateway in uscita all'interno del gestore dei messaggi dell'adattatore in entrata

Aug 22 2020

Sto utilizzando Inbound Adapter utilizzando Java DSL per eseguire il polling dei file pdf dal server SFTP. Ho un caso d'uso in cui dopo aver recuperato il file pdf, l'applicazione recupererà il file di configurazione presente in formato CSV con lo stesso nome sul server SFTP. Dopo aver recuperato il file di configurazione, l'applicazione elaborerà il file pdf originale utilizzando le proprietà definite nel file di configurazione e lo caricherà di nuovo sul server SFTP utilizzando l'adattatore in uscita.

Sto affrontando problemi con il recupero dei file di configurazione all'interno del gestore sullo stesso thread utilizzando il gateway in uscita.

Ecco il mio codice:

Registra flussi di integrazione:

  for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getOutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();

Flusso di integrazione dell'adattatore in ingresso:


  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),
            e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }

Flusso di integrazione dell'adattatore in uscita:

  private IntegrationFlow getOutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }

Gestore messaggi SFTP:

  @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFileToSftpServer(outputFilePath, client, entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }

Flusso di integrazione GET del gateway in uscita:

  private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,
                AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autoCreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }

messageHandler.handleMessage()metodo viene chiamato in async (con ThreadPoolTaskExecutor) che recupera internamente il file di configurazione utilizzando il gateway in uscita. Ma non sono riuscito a trovare un singolo canale in cui posso inviare e ricevere un payload di messaggi nello stesso thread. Ho trovato MessagingTemplate nei documenti primaverili ma non sono riuscito a trovare un modo per collegarlo al flusso di integrazione del gateway in uscita.

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers))restituisce l' eccezione " Dispatcher non ha sottoscrittori per il canale " con DirectChannel ().

Sto cercando una soluzione in cui posso recuperare il file richiesto dal server con uno dei seguenti modi:

  • Integrare MessagingTemplate con IntegrationFlow (se possibile) utilizzando un canale appropriato.
  • Alcuni concatenamenti di gestori di messaggi nel flusso dell'adattatore in entrata in cui dopo aver eseguito il polling del file originale recupererà un altro file utilizzando il gateway in uscita sftp e quindi chiamerà il gestore finale con entrambi gli oggetti (file originale e file di configurazione). Sto cercando di ottenere una cosa simile usando il codice personalizzato sopra.
  • Qualsiasi altro modo per utilizzare i canali di invio e poller per il comando GET in un ambiente multi-thread.

L'applicazione deve decidere il percorso della directory in fase di esecuzione durante l'utilizzo del comando GET.

Risposte

1 ArtemBilan Aug 21 2020 at 23:08

Probabilmente hai bisogno di imparare cos'è un @MessagingGatewaye come farlo interagire con i canali sul tuo gateway in uscita.

Vedi i documenti per maggiori informazioni: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

Se vuoi davvero ottenere un file di configurazione come risultato, non dovresti farlo .channel("nullChannel"). Con il gateway in mano ci sarà replyChannelun'intestazione con TemporaryReplyChannelun'istanza popolata dal gateway. Quindi nel tuo codice utilizzerai quell'interfaccia funzionale come API da chiamare.

In effetti quel gateway di messaggistica utilizza un file MessagingTemplate.sendAndReceive().