faibles performances de chargement lors de l'insertion par lots de lignes dans Spanner à l'aide de jdbc

Aug 19 2020

Contexte: j'essaie de charger des fichiers de données au format TSV (vidés de la base de données MySQL) dans une table GCP Spanner.

  • bibliothèque cliente: la dépendance officielle Spanner JDBC v1.15.0
  • schéma de table: deux colonnes de type chaîne et dix colonnes de type int
  • Instance GCP Spanner: configurée en tant que nam6 multirégional avec 5 nœuds

Mon programme de chargement s'exécute dans la VM GCP et est le client exclusif qui accède à l'instance Spanner. La validation automatique est activée. L'insertion par lots est la seule opération DML exécutée par mon programme et la taille du lot est d'environ 1500. Dans chaque commit, elle utilise pleinement la limite de mutation, qui est de 20000. Et en même temps, la taille de commit est inférieure à 5 Mo (les valeurs de deux colonnes de type chaîne sont de petite taille). Les lignes sont partitionnées en fonction de la première colonne de la clé primaire afin que chaque validation puisse être envoyée à très peu de partitions pour de meilleures performances.

Avec toute la configuration et l'optimisation ci-dessus, le taux d'insertion n'est que d'environ 1 000 lignes par seconde. Cela me déçoit vraiment car j'ai plus de 800 millions de lignes à insérer. J'ai remarqué que le document officiel mentionnait les env. l'écriture de crête (QPS total) est de 1800 pour l'instance Spanner multirégionale.

J'ai donc deux questions ici:

  1. Compte tenu de ces QPS d'écriture de pointe, cela signifie-t-il que GCP ne s'attend pas ou n'assiste pas les clients à migrer de grands ensembles de données vers l'instance Spanner multirégionale?
  2. Je voyais la latence de lecture élevée de la surveillance Spanner. Je n'ai aucune demande de lecture. Je suppose que pendant l'écriture des lignes, Spanner doit d'abord lire et vérifier si une ligne avec la même clé primaire existe. Si ma supposition est juste, pourquoi cela prend-il autant de temps? Sinon, pourrais-je obtenir des conseils sur la façon dont ces opérations de lecture se produisent?

Réponses

KnutOlavLoite Aug 19 2020 at 15:50

Je ne vois pas exactement comment vous configurez l'application cliente qui charge les données. Ma première impression est que votre application cliente n'exécute peut-être pas suffisamment de transactions en parallèle. Vous devriez normalement être en mesure d'insérer beaucoup plus de 1000 lignes / seconde, mais cela nécessiterait d'exécuter plusieurs transactions en parallèle, éventuellement à partir de plusieurs VM. J'ai utilisé l'exemple simple suivant pour tester le débit de charge de ma machine locale vers une instance Spanner à un seul nœud, ce qui m'a donné un débit d'environ 1500 lignes / seconde.

Une configuration multi-nœuds utilisant une application cliente s'exécutant sur une ou plusieurs machines virtuelles dans la même région réseau que votre instance Spanner devrait pouvoir atteindre des volumes plus élevés que cela.

import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        while (true) {
          try (PreparedStatement ps =
              connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
            for (int i = 0; i < 150; i++) {
              ps.setLong(1, rnd.nextLong());
              ps.setString(2, randomString(100));
              ps.setString(3, randomString(100));
              ps.addBatch();
              rowCount.incrementAndGet();
            }
            ps.executeBatch();
          }
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

Il y a aussi quelques autres choses que vous pouvez essayer de régler pour obtenir de meilleurs résultats:

  • La réduction du nombre de lignes par lot pourrait donner de meilleurs résultats globaux.
  • Si possible, l'utilisation d' InsertOrUpdateobjets de mutation est beaucoup plus efficace que l'utilisation d'instructions DML (voir l'exemple ci-dessous).

Exemple d'utilisation Mutationau lieu de DML:

import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertOrUpdateMutationRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertOrUpdateMutationRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
        CloudSpannerJdbcConnection csConnection =
            connection.unwrap(CloudSpannerJdbcConnection.class);
        while (true) {
          ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
          for (int i = 0; i < 150; i++) {
            builder.add(
                Mutation.newInsertOrUpdateBuilder("Test")
                    .set("Id")
                    .to(rnd.nextLong())
                    .set("Col1")
                    .to(randomString(100))
                    .set("Col2")
                    .to(randomString(100))
                    .build());
            rowCount.incrementAndGet();
          }
          csConnection.write(builder.build());
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

L'exemple simple ci-dessus me donne un débit d'environ 35 000 lignes / seconde sans autre réglage.

INFORMATIONS SUPPLÉMENTAIRES 21/08/2020 : La raison pour laquelle les objets de mutation sont plus efficaces que les instructions DML (par lots) est que les instructions DML sont converties en interne en requêtes de lecture par Cloud Spanner, qui sont ensuite utilisées pour créer des mutations. Cette conversion doit être effectuée pour chaque instruction DML d'un lot, ce qui signifie qu'un lot DML avec 1 500 instructions d'insertion simples déclenchera 1 500 (petites) requêtes de lecture et devra être converti en 1 500 mutations. C'est probablement aussi la raison de la latence de lecture que vous voyez dans votre surveillance.

Cela vous dérangerait-il autrement de partager plus d'informations sur l'apparence de votre application cliente et le nombre d'instances que vous exécutez?

RedPandaCurios Aug 20 2020 at 23:30

Avec plus de 800 millions de lignes à insérer et voyant que vous êtes un programmeur Java, puis-je suggérer d'utiliser Beam sur Dataflow?

Le graveur de clé dans Beam est conçu pour être aussi efficace que possible avec ses écritures - regroupant les lignes par une clé similaire et les regroupant comme vous le faites. Beam on Dataflow peut également utiliser plusieurs machines virtuelles de travail pour exécuter plusieurs lectures de fichiers et écritures clés en parallèle ...

Avec une instance de clé multirégion, vous devriez être en mesure d'obtenir environ 1800 lignes par nœud par seconde vitesse d'insertion (plus si les lignes sont petites et groupées, comme le suggère la réponse de Knut) et avec 5 nœuds de clé, vous pouvez probablement avoir entre 10 et 20 les threads d'importation s'exécutant en parallèle - que ce soit en utilisant votre programme d'importation ou en utilisant Dataflow.

(divulgation: je suis le mainteneur de Beam SpannerIO)