jdbc를 사용하여 Spanner에 행을 일괄 삽입하는 동안 낮은로드 성능
배경 : TSV 형식의 데이터 파일 (MySQL 데이터베이스에서 덤프 됨)을 GCP Spanner 테이블에로드하려고합니다.
- 클라이언트 라이브러리 : 공식 Spanner JDBC 종속성 v1.15.0
- 테이블 스키마 : 2 개의 문자열 유형 열과 10 개의 int 유형 열
- GCP Spanner 인스턴스 : 노드가 5 개인 다중 지역 nam6으로 구성됨
내 로딩 프로그램은 GCP VM에서 실행되며 Spanner 인스턴스에 액세스하는 독점 클라이언트입니다. 자동 커밋이 활성화됩니다. 배치 삽입은 내 프로그램에서 실행되는 유일한 DML 작업이며 배치 크기는 약 1500입니다. 각 커밋에서 변형 제한 (20000)을 완전히 사용합니다. 동시에 커밋 크기는 5MB 미만입니다 (값 두 개의 문자열 유형 열 중 작은 크기). 행은 기본 키의 첫 번째 열을 기준으로 분할되므로 각 커밋은 성능 향상을 위해 매우 적은 파티션으로 전송 될 수 있습니다.
위의 모든 구성과 최적화를 통해 삽입 속도는 초당 약 1k 행에 불과합니다. 삽입 할 행이 8 억 개가 넘기 때문에 이것은 정말 실망 스럽습니다. 나는 공식 문서 가 약을 언급 했다는 것을 알아 차렸다 . 다중 지역 Spanner 인스턴스의 최대 쓰기 (총 QPS)는 1800입니다.
그래서 여기에 두 가지 질문이 있습니다.
- 이러한 낮은 피크 쓰기 QPS를 고려할 때 GCP가 대규모 데이터 세트를 다중 지역 Spanner 인스턴스로 마이그레이션하는 것을 고객이 기대하지 않거나 지원하지 않는다는 의미입니까?
- Spanner 모니터링에서 높은 읽기 지연 시간을 확인했습니다. 읽기 요청이 없습니다. 내 생각 엔 쓰기 행을 Spanner는 먼저 읽고 동일한 기본 키를 가진 행이 있는지 확인해야합니다. 내 추측이 맞다면 왜 그렇게 많은 시간이 걸리나요? 그렇지 않은 경우 이러한 읽기 작업이 어떻게 발생하는지에 대한 지침을 얻을 수 있습니까?
답변
데이터를로드하는 클라이언트 응용 프로그램을 어떻게 설정하는지 정확히 알 수 없습니다. 내 첫 인상은 클라이언트 응용 프로그램이 병렬로 충분한 트랜잭션을 실행하지 않을 수 있다는 것입니다. 일반적으로 초당 1,000 개 이상의 행을 삽입 할 수 있어야하지만 여러 VM에서 병렬로 여러 트랜잭션을 실행해야합니다. 다음의 간단한 예를 사용하여 로컬 머신에서 단일 노드 Spanner 인스턴스로 의 부하 처리량을 테스트했으며 , 처리량은 초당 약 1,500 행이었습니다.
Cloud Spanner 인스턴스와 동일한 네트워크 지역에있는 하나 이상의 VM에서 실행되는 클라이언트 애플리케이션을 사용하는 다중 노드 설정은 그보다 더 많은 볼륨을 달성 할 수 있어야합니다.
import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
while (true) {
try (PreparedStatement ps =
connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
for (int i = 0; i < 150; i++) {
ps.setLong(1, rnd.nextLong());
ps.setString(2, randomString(100));
ps.setString(3, randomString(100));
ps.addBatch();
rowCount.incrementAndGet();
}
ps.executeBatch();
}
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
더 나은 결과를 얻기 위해 조정할 수있는 몇 가지 다른 사항도 있습니다.
- 일괄 처리 당 행 수를 줄이면 더 나은 전체 결과를 얻을 수 있습니다.
- 가능하면
InsertOrUpdate
변형 객체를 사용하는 것이 DML 문을 사용하는 것보다 훨씬 효율적입니다 (아래 예제 참조).
Mutation
DML 대신 사용 하는 예 :
import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertOrUpdateMutationRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertOrUpdateMutationRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
CloudSpannerJdbcConnection csConnection =
connection.unwrap(CloudSpannerJdbcConnection.class);
while (true) {
ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
for (int i = 0; i < 150; i++) {
builder.add(
Mutation.newInsertOrUpdateBuilder("Test")
.set("Id")
.to(rnd.nextLong())
.set("Col1")
.to(randomString(100))
.set("Col2")
.to(randomString(100))
.build());
rowCount.incrementAndGet();
}
csConnection.write(builder.build());
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
위의 간단한 예는 추가 조정없이 초당 약 35,000 행의 처리량을 제공합니다.
추가 정보 2020-08-21 : 변형 객체가 (일괄) DML 문보다 효율적인 이유는 DML 문이 내부적으로 Cloud Spanner에서 쿼리를 읽도록 변환 된 다음 변형을 만드는 데 사용되기 때문입니다. 이 변환은 일괄 처리의 모든 DML 문에 대해 수행되어야합니다. 즉, 1,500 개의 간단한 삽입 문이있는 DML 일괄 처리는 1,500 (소규모) 읽기 쿼리를 트리거하고 1,500 개의 변형으로 변환해야합니다. 이것은 아마도 모니터링에서 보는 읽기 대기 시간의 원인 일 것입니다.
그렇지 않으면 클라이언트 응용 프로그램의 모양과 실행중인 인스턴스의 수에 대한 추가 정보를 공유 할 수 있습니까?
삽입 할 행이 8 억 개가 넘고 Java 프로그래머임을 확인한 경우 Beam on Dataflow를 사용하는 것이 좋습니다.
Beam 의 스패너 작성기 는 쓰기를 통해 가능한 한 효율적으로 설계되었습니다. 비슷한 키로 행을 그룹화하고 수행하는대로 일괄 처리합니다. Dataflow의 Beam은 여러 작업자 VM을 사용하여 여러 파일 읽기 및 스패너 쓰기를 병렬로 실행할 수도 있습니다.
다중 지역 스패너 인스턴스를 사용하면 초당 삽입 속도 당 노드 당 약 1800 개의 행 을 가져올 수 있어야하며 (Knut의 응답에서 알 수 있듯이 행이 작고 일괄 처리 된 경우 더 많음) 5 개의 스패너 노드를 사용하면 아마도 10 ~ 20 개 사이가 될 수 있습니다. 가져 오기 프로그램을 사용하든 Dataflow를 사용하든 병렬로 실행되는 가져 오기 스레드
(공개 : 저는 Beam SpannerIO 유지 관리자입니다)