HCatalog - Читатель и писатель
HCatalog содержит API передачи данных для параллельного ввода и вывода без использования MapReduce. Этот API использует базовую абстракцию хранилища таблиц и строк для чтения данных из кластера Hadoop и записи в него данных.
API передачи данных содержит в основном три класса; это -
HCatReader - Считывает данные из кластера Hadoop.
HCatWriter - Записывает данные в кластер Hadoop.
DataTransferFactory - Создает экземпляры читателя и писателя.
Этот API подходит для настройки главного-подчиненного узла. Давайте обсудим подробнееHCatReader и HCatWriter.
HCatReader
HCatReader - это внутренний по отношению к HCatalog абстрактный класс, который абстрагирует сложность базовой системы, из которой должны быть получены записи.
Sr. No. | Название и описание метода |
---|---|
1 | Public abstract ReaderContext prepareRead() throws HCatException Это должно быть вызвано на главном узле, чтобы получить ReaderContext, который затем должен быть сериализован и отправлен подчиненным узлам. |
2 | Public abstract Iterator <HCatRecorder> read() throws HCaException Это должно вызываться на подчиненных узлах для чтения HCatRecords. |
3 | Public Configuration getConf() Он вернет объект класса конфигурации. |
Класс HCatReader используется для чтения данных из HDFS. Чтение - это двухэтапный процесс, в котором первый шаг выполняется на главном узле внешней системы. Второй шаг выполняется параллельно на нескольких подчиненных узлах.
Чтения выполняются на ReadEntity. Прежде чем вы начнете читать, вам нужно определить ReadEntity, из которого следует читать. Это можно сделать черезReadEntity.Builder. Вы можете указать имя базы данных, имя таблицы, раздел и строку фильтра. Например -
ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.
Приведенный выше фрагмент кода определяет объект ReadEntity («сущность»), содержащий таблицу с именем mytbl в базе данных с именем mydb, который можно использовать для чтения всех строк этой таблицы. Обратите внимание, что эта таблица должна существовать в HCatalog до начала этой операции.
После определения ReadEntity вы получаете экземпляр HCatReader, используя ReadEntity и конфигурацию кластера -
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
Следующим шагом является получение ReaderContext от читателя следующим образом:
ReaderContext cntxt = reader.prepareRead();
HCatWriter
Эта абстракция является внутренней для HCatalog. Это сделано для облегчения записи в HCatalog из внешних систем. Не пытайтесь создать это напрямую. Вместо этого используйте DataTransferFactory.
Sr. No. | Название и описание метода |
---|---|
1 | Public abstract WriterContext prepareRead() throws HCatException Внешняя система должна вызывать этот метод только один раз с главного узла. Он возвращаетWriterContext. Это должно быть сериализовано и отправлено на подчиненные узлы для созданияHCatWriter там. |
2 | Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException Этот метод следует использовать на подчиненных узлах для выполнения записи. RecordItr - это объект-итератор, содержащий коллекцию записей для записи в HCatalog. |
3 | Public abstract void abort(WriterContext cntxt) throws HCatException Этот метод следует вызывать на главном узле. Основная цель этого метода - очистка в случае сбоев. |
4 | public abstract void commit(WriterContext cntxt) throws HCatException Этот метод следует вызывать на главном узле. Цель этого метода - выполнить фиксацию метаданных. |
Подобно чтению, запись также представляет собой двухэтапный процесс, в котором первый шаг выполняется на главном узле. Впоследствии второй этап выполняется параллельно на подчиненных узлах.
Записи делаются на WriteEntity который может быть построен аналогично чтению -
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
Приведенный выше код создает объект WriteEntity, entity
который можно использовать для записи в таблицу с именемmytbl в базе данных mydb.
После создания WriteEntity следующим шагом будет получение WriterContext -
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
Все вышеперечисленные шаги выполняются на главном узле. Затем главный узел сериализует объект WriterContext и делает его доступным для всех ведомых устройств.
На подчиненных узлах вам необходимо получить HCatWriter с помощью WriterContext следующим образом:
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
Затем writerпринимает итератор в качестве аргумента для write
метода -
writer.write(hCatRecordItr);
В writer затем звонит getNext() на этом итераторе в цикле и записывает все записи, прикрепленные к итератору.
В TestReaderWriter.javaфайл используется для тестирования классов HCatreader и HCatWriter. Следующая программа демонстрирует, как использовать HCatReader и HCatWriter API для чтения данных из исходного файла и последующей записи их в целевой файл.
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;
import org.junit.Assert;
import org.junit.Test;
public class TestReaderWriter extends HCatBaseTest {
@Test
public void test() throws MetaException, CommandNeedRetryException,
IOException, ClassNotFoundException {
driver.run("drop table mytbl");
driver.run("create table mytbl (a string, b int)");
Iterator<Entry<String, String>> itr = hiveConf.iterator();
Map<String, String> map = new HashMap<String, String>();
while (itr.hasNext()) {
Entry<String, String> kv = itr.next();
map.put(kv.getKey(), kv.getValue());
}
WriterContext cntxt = runsInMaster(map);
File writeCntxtFile = File.createTempFile("hcat-write", "temp");
writeCntxtFile.deleteOnExit();
// Serialize context.
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
oos.writeObject(cntxt);
oos.flush();
oos.close();
// Now, deserialize it.
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
cntxt = (WriterContext) ois.readObject();
ois.close();
runsInSlave(cntxt);
commit(map, true, cntxt);
ReaderContext readCntxt = runsInMaster(map, false);
File readCntxtFile = File.createTempFile("hcat-read", "temp");
readCntxtFile.deleteOnExit();
oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
oos.writeObject(readCntxt);
oos.flush();
oos.close();
ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
readCntxt = (ReaderContext) ois.readObject();
ois.close();
for (int i = 0; i < readCntxt.numSplits(); i++) {
runsInSlave(readCntxt, i);
}
}
private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
return info;
}
private ReaderContext runsInMaster(Map<String, String> config,
boolean bogus) throws HCatException {
ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
ReaderContext cntxt = reader.prepareRead();
return cntxt;
}
private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
Iterator<HCatRecord> itr = reader.read();
int i = 1;
while (itr.hasNext()) {
HCatRecord read = itr.next();
HCatRecord written = getRecord(i++);
// Argh, HCatRecord doesnt implement equals()
Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
written.get(0).equals(read.get(0)));
Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
written.get(1).equals(read.get(1)));
Assert.assertEquals(2, read.size());
}
//Assert.assertFalse(itr.hasNext());
}
private void runsInSlave(WriterContext context) throws HCatException {
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
writer.write(new HCatRecordItr());
}
private void commit(Map<String, String> config, boolean status,
WriterContext context) throws IOException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
if (status) {
writer.commit(context);
} else {
writer.abort(context);
}
}
private static HCatRecord getRecord(int i) {
List<Object> list = new ArrayList<Object>(2);
list.add("Row #: " + i);
list.add(i);
return new DefaultHCatRecord(list);
}
private static class HCatRecordItr implements Iterator<HCatRecord> {
int i = 0;
@Override
public boolean hasNext() {
return i++ < 100 ? true : false;
}
@Override
public HCatRecord next() {
return getRecord(i);
}
@Override
public void remove() {
throw new RuntimeException();
}
}
}
Вышеупомянутая программа считывает данные из HDFS в виде записей и записывает данные записи в mytable