bajo rendimiento de carga al insertar filas por lotes en Spanner usando jdbc

Aug 19 2020

Antecedentes: Estoy intentando cargar archivos de datos con formato TSV (descargados de la base de datos MySQL) en una tabla de GCP Spanner.

  • biblioteca cliente: la dependencia oficial de Spanner JDBC v1.15.0
  • esquema de tabla: dos columnas de tipo cadena y diez columnas de tipo int
  • Instancia de GCP Spanner: configurada como nam6 multirregión con 5 nodos

Mi programa de carga se ejecuta en GCP VM y es el cliente exclusivo que accede a la instancia de Spanner. La confirmación automática está habilitada. La inserción por lotes es la única operación DML ejecutada por mi programa y el tamaño del lote es de alrededor de 1500. En cada confirmación, utiliza por completo el límite de mutación, que es 20000. Y, al mismo tiempo, el tamaño de la confirmación es inferior a 5 MB (los valores de dos columnas con tipo de cadena son de tamaño pequeño). Las filas se dividen en función de la primera columna de la clave principal para que cada confirmación se pueda enviar a muy pocas particiones para un mejor rendimiento.

Con toda la configuración y la optimización anteriores, la tasa de inserción es de solo alrededor de 1k filas por segundo. Esto realmente me decepciona porque tengo más de 800 millones de filas para insertar. Me di cuenta de que el documento oficial mencionó aprox. la escritura máxima (QPS total) es 1800 para la instancia de Spanner de múltiples regiones.

Entonces tengo dos preguntas aquí:

  1. Teniendo en cuenta un QPS de escritura de pico tan bajo, ¿significa que GCP no espera o no admite que los clientes migren grandes conjuntos de datos a la instancia de Spanner de múltiples regiones?
  2. Estaba viendo la alta latencia de lectura de la supervisión de Spanner. No tengo solicitudes de lectura. Supongo que mientras escribe filas, Spanner primero debe leer y verificar si existe una fila con la misma clave principal. Si mi conjetura es correcta, ¿por qué lleva tanto tiempo? Si no es así, ¿podría obtener alguna orientación sobre cómo se realizan estas operaciones de lectura?

Respuestas

KnutOlavLoite Aug 19 2020 at 15:50

No me queda muy claro exactamente cómo está configurando la aplicación cliente que carga los datos. Mi impresión inicial es que es posible que su aplicación cliente no esté ejecutando suficientes transacciones en paralelo. Normalmente debería poder insertar significativamente más de 1,000 filas / segundo, pero sería necesario que ejecute múltiples transacciones en paralelo, posiblemente desde múltiples VM. Utilicé el siguiente ejemplo simple para probar el rendimiento de carga de mi máquina local a una instancia de Spanner de un solo nodo, y eso me dio un rendimiento de aproximadamente 1,500 filas / segundo.

Una configuración de varios nodos que utilice una aplicación cliente que se ejecute en una o más VM en la misma región de red que su instancia de Spanner debería poder lograr volúmenes más altos que eso.

import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        while (true) {
          try (PreparedStatement ps =
              connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
            for (int i = 0; i < 150; i++) {
              ps.setLong(1, rnd.nextLong());
              ps.setString(2, randomString(100));
              ps.setString(3, randomString(100));
              ps.addBatch();
              rowCount.incrementAndGet();
            }
            ps.executeBatch();
          }
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

También hay un par de otras cosas que podría intentar ajustar para obtener mejores resultados:

  • Reducir el número de filas por lote podría producir mejores resultados generales.
  • Si es posible, usar InsertOrUpdateobjetos de mutación es mucho más eficiente que usar declaraciones DML (vea el ejemplo a continuación).

Ejemplo usando en Mutationlugar de DML:

import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertOrUpdateMutationRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertOrUpdateMutationRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
        CloudSpannerJdbcConnection csConnection =
            connection.unwrap(CloudSpannerJdbcConnection.class);
        while (true) {
          ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
          for (int i = 0; i < 150; i++) {
            builder.add(
                Mutation.newInsertOrUpdateBuilder("Test")
                    .set("Id")
                    .to(rnd.nextLong())
                    .set("Col1")
                    .to(randomString(100))
                    .set("Col2")
                    .to(randomString(100))
                    .build());
            rowCount.incrementAndGet();
          }
          csConnection.write(builder.build());
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

El ejemplo simple anterior me da un rendimiento de aproximadamente 35,000 filas / segundo sin ningún ajuste adicional.

INFORMACIÓN ADICIONAL 2020-08-21 : La razón por la que los objetos de mutación son más eficientes que las declaraciones DML (por lotes) es que las declaraciones DML se convierten internamente en consultas de lectura mediante Cloud Spanner, que luego se utilizan para crear mutaciones. Esta conversión debe realizarse para cada declaración DML en un lote, lo que significa que un lote DML con 1,500 declaraciones de inserción simples activará 1,500 consultas de lectura (pequeñas) y debe convertirse en 1,500 mutaciones. Probablemente esta sea también la razón detrás de la latencia de lectura que está viendo en su monitoreo.

De lo contrario, ¿le importaría compartir más información sobre cómo se ve su aplicación cliente y cuántas instancias está ejecutando?

RedPandaCurios Aug 20 2020 at 23:30

Con más de 800 millones de filas para insertar, y dado que es un programador de Java, ¿puedo sugerirle que use Beam en Dataflow?

El escritor de llave inglesa en Beam está diseñado para ser lo más eficiente posible con sus escrituras: agrupa filas por una clave similar y las agrupa a medida que lo hace. Beam on Dataflow también puede usar varias VM de trabajadores para ejecutar múltiples lecturas de archivos y escrituras de llave en paralelo ...

Con una instancia de llave multirregión, debería poder obtener aproximadamente 1800 filas por nodo por segundo de velocidad de inserción (más si las filas son pequeñas y están agrupadas, como sugiere la respuesta de Knut) y con 5 nodos de llave, probablemente pueda tener entre 10 y 20 subprocesos de importación que se ejecutan en paralelo, ya sea utilizando su programa de importación o utilizando Dataflow.

(divulgación: soy el mantenedor de Beam SpannerIO)