basse prestazioni di caricamento durante l'inserimento in batch di righe in Spanner utilizzando jdbc
Background: sto cercando di caricare file di dati in formato TSV (scaricati dal database MySQL) in una tabella GCP Spanner.
- libreria client: la dipendenza ufficiale di Spanner JDBC v1.15.0
- schema della tabella: due colonne di tipo stringa e dieci colonne di tipo int
- Istanza GCP Spanner: configurata come nam6 multiregionale con 5 nodi
Il mio programma di caricamento viene eseguito nella VM GCP ed è il client esclusivo che accede all'istanza di Spanner. Il commit automatico è abilitato. L'inserimento in batch è l'unica operazione DML eseguita dal mio programma e la dimensione del batch è di circa 1500. In ogni commit, utilizza completamente il limite di mutazione, che è 20000. E allo stesso tempo, la dimensione del commit è inferiore a 5 MB (i valori di due colonne di tipo stringa sono di piccole dimensioni). Le righe vengono partizionate in base alla prima colonna della chiave primaria in modo che ogni commit possa essere inviato a pochissime partizioni per prestazioni migliori.
Con tutta la configurazione e l'ottimizzazione di cui sopra, la velocità di inserimento è solo di circa 1k righe al secondo. Questo mi delude davvero perché ho più di 800 milioni di righe da inserire. Ho notato che il documento ufficiale menzionava il ca. il picco di scrittura (totale QPS) è 1800 per l'istanza Spanner in più regioni.
Quindi ho due domande qui:
- Considerando un picco di scrittura così basso QPS, significa che GCP non si aspetta o non supporta i clienti per la migrazione di set di dati di grandi dimensioni all'istanza Spanner multi-regione?
- Stavo vedendo l'elevata latenza di lettura dal monitoraggio di Spanner. Non ho richieste di lettura. La mia ipotesi è che durante la scrittura di righe Spanner debba prima leggere e verificare se esiste una riga con la stessa chiave primaria. Se la mia ipotesi è corretta, perché ci vuole così tanto tempo? In caso contrario, posso ottenere indicazioni su come avvengono queste operazioni di lettura?
Risposte
Non mi è del tutto chiaro esattamente come stai configurando l'applicazione client che sta caricando i dati. La mia impressione iniziale è che la tua applicazione client potrebbe non eseguire abbastanza transazioni in parallelo. Normalmente dovresti essere in grado di inserire più di 1.000 righe al secondo, ma richiederebbe l'esecuzione di più transazioni in parallelo, possibilmente da più VM. Ho usato il seguente semplice esempio per testare il throughput di carico dalla mia macchina locale a un'istanza Spanner a nodo singolo e questo mi ha dato un throughput di circa 1.500 righe / secondo.
Una configurazione multi-nodo che utilizza un'applicazione client in esecuzione in una o più VM nella stessa area di rete dell'istanza Spanner dovrebbe essere in grado di raggiungere volumi più elevati di quelli.
import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
while (true) {
try (PreparedStatement ps =
connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
for (int i = 0; i < 150; i++) {
ps.setLong(1, rnd.nextLong());
ps.setString(2, randomString(100));
ps.setString(3, randomString(100));
ps.addBatch();
rowCount.incrementAndGet();
}
ps.executeBatch();
}
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
Ci sono anche un paio di altre cose che potresti provare a mettere a punto per ottenere risultati migliori:
- La riduzione del numero di righe per batch potrebbe produrre risultati complessivi migliori.
- Se possibile, l'utilizzo di
InsertOrUpdate
oggetti di mutazione è molto più efficiente rispetto all'utilizzo di istruzioni DML (vedere l'esempio sotto).
Esempio di utilizzo al Mutation
posto di DML:
import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertOrUpdateMutationRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertOrUpdateMutationRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
CloudSpannerJdbcConnection csConnection =
connection.unwrap(CloudSpannerJdbcConnection.class);
while (true) {
ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
for (int i = 0; i < 150; i++) {
builder.add(
Mutation.newInsertOrUpdateBuilder("Test")
.set("Id")
.to(rnd.nextLong())
.set("Col1")
.to(randomString(100))
.set("Col2")
.to(randomString(100))
.build());
rowCount.incrementAndGet();
}
csConnection.write(builder.build());
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
Il semplice esempio sopra mi dà una velocità effettiva di circa 35.000 righe / secondo senza ulteriori regolazioni.
INFORMAZIONI AGGIUNTIVE 21-08-2020 : Il motivo per cui gli oggetti mutazione sono più efficienti delle istruzioni DML (batch), è che le istruzioni DML vengono convertite internamente in query di lettura da Cloud Spanner, che vengono quindi utilizzate per creare mutazioni. Questa conversione deve essere eseguita per ogni istruzione DML in un batch, il che significa che un batch DML con 1.500 semplici istruzioni di inserimento attiverà 1.500 query di lettura (piccole) e dovrà essere convertito in 1.500 mutazioni. Questo è molto probabilmente anche il motivo alla base della latenza di lettura che stai vedendo nel tuo monitoraggio.
Altrimenti ti dispiacerebbe condividere qualche informazione in più sull'aspetto della tua applicazione client e su quante istanze stai eseguendo?
Con più di 800 milioni di righe da inserire e visto che sei un programmatore Java, posso suggerire di utilizzare Beam su Dataflow?
Il programma di scrittura in Beam è progettato per essere il più efficiente possibile con le sue scritture, raggruppando le righe in base a una chiave simile e raggruppandole in batch mentre lo fai. Beam on Dataflow può anche utilizzare diverse VM worker per eseguire più letture di file e scritture spanner in parallelo ...
Con un'istanza di chiave multiregione, dovresti essere in grado di ottenere circa 1800 righe per nodo al secondo di velocità di inserimento (di più se le righe sono piccole e raggruppate, come suggerisce la risposta di Knut) e con 5 nodi chiave, probabilmente puoi avere tra 10 e 20 thread dell'importatore in esecuzione in parallelo, sia utilizzando il programma di importazione che utilizzando Dataflow.
(informativa: sono il manutentore di Beam SpannerIO)