读写HDFS SequenceFile数据

PXF HDFS连接器支持SequenceFile格式的二进制数据。本节描述如何使用PXF读写HDFS SequenceFile数据,包括如何创建引用HDFS文件的外部表,查询和向外部表写入数据。

先决条件

在尝试从HDFS读取或向HDFS写入数据之前,请确保已满足PXF Hadoop先决条件

创建外部表

PXF HDFS连接器 hdfs:SequenceFile 配置文件支持读写HDFS中SequenceFile格式的二进制数据。 当您将记录插入可写外部表中时,您插入的数据块将写入指定目录中的一个或多个文件。

注意: 使用可写配置创建的外部表仅INSERT操作。 如果要查询插入的数据,则必须单独创建一个引用相关HDFS目录的可读外部表。

使用以下语法创建一个引用HDFS目录的Greenplum数据库外部表:

CREATE [WRITABLE] EXTERNAL TABLE <table_name>
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<path-to-hdfs-dir>
    ?PROFILE=hdfs:SequenceFile[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (<formatting-properties>)
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

CREATE EXTERNAL TABLE命令中使用的特定关键字和值见下表中描述。

关键字
<path‑to‑hdfs‑dir> HDFS数据存储中目录或文件的绝对路径
PROFILE PROFILE 关键字必须指定为 hdfs:SequenceFile.
SERVER=<server_name> PXF用于访问数据的命名服务器配置。可选的; 如果未指定,PXF将使用default服务器。
<custom‑option> <custom-option> 描述见下方.
FORMAT 使用 FORMATCUSTOM’ 时指定 (FORMATTER='pxfwritable_export') (写入) 或 (FORMATTER='pxfwritable_import') (读取).
DISTRIBUTED BY 如果您计划将现有Greenplum数据库表中的数据加载到可写外部表,考虑在可写外部表上使用与Greenplum表相同的分布策略或<字段名>。 这样做可以避免数据加载操作中segment节点间额外的数据移动。

SequenceFile 格式数据可以选择采用record或block压缩。 PXF hdfs:SequenceFile 配置支持以下压缩编解码器:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.BZip2Codec

使用 hdfs:SequenceFile 配置文件写入SequenceFile格式数据时,则必须提供Java类的名称,以用于序列化/反序列化二进制数据。 这个类必须为数据模式中引用的每种数据类型提供读写方法。

您可以通过 CREATE EXTERNAL TABLE LOCATION 子句的自定义选项指定压缩编解码器和Java 序列化类。 hdfs:SequenceFile 配置文件支持以下自定义选项:

选项 值描述
COMPRESSION_CODEC 压缩编码器类名称。 如果此选项未提供,Greenplum 数据库不执行数据压缩。 支持的压缩编解码器包括:
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.GzipCodec
COMPRESSION_TYPE 采用的压缩类型; 支持的值有 RECORD (默认) 或 BLOCK.
DATA-SCHEMA 编写器序列化/反序列化类的名称。 此类所在的jar文件必须位于PXF类路径中。 hdfs:SequenceFile 配置文件需要此选项,并且没有默认值。
THREAD-SAFE 确定表查询是否可以在多线程模式下运行的布尔值。 默认为 TRUE。 将此选项设置为 FALSE 以处理单个线程中所有非线程安全操作 (例如,压缩)的请求。

读写二进制数据

当您要读写HDFS中 SequenceFile 格式的数据,请使用HDFS 连接器 hdfs:SequenceFile 配置文件。 这种类型的文件由二进制键/值对组成。 SequenceFile 格式是MapReduce作业之间常见的数据传输格式。

示例: 将二进制数据写入HDFS

在此示例中,您将创建一个名为 PxfExample_CustomWritable 的Java类,该类将对先前示例中使用的示例模式中的字段进行序列化/反序列化。然后,您将使用此类访问通过hdfs:SequenceFile配置文件创建的可写外部表,该表使用默认的PXF服务器。

执行以下步骤来创建Java类和可写外部表。

  1. 准备创建示例Java类:

    $ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
    $ cd pxfex/com/example/pxf/hdfs/writable/dataschema
    $ vi PxfExample_CustomWritable.java
    
  2. 将以下文本复制并黏贴到 PxfExample_CustomWritable.java 文件中:

    package com.example.pxf.hdfs.writable.dataschema;
    
    import org.apache.hadoop.io.*;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.lang.reflect.Field;
    
    /**
    * PxfExample_CustomWritable class - used to serialize and deserialize data with
    * text, int, and float data types
    */
    public class PxfExample_CustomWritable implements Writable {
    
    public String st1, st2;
    public int int1;
    public float ft;
    
    public PxfExample_CustomWritable() {
        st1 = new String("");
        st2 = new String("");
        int1 = 0;
        ft = 0.f;
    }
    
    public PxfExample_CustomWritable(int i1, int i2, int i3) {
    
        st1 = new String("short_string___" + i1);
        st2 = new String("short_string___" + i1);
        int1 = i2;
        ft = i1 * 10.f * 2.3f;
    
    }
    
    String GetSt1() {
        return st1;
    }
    
    String GetSt2() {
        return st2;
    }
    
    int GetInt1() {
        return int1;
    }
    
    float GetFt() {
        return ft;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
    
        Text txt = new Text();
        txt.set(st1);
        txt.write(out);
        txt.set(st2);
        txt.write(out);
    
        IntWritable intw = new IntWritable();
        intw.set(int1);
        intw.write(out);
    
        FloatWritable fw = new FloatWritable();
        fw.set(ft);
        fw.write(out);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
    
        Text txt = new Text();
        txt.readFields(in);
        st1 = txt.toString();
        txt.readFields(in);
        st2 = txt.toString();
    
        IntWritable intw = new IntWritable();
        intw.readFields(in);
        int1 = intw.get();
    
        FloatWritable fw = new FloatWritable();
        fw.readFields(in);
        ft = fw.get();
    }
    
    public void printFieldTypes() {
        Class myClass = this.getClass();
        Field[] fields = myClass.getDeclaredFields();
    
        for (int i = 0; i < fields.length; i++) {
            System.out.println(fields[i].getType().getName());
        }
    }
    }
    
  3. 编译并为 PxfExample_CustomWritable 创建Java类JAR文件。 为您的Hadoop发行版提供一个包含 hadoop-common.jar 文件的类路径。 例如, 如果您安装了Hortonworks Data Platform Hadoop客户端:

    $ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar  PxfExample_CustomWritable.java
    $ cd ../../../../../../
    $ jar cf pxfex-customwritable.jar com
    $ cp pxfex-customwritable.jar /tmp/
    

    (您的Hadoop库类路径可能不同)

  4. pxfex-customwritable.jar 文件复制到Greenplum 数据库master节点。 例如:

    $ scp pxfex-customwritable.jar gpadmin@gpmaster:/home/gpadmin
    
  5. 登录到您的Greenplum数据库master节点:

    $ ssh gpadmin@<gpmaster>
    
  6. pxfex-customwritable.jar JAR文件复制到用户运行时类库目录中,并记下位置。 例如, 如果 PXF_CONF=/usr/local/greenplum-pxf

    gpadmin@gpmaster$ cp /home/gpadmin/pxfex-customwritable.jar /usr/local/greenplum-pxf/lib/pxfex-customwritable.jar
    
  7. 将PXF配置同步到Greenplum数据库集群。 例如:

    gpadmin@gpmaster$ $GPHOME/pxf/bin/pxf cluster sync
    
  8. 重启PXF中所述,在每个Greenplum数据库segment主机上重启PXF。

  9. 使用PXF hdfs:SequenceFile 配置文件创建Greenplum数据库可写外部表。 在 DATA-SCHEMA 自定义选项 <custom-option> 中标识您在上面创建的序列化/反序列化Java类。 创建可写外部表时,使用 BZip2 压缩并指定 BLOCK 压缩模式。

    postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=org.apache.hadoop.io.compress.BZip2Codec')
              FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
    

    注意 'CUSTOM' FORMAT 格式化属性 <formatting-properties> 指定为内置的 pxfwritable_export 格式化程序。

  10. 通过直接插入 pxf_tbl_seqfile 表将一些数据写入 pxf_seqfile HDFS目录中。 例如:

    postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
    
  11. 回想一下,Greenplum 数据库不支持直接查询一个可写外部表。要读取pxf_seqfile中的数据,请创建一个引用此HDFS目录的可读外部表:

    postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
              FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
    

    当您通过 hdfs:SequenceFile 配置文件读取HDFS数据是必须指定 DATA-SCHEMA 自定义选项 <custom-option>。 您无需提供压缩相关的选项。

  12. 查询可读外部表 read_pxf_tbl_seqfile

    gpadmin=# SELECT * FROM read_pxf_tbl_seqfile ORDER BY total_sales;
    
     location  | month | number_of_orders | total_sales 
    -----------+-------+------------------+-------------
     Frankfurt | Mar   |              777 |     3956.98
     Cleveland | Oct   |             3812 |     96645.4
    (2 rows)
    

读取记录键

当Greenplum数据库外部表引用SequenceFile或其他以键-值对存储行的数据格式时, 您可以通过使用 recordkey 关键字作为字段名称,在Greenplum查询中访问记录键的值。

recordkey 字段类型必须与记录键类型相对应,就像其他字段必须与HDFS数据匹配一样。. 

您可以将 recordkey 定义为以下任何Hadoop类型:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

如果没有为行定义记录键, Greenplum数据库将返回处理该行的segment的id。

示例: 使用记录键

创建一个可读外部表,以访问您在 示例: 将二进制数据写入HDFS 创建的可写外部表 pxf_tbl_seqfile 的记录键。 本例中将 recordkey 定义为 int8 类型。

postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
          FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
gpadmin=# SELECT * FROM read_pxf_tbl_seqfile_recordkey;
 recordkey |  location   | month | number_of_orders | total_sales 
-----------+-------------+-------+------------------+-------------
         2 | Frankfurt   | Mar   |              777 |     3956.98
         1 | Cleveland   | Oct   |             3812 |     96645.4
(2 rows)

当您将行插入到可写外部表时,您没有定义记录键, 因此 recordkey 标识为处理该行的segment。