📅  最后修改于: 2020-11-30 04:26:28             🧑  作者: Mango
HCatalog包含用于并行输入和输出而无需使用MapReduce的数据传输API。该API使用表和行的基本存储抽象来从Hadoop集群读取数据并将数据写入其中。
数据传输API主要包含三个类;这些是-
HCatReader-从Hadoop集群读取数据。
HCatWriter-将数据写入Hadoop集群。
DataTransferFactory-生成读取器和写入器实例。
该API适用于主从节点设置。让我们进一步讨论HCatReader和HCatWriter 。
HCatReader是HCatalog内部的抽象类,抽象出要从中检索记录的底层系统的复杂性。
Sr.No. | Method Name & Description |
---|---|
1 |
Public abstract ReaderContext prepareRead() throws HCatException This should be called at master node to obtain ReaderContext which then should be serialized and sent slave nodes. |
2 |
Public abstract Iterator This should be called at slaves nodes to read HCatRecords. |
3 |
Public Configuration getConf() It will return the configuration class object. |
HCatReader类用于从HDFS读取数据。读取是一个两步过程,其中第一步发生在外部系统的主节点上。第二步在多个从节点上并行执行。
读取是在ReadEntity上完成的。在开始阅读之前,您需要定义一个要从中读取的ReadEntity。这可以通过ReadEntity.Builder完成。您可以指定数据库名称,表名称,分区和过滤器字符串。例如-
ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.
上面的代码段定义了一个ReadEntity对象(“实体”),该对象包含一个名为mydb的数据库中名为mytbl的表,该表可用于读取该表的所有行。请注意,在开始此操作之前,该表必须存在于HCatalog中。
定义ReadEntity之后,您可以使用ReadEntity和集群配置获取HCatReader的实例-
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
下一步是从阅读器获取ReaderContext,如下所示:
ReaderContext cntxt = reader.prepareRead();
这种抽象是HCatalog的内部功能。这是为了方便从外部系统写入HCatalog。不要尝试直接实例化它。而是使用DataTransferFactory。
Sr.No. | Method Name & Description |
---|---|
1 |
Public abstract WriterContext prepareRead() throws HCatException External system should invoke this method exactly once from a master node. It returns a WriterContext. This should be serialized and sent to slave nodes to construct HCatWriter there. |
2 |
Public abstract void write(Iterator This method should be used at slave nodes to perform writes. The recordItr is an iterator object that contains the collection of records to be written into HCatalog. |
3 |
Public abstract void abort(WriterContext cntxt) throws HCatException This method should be called at the master node. The primary purpose of this method is to do cleanups in case of failures. |
4 |
public abstract void commit(WriterContext cntxt) throws HCatException This method should be called at the master node. The purpose of this method is to do metadata commit. |
类似于读取,写入也是一个两步过程,其中第一步发生在主节点上。随后,第二步并行发生在从属节点上。
写入是在WriteEntity上完成的,该实体可以类似于读取的方式构造-
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
上面的代码创建了一个WriteEntity对象entity
,该entity
可用于写入数据库mydb中名为mytbl的表中。
创建WriteEntity之后,下一步是获取WriterContext-
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
以上所有步骤都在主节点上进行。然后,主节点会序列化WriterContext对象,并使它可用于所有从属对象。
在从属节点上,您需要使用WriterContext获取HCatWriter,如下所示:
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
然后,作者将迭代器作为write
方法的参数-
writer.write(hCatRecordItr);
然后,编写者在循环中在此迭代器上调用getNext() ,并写出附加到迭代器的所有记录。
TestReaderWriter.java文件用于测试HCatreader和HCatWriter类。以下程序演示了如何使用HCatReader和HCatWriter API从源文件读取数据,然后将其写入目标文件。
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;
import org.junit.Assert;
import org.junit.Test;
public class TestReaderWriter extends HCatBaseTest {
@Test
public void test() throws MetaException, CommandNeedRetryException,
IOException, ClassNotFoundException {
driver.run("drop table mytbl");
driver.run("create table mytbl (a string, b int)");
Iterator> itr = hiveConf.iterator();
Map map = new HashMap();
while (itr.hasNext()) {
Entry kv = itr.next();
map.put(kv.getKey(), kv.getValue());
}
WriterContext cntxt = runsInMaster(map);
File writeCntxtFile = File.createTempFile("hcat-write", "temp");
writeCntxtFile.deleteOnExit();
// Serialize context.
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
oos.writeObject(cntxt);
oos.flush();
oos.close();
// Now, deserialize it.
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
cntxt = (WriterContext) ois.readObject();
ois.close();
runsInSlave(cntxt);
commit(map, true, cntxt);
ReaderContext readCntxt = runsInMaster(map, false);
File readCntxtFile = File.createTempFile("hcat-read", "temp");
readCntxtFile.deleteOnExit();
oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
oos.writeObject(readCntxt);
oos.flush();
oos.close();
ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
readCntxt = (ReaderContext) ois.readObject();
ois.close();
for (int i = 0; i < readCntxt.numSplits(); i++) {
runsInSlave(readCntxt, i);
}
}
private WriterContext runsInMaster(Map config) throws HCatException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
return info;
}
private ReaderContext runsInMaster(Map config,
boolean bogus) throws HCatException {
ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
ReaderContext cntxt = reader.prepareRead();
return cntxt;
}
private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
Iterator itr = reader.read();
int i = 1;
while (itr.hasNext()) {
HCatRecord read = itr.next();
HCatRecord written = getRecord(i++);
// Argh, HCatRecord doesnt implement equals()
Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
written.get(0).equals(read.get(0)));
Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
written.get(1).equals(read.get(1)));
Assert.assertEquals(2, read.size());
}
//Assert.assertFalse(itr.hasNext());
}
private void runsInSlave(WriterContext context) throws HCatException {
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
writer.write(new HCatRecordItr());
}
private void commit(Map config, boolean status,
WriterContext context) throws IOException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
if (status) {
writer.commit(context);
} else {
writer.abort(context);
}
}
private static HCatRecord getRecord(int i) {
List
上面的程序以记录形式从HDFS读取数据,并将记录数据写入mytable