Como posso reenviar dados para um objeto SocketConnect sem criar um novo soquete?

Aug 15 2020

Eu tenho uma função Python que pode ser acessada por meio de uma conexão TCP na porta 1234. Em Wolfram Language, tenho o seguinte código:

ClearAll[getResFromPython];
getResFromPython[sock_][arg_String] := (
    WriteString[sock, ExportString[<|"arg" -> arg|>, "JSON"]];
    ReadString[sock]
);

Como a criação de um novo soquete não é exatamente instantânea - leva quase dois segundos no meu computador - eu gostaria de fazer o seguinte:

ClearAll[sock, vals];
sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}];
Close[sock];
(* Now do something with vals. *)

Isso não funciona. Só "arg1"é avaliado corretamente, e acho que é porque o soquete precisa ser liberado antes que o próximo argumento seja gravado nele. Mas não consigo encontrar nenhuma função na documentação que liberte o soquete. Existe uma WSFlushfunção, mas é para um propósito totalmente diferente.

Qual é a maneira correta de reutilizar uma conexão de soquete em uma lista de argumentos?

Editar

Caso alguém esteja interessado, aqui está minha configuração simples de Python:

# tcpserver.py
import socketserver
import urllib.parse
import json

def getVal(arg):
    return 1

class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.data = json.loads(self.request.recv(2048).strip())
        res = getVal(self.data["arg"])
        self.request.sendall(bytes(json.dumps({"res": res}), "utf-8"))

if __name__ == "__main__":
    HOST, PORT = "localhost", 1234
    with socketserver.TCPServer((HOST, PORT), SASTCPHandler) as server:
        server.serve_forever()

Respostas

4 flinty Aug 15 2020 at 22:41

Eu tenho lutado com isso por um tempo e finalmente percebi que o problema não é o Mathematica. TCPHandlerparece matar a conexão após o primeiro handle(). Não mantém viva uma conexão. Você pode ler e usar o código no final da minha resposta ou pode tentar conectar uma vez e escrever uma vez em lote. Isso significa que você grava todos os dados juntos para uma conexão usando a forma de sequência de WriteString[sock, string1, string2, ..., stringn].


Se você usar esta implementação básica de servidor abaixo, verá que o Mathematica é perfeitamente capaz de fazer vários WriteStringno mesmo soquete e uma única conexão:

import socket
from time import sleep

TCP_IP = '127.0.0.1'
TCP_PORT = 1234
BUFFER_SIZE = 2048

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)

connection, addr = s.accept()
while True:
    try:
        connection.send("keeping alive".encode())
        data = connection.recv(BUFFER_SIZE)
        if data:
            print("received data:" + str(data))
            connection.send("got it!".encode())
        else:
            sleep(0.06)
    except KeyboardInterrupt:
        connection.close()
    except socket.error:
        print("client disconnected")
        connection.close()
        break

... e do Mathematica:

sock = SocketConnect[{"localhost", 1234}, "TCP"];
WriteString[sock, "Hey!"]
Print@ByteArrayToString@SocketReadMessage[sock]
WriteString[sock, "What's up?"]
Print@ByteArrayToString@SocketReadMessage[sock]
Close[sock]

Você deve ver o resultado do lado do servidor:

received data:b'Hey!'
received data:b"What's up?"
client disconnected

Portanto, sabemos que é possível. Vejamos um dump de pacote do Wireshark para você TCPServerver o que está errado. Monitorei a interface de loopback em vez de Ethernet ou Wi-Fi para produzir o rastreamento.

Parece que o servidor é quem está encerrando a conexão. O servidor envia um FINpacote pouco antes do Mathematica enviar o segundo item. O Mathematica não esperava e continua enviando o segundo item (o [PSH, ACK] final no trace) . O Mathematica não finaliza a conexão e cumpre com o FINentão o servidor envia um RSTe mata a conexão com força, nunca tratando dos pedidos restantes. Depois de TCPHandler.handle()retornar do processamento do primeiro item, o TCPServernão mantém a conexão.

Se quiser manter a conexão ativa durante o, handle()você terá que criar um loop e um atraso de votação como este:

# tcpserver.py
import socketserver
import urllib.parse
import json
from time import sleep

def getVal(arg):
    return 1

class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        alive = True
        while(alive):
            try:
                self.data = self.request.recv(2048)
                if self.data:
                    jsonresult = json.loads(self.data.strip())
                    res = getVal(jsonresult["arg"])
                    print(jsonresult)
                    self.request.sendall(bytes(json.dumps({"res": res}), "utf-8"))
                else:
                    print("no data")
                    time.sleep(0.06) # 60ms delay
                    continue
            except:
                print("finished!")
                alive = False


if __name__ == "__main__":
    HOST, PORT = "localhost", 1234
    with socketserver.TCPServer((HOST, PORT), TCPHandler) as server:
        server.socket.settimeout(None)
        server.serve_forever()

... e use a seguinte pequena modificação em seu código do Mathematica (eu usei no SocketReadMessagelugar de ReadString) :

ClearAll[getResFromPython];
getResFromPython[sock_][arg_String] := (
   WriteString[sock, ExportString[<|"arg" -> arg|>, "JSON"]];
   ByteArrayToString@SocketReadMessage[sock]);

ClearAll[sock, vals];
sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}];
Close[sock];

Finalmente funciona!

{'arg': 'arg1'}
{'arg': 'arg2'}
{'arg': 'arg3'}
no data
finished!

Também funciona com o envio de várias solicitações com atraso entre as quais você pode fazer assim:

sock = SocketConnect[{"localhost", 1234}, "TCP"];
vals = Reap[Do[
     Sow[Map[getResFromPython[sock], {"arg1", "arg2", "arg3"}]];
     Pause[1];
     , 5]] // Last // First
Close[sock];