низкая производительность загрузки при пакетной вставке строк в Spanner с использованием jdbc

Aug 19 2020

Предыстория: я пытаюсь загрузить файлы данных в формате TSV (выгруженные из базы данных MySQL) в таблицу GCP Spanner.

  • клиентская библиотека: официальная зависимость Spanner JDBC v1.15.0
  • схема таблицы: два столбца с строковым типом и десять столбцов с типом int
  • Экземпляр GCP Spanner: настроен как многорегиональный nam6 с 5 узлами

Моя программа загрузки работает на виртуальной машине GCP и является единственным клиентом, имеющим доступ к экземпляру Spanner. Автоматическая фиксация включена. Пакетная вставка - единственная операция DML, выполняемая моей программой, и размер пакета составляет около 1500. В каждой фиксации она полностью использует предел мутации, который составляет 20000. И в то же время размер фиксации меньше 5 МБ (значения двух столбцов строкового типа - малогабаритные). Строки разбиваются на разделы на основе первого столбца первичного ключа, поэтому каждую фиксацию можно отправлять в очень небольшое количество разделов для повышения производительности.

Со всей конфигурацией и оптимизацией, описанной выше, скорость вставки составляет всего около 1 тыс. Строк в секунду. Это меня действительно разочаровывает, потому что мне нужно вставить более 800 миллионов строк. Я заметил, что в официальном документе упоминается ок. пиковая запись (всего QPS) составляет 1800 для многорегионального экземпляра Spanner.

Итак, у меня есть два вопроса:

  1. Принимая во внимание такое низкое пиковое количество запросов в секунду при записи, означает ли это, что GCP не ожидает или не поддерживает клиентов для переноса больших наборов данных в многорегиональный экземпляр Spanner?
  2. Я видел высокую задержку чтения из мониторинга Spanner. У меня нет запросов на чтение. Я предполагаю, что во время записи строк Spanner должен сначала прочитать и проверить, существует ли строка с тем же первичным ключом. Если моя догадка верна, почему это занимает так много времени? Если нет, могу ли я получить какие-либо указания о том, как происходят эти операции чтения?

Ответы

KnutOlavLoite Aug 19 2020 at 15:50

Мне не совсем понятно, как именно вы настраиваете клиентское приложение, загружающее данные. Мое первое впечатление таково, что ваше клиентское приложение может не выполнять достаточно транзакций параллельно. Обычно вы должны иметь возможность вставлять значительно больше 1000 строк в секунду, но для этого потребуется, чтобы вы выполняли несколько транзакций параллельно, возможно, с нескольких виртуальных машин. Я использовал следующий простой пример, чтобы проверить пропускную способность нагрузки с моего локального компьютера на экземпляр Spanner с одним узлом, и это дало мне пропускную способность примерно 1500 строк в секунду.

Установка с несколькими узлами с использованием клиентского приложения, запущенного на одной или нескольких виртуальных машинах в том же сетевом регионе, что и ваш экземпляр Spanner, должна обеспечивать более высокие объемы, чем это.

import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        while (true) {
          try (PreparedStatement ps =
              connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
            for (int i = 0; i < 150; i++) {
              ps.setLong(1, rnd.nextLong());
              ps.setString(2, randomString(100));
              ps.setString(3, randomString(100));
              ps.addBatch();
              rowCount.incrementAndGet();
            }
            ps.executeBatch();
          }
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

Есть также несколько других вещей, которые вы можете попробовать настроить, чтобы добиться лучших результатов:

  • Уменьшение количества строк в пакете может дать лучшие общие результаты.
  • Если возможно, использование InsertOrUpdateобъектов мутации намного эффективнее, чем использование операторов DML (см. Пример ниже).

Пример использования Mutationвместо DML:

import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertOrUpdateMutationRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertOrUpdateMutationRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
        CloudSpannerJdbcConnection csConnection =
            connection.unwrap(CloudSpannerJdbcConnection.class);
        while (true) {
          ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
          for (int i = 0; i < 150; i++) {
            builder.add(
                Mutation.newInsertOrUpdateBuilder("Test")
                    .set("Id")
                    .to(rnd.nextLong())
                    .set("Col1")
                    .to(randomString(100))
                    .set("Col2")
                    .to(randomString(100))
                    .build());
            rowCount.incrementAndGet();
          }
          csConnection.write(builder.build());
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

Приведенный выше простой пример дает мне пропускную способность около 35 000 строк в секунду без какой-либо дополнительной настройки.

ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ 2020-08-21 : Причина, по которой объекты мутации более эффективны, чем (пакетные) операторы DML, заключается в том, что операторы DML внутренне преобразуются Cloud Spanner в запросы чтения, которые затем используются для создания мутаций. Это преобразование необходимо выполнять для каждого оператора DML в пакете, а это означает, что пакет DML с 1500 простыми операторами вставки вызовет 1500 (небольших) запросов чтения и должен быть преобразован в 1500 мутаций. Скорее всего, это также причина задержки чтения, которую вы видите в своем мониторинге.

Не могли бы вы иначе поделиться дополнительной информацией о том, как выглядит ваше клиентское приложение и сколько его экземпляров вы используете?

RedPandaCurios Aug 20 2020 at 23:30

Имея более 800 миллионов строк для вставки и учитывая, что вы программист на Java, могу ли я предложить использовать Beam on Dataflow?

Устройство записи гаечного ключа в Beam спроектировано так, чтобы быть максимально эффективным с его записью - группирование строк по одинаковому ключу и их группирование, как вы это делаете. Beam on Dataflow также может использовать несколько рабочих виртуальных машин для параллельного выполнения нескольких операций чтения и записи с помощью гаечного ключа ...

С экземпляром мультирегионального гаечного ключа вы должны иметь возможность получать примерно 1800 строк на узел в секунду со скоростью вставки (больше, если строки маленькие и групповые, как предлагает ответ Кнута), а с 5 узлами гаечного ключа у вас, вероятно, может быть от 10 до 20 потоки импортера, работающие параллельно - независимо от того, используется ли ваша программа импорта или используется поток данных.

(раскрытие: я сопровождаю Beam SpannerIO)