HCatalog - Reader Writer

HCatalog chứa API truyền dữ liệu cho đầu vào và đầu ra song song mà không cần sử dụng MapReduce. API này sử dụng trừu tượng lưu trữ cơ bản của các bảng và hàng để đọc dữ liệu từ cụm Hadoop và ghi dữ liệu vào đó.

API truyền dữ liệu chủ yếu chứa ba lớp; đó là -

  • HCatReader - Đọc dữ liệu từ một cụm Hadoop.

  • HCatWriter - Ghi dữ liệu vào một cụm Hadoop.

  • DataTransferFactory - Tạo cá thể người đọc và người viết.

API này phù hợp để thiết lập nút chủ-tớ. Hãy để chúng tôi thảo luận thêm vềHCatReaderHCatWriter.

HCatReader

HCatReader là một lớp trừu tượng bên trong HCatalog và trừu tượng hóa sự phức tạp của hệ thống cơ bản từ nơi các bản ghi sẽ được truy xuất.

Sr.No. Tên & Mô tả phương pháp
1

Public abstract ReaderContext prepareRead() throws HCatException

Điều này sẽ được gọi tại nút chính để lấy ReaderContext mà sau đó sẽ được tuần tự hóa và gửi các nút phụ.

2

Public abstract Iterator <HCatRecorder> read() throws HCaException

Điều này sẽ được gọi tại các nút nô lệ để đọc HCatRecords.

3

Public Configuration getConf()

Nó sẽ trả về đối tượng lớp cấu hình.

Lớp HCatReader được sử dụng để đọc dữ liệu từ HDFS. Đọc là một quá trình gồm hai bước, trong đó bước đầu tiên xảy ra trên nút chính của hệ thống bên ngoài. Bước thứ hai được thực hiện song song trên nhiều nút phụ.

Các bài đọc được thực hiện trên một ReadEntity. Trước khi bắt đầu đọc, bạn cần xác định ReadEntity để đọc. Điều này có thể được thực hiện thông quaReadEntity.Builder. Bạn có thể chỉ định tên cơ sở dữ liệu, tên bảng, phân vùng và chuỗi bộ lọc. Ví dụ -

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.

Đoạn mã trên xác định đối tượng ReadEntity (“thực thể”), bao gồm một bảng có tên mytbl trong một cơ sở dữ liệu có tên mydb, có thể được sử dụng để đọc tất cả các hàng của bảng này. Lưu ý rằng bảng này phải tồn tại trong HCatalog trước khi bắt đầu thao tác này.

Sau khi xác định ReadEntity, bạn có được một phiên bản của HCatReader bằng cách sử dụng cấu hình ReadEntity và cụm -

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

Bước tiếp theo là lấy ReaderContext từ trình đọc như sau:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

Sự trừu tượng này là nội bộ của HCatalog. Điều này là để tạo điều kiện cho việc ghi vào HCatalog từ các hệ thống bên ngoài. Đừng cố gắng thực hiện điều này một cách trực tiếp. Thay vào đó, hãy sử dụng DataTransferFactory.

Sr.No. Tên & Mô tả phương pháp
1

Public abstract WriterContext prepareRead() throws HCatException

Hệ thống bên ngoài sẽ gọi phương thức này chính xác một lần từ một nút chính. Nó trả về mộtWriterContext. Điều này sẽ được tuần tự hóa và gửi đến các nút phụ để xây dựngHCatWriter ở đó.

2

Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException

Phương thức này nên được sử dụng tại các nút nô lệ để thực hiện ghi. RecordItr là một đối tượng trình lặp có chứa tập hợp các bản ghi được ghi vào HCatalog.

3

Public abstract void abort(WriterContext cntxt) throws HCatException

Phương thức này nên được gọi tại nút chính. Mục đích chính của phương pháp này là dọn dẹp trong trường hợp hỏng hóc.

4

public abstract void commit(WriterContext cntxt) throws HCatException

Phương thức này nên được gọi tại nút chính. Mục đích của phương pháp này là thực hiện cam kết siêu dữ liệu.

Tương tự như đọc, ghi cũng là một quá trình gồm hai bước, trong đó bước đầu tiên xảy ra trên nút chính. Sau đó, bước thứ hai xảy ra song song trên các nút phụ.

Viết được thực hiện trên một WriteEntity có thể được xây dựng theo cách tương tự như đọc -

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

Đoạn mã trên tạo một đối tượng WriteEntity entitycó thể được sử dụng để ghi vào một bảng có tênmytbl trong cơ sở dữ liệu mydb.

Sau khi tạo WriteEntity, bước tiếp theo là lấy WriterContext -

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

Tất cả các bước trên đều xảy ra trên nút chính. Sau đó, nút chính sẽ tuần tự hóa đối tượng WriterContext và cung cấp nó cho tất cả các nô lệ.

Trên các nút nô lệ, bạn cần lấy một HCatWriter bằng WriterContext như sau:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

Sau đó, writerlấy một trình lặp làm đối số cho writephương thức -

writer.write(hCatRecordItr);

Các writer sau đó gọi getNext() trên trình lặp này trong một vòng lặp và ghi ra tất cả các bản ghi được đính kèm với trình lặp.

Các TestReaderWriter.javatệp được sử dụng để kiểm tra các lớp HCatreader và HCatWriter. Chương trình sau đây trình bày cách sử dụng HCatReader và HCatWriter API để đọc dữ liệu từ tệp nguồn và sau đó ghi dữ liệu đó vào tệp đích.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

Chương trình trên đọc dữ liệu từ HDFS dưới dạng bản ghi và ghi dữ liệu bản ghi vào mytable