HCatalog - wejściowy format wyjściowy

Plik HCatInputFormat i HCatOutputFormatinterfejsy są używane do odczytu danych z HDFS i po przetworzeniu, zapisują wynikowe dane do HDFS przy użyciu zadania MapReduce. Omówmy interfejsy formatu wejściowego i wyjściowego.

HCatInputFormat

Plik HCatInputFormatjest używany z zadaniami MapReduce do odczytu danych z tabel zarządzanych przez HCatalog. HCatInputFormat udostępnia interfejs API Hadoop 0,20 MapReduce do odczytywania danych tak, jakby zostały opublikowane w tabeli.

Sr.No. Nazwa i opis metody
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

Ustaw dane wejściowe do użycia w zadaniu. Odpytuje metastore z podaną specyfikacją wejściową i serializuje pasujące partycje do konfiguracji zadania dla zadań MapReduce.

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

Ustaw dane wejściowe do użycia w zadaniu. Odpytuje metastore z podaną specyfikacją wejściową i serializuje pasujące partycje do konfiguracji zadania dla zadań MapReduce.

3

public HCatInputFormat setFilter(String filter)throws IOException

Ustaw filtr w tabeli wejściowej.

4

public HCatInputFormat setProperties(Properties properties) throws IOException

Ustaw właściwości formatu wejściowego.

Plik HCatInputFormat API zawiera następujące metody -

  • setInput
  • setOutputSchema
  • getTableSchema

Używać HCatInputFormat aby odczytać dane, najpierw utwórz wystąpienie InputJobInfo odczytać niezbędne informacje z tabeli, a następnie zadzwonić setInput z InputJobInfo.

Możesz użyć setOutputSchema metoda włączenia pliku projection schema, aby określić pola wyjściowe. Jeśli schemat nie zostanie określony, zostaną zwrócone wszystkie kolumny w tabeli. Możesz użyć metody getTableSchema, aby określić schemat tabeli dla określonej tabeli wejściowej.

HCatOutputFormat

HCatOutputFormat jest używany z zadaniami MapReduce do zapisywania danych w tabelach zarządzanych przez HCatalog. HCatOutputFormat uwidacznia interfejs API Hadoop 0,20 MapReduce do zapisywania danych w tabeli. Gdy zadanie MapReduce używa HCatOutputFormat do zapisu danych wyjściowych, używany jest domyślny format OutputFormat skonfigurowany dla tabeli, a nowa partycja jest publikowana w tabeli po zakończeniu zadania.

Sr.No. Nazwa i opis metody
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

Ustaw informacje o danych wyjściowych do zapisania dla zadania. Wysyła kwerendę do serwera metadanych, aby znaleźć StorageHandler do użycia dla tabeli. Zgłasza błąd, jeśli partycja jest już opublikowana.

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

Ustaw schemat danych zapisywanych na partycję. Schemat tabeli jest używany domyślnie dla partycji, jeśli nie zostanie wywołany.

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

Zdobądź nagrywarkę do pracy. Używa domyślnego OutputFormat obiektu StorageHandler, aby pobrać moduł zapisujący.

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

Pobierz program zatwierdzający dane wyjściowe dla tego formatu wyjściowego. Zapewnia prawidłowe zatwierdzenie danych wyjściowych.

Plik HCatOutputFormat API zawiera następujące metody -

  • setOutput
  • setSchema
  • getTableSchema

Pierwsze wywołanie HCatOutputFormat musi być setOutput; każde inne wywołanie zgłosi wyjątek z informacją, że format wyjściowy nie został zainicjowany.

Schemat zapisywanych danych jest określony przez setSchemametoda. Musisz wywołać tę metodę, podając schemat danych, które piszesz. Jeśli Twoje dane mają ten sam schemat co schemat tabeli, możesz użyćHCatOutputFormat.getTableSchema() aby pobrać schemat tabeli, a następnie przekazać go dalej do setSchema().

Przykład

Poniższy program MapReduce odczytuje dane z jednej tabeli, w przypadku której zakłada, że ​​w drugiej kolumnie („kolumna 1”) znajduje się liczba całkowita, i zlicza, ile wystąpień każdej znalezionej odrębnej wartości. Oznacza to, że działa tak samo, jak „select col1, count(*) from $table group by col1;”.

Na przykład, jeśli wartości w drugiej kolumnie to {1, 1, 1, 3, 3, 5}, program wygeneruje następujące dane wyjściowe wartości i zliczeń -

1, 3
3, 2
5, 1

Przyjrzyjmy się teraz kodowi programu -

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Przed skompilowaniem powyższego programu musisz pobrać trochę jars i dodaj je do classpathdla tej aplikacji. Musisz pobrać wszystkie słoiki Hive i HCatalog (HCatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Użyj następujących poleceń, aby je skopiować jar pliki z local do HDFS i dodaj je do classpath.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

Użyj następującego polecenia, aby skompilować i wykonać podany program.

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

Teraz sprawdź katalog wyjściowy (hdfs: user / tmp / hive) pod kątem danych wyjściowych (part_0000, part_0001).