项目作者: YaoQi17

项目描述 :
利用SpringBoot整合HBase,基于HBaseJavaAPI的二次封装,可以直接引用jar包使用,目前测试已支持HBase1.1.2和HBase1.4.6以及HBase2.0.2三个版本。
高级语言: Java
项目地址: git://github.com/YaoQi17/HBase-Component.git


HBase 组件接口文档


使用说明

基本概念

table: 表

columnFamily:列族,一个表下可以有多个列族,但是不建议设置多个列族,HBase建议设计长窄型的表而不是短宽型。

qualifier:列,一个列族下可以有多列,一个表中的列可以是不对齐的,但是这样效率不高,同一张表中的列最好是相同的。

cell:一列数据下的一个单元格,一个列下可以有多个单元格,根据版本号区分,默认每次读取最新版本的数据,cell下的存储是数据本身。

row: 行,多列数据组成一行,一行中有多个qualifier。

rowKey: 行健,用于唯一标识一行数据,一行下有多列,行健的设计直接关系到查询的效率。

HBase配置

以下配置为最基础配置,缺一不可。

  1. HBase:
  2. conf:
  3. quorum: 192.168.80.234:2181,192.168.80.235:2181,192.168.80.241:2181
  4. znodeParent: /hbase-unsecure
  5. #如果有更多配置,写在config下,例如:
  6. #config:
  7. # key: value
  8. # key: value

如果需要更多配置,需要在config中配置,以key-value的形式书写。

参数说明

quorum是HBase中zookeeper的配置,znodeParent是HBase配置在zookeeper中的路径。

带kerberos认证的配置如下:

  1. # HBase配置
  2. HBase:
  3. conf:
  4. quorum: 192.168.80.54:2181,192.168.80.62:2181,192.168.80.64:2181,192.168.80.78:2181,192.168.80.52:2181
  5. znodeParent: /hbase-secure
  6. config:
  7. #keytab地址路径
  8. user-keytab: D:\semptian\environment\security/hbase.service.keytab
  9. #如果使用kb认证请写kerberos否则不用写
  10. authMethod: kerberos
  11. #master.kerberos.principal
  12. masterPrincipal: hbase/node248@SEMPTIAN.COM
  13. #regionserver.kerberos.principal regionserverPrincipal/hbaseSitePath 二选一,一些环境下使用hbaseSitePath更简单
  14. #regionserverPrincipal: root/_HOST@SEMPTIAN.COM
  15. #hbseSite配置文件路径D:\kerberos\hbase-kb\hbase-site.xml
  16. hbaseSitePath: D:\semptian\environment\security/hbase-site.xml
  17. #coreSitePath: D:\kerberos\dev-hbase-kb\core-site.xml
  18. # 刷新认证周期,单位:小时
  19. refreshAuth: 8

认证配置参数说明

user-keytab 和 masterPrincipal 是对应的,一个对应用户的认证文件,一个对应用户。
hbaseSitePath 是hbase-site.xml的文件路径。

在jdk中会自动设置认证过期时间为24小时,所以在组件中需要对认证进行刷新,重新建立连接。refreshAuth 就是配置刷新认证的周期,一般小于24小时,
建议12小时,因为重新建立连接需要消耗一些资源。

简单示例

引入组件jar包:

  1. <dependency>
  2. <groupId>com.semptian.hbase.component</groupId>
  3. <artifactId>hbase-component</artifactId>
  4. <version>1.0.1-SNAPSHOT</version>
  5. </dependency>

在需要的地方注入HBaseOperations接口,该接口的实现类是HBaseTemplate,通过这个类来操作HBase。

  1. @Autowired
  2. private HBaseOperations hBaseDao;

查询一条数据,通过rowKey查询:

  1. public void testQueryTable() {
  2. Result result = hBaseDao.queryByTableNameAndRowKey(
  3. "LBS", 9223372036854775803L);
  4. System.out.println(result.isEmpty());
  5. result.listCells().forEach(cell -> {
  6. System.out.println(
  7. "row:" + Bytes.toLong(CellUtil.cloneRow(cell)) +
  8. ",family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
  9. ", qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
  10. ", value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  11. });
  12. }

表的基本操作

新建表

创建表通过HBaseTemplate就可以实现,HBaseTemplate类中带有这个方法。

操作示例:

  1. hBaseDao.createTable("HBASE-COMPONENT_1", "CF1", "CF2");

上述代码创建了一张表,HBASE-COMPONENT_1 是表名,CF1,CF2代表这个表有两个列族。

如果有多个列族可以往后面加,列族不建议设置很多个。

删除表

  1. hBaseDao.dropTable("HBASE-COMPONENT_1");

参数是表名,通过表名删除表。

判断表是否存在

  1. hBaseDao.tableExists("lbs");

这里的表名是区分大小写的。返回值:boolean。

新增数据

新增一条数据

需要注意的是在HBase中的存储的数据是不分格式的,都是以字节数组的形式存储,因此在存储一条数据时需要将数据都转化成字节数组。

String格式的数据能直接转换为字节数组getBytes(),但是其他格式的数据需要借助工具作转换。

这里需要格外注意rowKey的格式,用什么格式存就决定了用什么格式取。

  1. hBaseDao.put("HBase-component", "1534154424340", "CF1", "test_1", Bytes.toBytes("testData"));

参数说明:

  1. (1) tableName 目标数据表
  2. (2) rowName rowKey
  3. (3) familyName 列族名
  4. (4) qualifier 列名
  5. (5) data 字节数组类型的数据

这里新增一条数据是填充数据到一个cell中去。

批量新增数据

  1. String rowKey = String.valueOf(System.currentTimeMillis());
  2. Put put = new Put(rowKey.getBytes());
  3. String defaultColumn = "CF1";
  4. String column1 = "col1";
  5. String column2 = "col2";
  6. String column3 = "col3";
  7. String value = "test";
  8. put.addColumn(defaultColumn.getBytes(), column1.getBytes(), value.getBytes());
  9. put.addColumn(defaultColumn.getBytes(), column2.getBytes(), value.getBytes());
  10. put.addColumn(defaultColumn.getBytes(), column3.getBytes(), value.getBytes());
  11. List<Put> putList = new ArrayList<>();
  12. putList.add(put);
  13. putList.add(put);
  14. putList.add(put);
  15. putList.add(put);
  16. putList.add(put);
  17. hBaseDao.putBatch("HBase-component", putList);

批量插入数据就是使用多个Put对象,putBatch(…)方法的参数:表名,putList(多个put的集合)。
注意批量插入数据也都是插入字节数组格式的数据。

删除数据

删除一条数据

  1. hBaseDao.delete("HBase-component", "1534210201115", "CF1", "col2");

参数说明:

(1) 表名

(2) rowKey

(3) 列族名

(4) 列名

这里删除是删除一个cell下的数据

批量删除数据

  1. String tableName = "HBase-component";
  2. String rowKey1 = "1534164113922";
  3. String rowKey2 = "1534168248328";
  4. List<Delete> deleteList = new ArrayList<>();
  5. Delete delete = new Delete(rowKey1.getBytes());
  6. Delete delete1 = new Delete(rowKey2.getBytes());
  7. deleteList.add(delete);
  8. deleteList.add(delete1);
  9. hBaseDao.deleteBatch(tableName, deleteList);

批量删除需要借助Delete对象。

查询

单条结果查询

  1. Result result = hBaseDao.queryByTableNameAndRowKey("LBS", 9223372036854775803L);
  2. System.out.println(result.isEmpty());
  3. result.listCells().forEach(cell -> {
  4. System.out.println(
  5. " row:" + Bytes.toLong(CellUtil.cloneRow(cell)) +
  6. " family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
  7. " qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
  8. " value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  9. });

queryByTableNameAndRowKey()该方法是通过表名和rowKey查询数据,这里的rowKey支持多种类型,Long,double,Integer几种类型。
至于这里传什么类型的参数,取决于插入数据时rowKey的类型,虽然HBase里存储的都是字节数组,但是对类型是敏感的,如果类型对不上可能会出错。

批量扫描

  1. // 构建scan
  2. Scan scan = new Scan();
  3. // 设置时间戳,计算时间差
  4. Long timeDifference = 2L * 30L * 24L * 60L * 60L * 1000L;
  5. Long endTime = System.currentTimeMillis();
  6. Long fromTime = endTime - timeDifference;
  7. // 设置时间过滤器
  8. FilterList filterList = new FilterList();
  9. Filter startTimeFilter = new SingleColumnValueFilter(
  10. DEFAULT_COLUMN_FAMILY.getBytes(),
  11. DATA_CREATE_TIME.getBytes(),
  12. CompareFilter.CompareOp.GREATER,
  13. Bytes.toBytes(fromTime)
  14. );
  15. Filter endTimeFilter = new SingleColumnValueFilter(
  16. DEFAULT_COLUMN_FAMILY.getBytes(),
  17. DATA_CREATE_TIME.getBytes(),
  18. CompareFilter.CompareOp.LESS,
  19. Bytes.toBytes(endTime)
  20. );
  21. filterList.addFilter(startTimeFilter);
  22. filterList.addFilter(endTimeFilter);
  23. scan.setFilter(filterList);
  24. // 获取结果集
  25. ResultScanner resultScanner = hBaseTemplate.queryByScan(TABLE_NAME, scan);
  26. // 遍历结果集
  27. try{
  28. if (resultScanner != null) {
  29. resultScanner.forEach(result -> {
  30. List<Cell> cellList = result.listCells();
  31. ...
  32. }
  33. }
  34. }finally{
  35. if (resultScanner != null) {
  36. resultScanner.close();
  37. }
  38. }

批量查询可以通过queryByScan()方法实现,第一个参数是表名,第二个参数是scan,通过构建不同的scan来查询,过滤器也是在构建scan对象是添加的,可以添加多个过滤器。

需要注意的是这里的ResultScanner类,在遍历结果集时需要使用try-finally结构,在使用完resultScanner对象之后关闭该对象。HBase官方文档上强调了这一点。因此在使用ResultScanner对象时需要格外注意。

常见过滤器:

行健过滤器:RowFilter

列族过滤器:FamilyFilter

值过滤器:ValueFilter

列过滤器:QualifierFilter

单列值过滤器:SingleColumnValueFilter(会返回满足条件的行)

单列值排除过滤器:SingleColumnExcludeFilter(返回排除了该列的结果,与单列值过滤器相反)

前缀过滤器:PrefixFilter(这个过滤器是针对行健的,在构造方法中传入字节数组形式的内容,过滤器会去匹配行健)

页数过滤器:PageFilter(使用pageFilter过滤器的时候需要注意,并不是设置了页数大小就能返回相应数目的结果)

  1. String tableName = "RECOMMEND_ENGINE_DATA_MODEL";
  2. Scan scan = new Scan();
  3. PageFilter pageFilter = new PageFilter(1);
  4. scan.setFilter(pageFilter);
  5. ResultScanner resultScanner = hBaseDao.queryByScan(tableName, scan);
  6. try{
  7. resultScanner.forEach(result -> {
  8. result.listCells().forEach(cell -> {
  9. // process
  10. });
  11. }finally{
  12. if (resultScanner != null) {
  13. resultScanner.close();
  14. }
  15. }

上面这段代码中设置了页面大小为1,预期是返回一条数据,但是结果会返回两条数据,这时返回的结果数会取决于regionServer的数量。

如果是FilterList,FilterList的顺序会影响PageFilter的效果。

一般比较型过滤器,需要用CompareFilter.CompareOp中的比较运算符。所有的过滤器都是用Scan对象去设置。

多过滤器查询

  1. String tableName = "HBase-component";
  2. Scan scan = new Scan();
  3. PageFilter pageFilter = new PageFilter(1);
  4. SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
  5. "CF1".getBytes(),
  6. "col1".getBytes(),
  7. CompareFilter.CompareOp.EQUAL,
  8. new SubstringComparator("group"));
  9. singleColumnValueFilter.setFilterIfMissing(true);
  10. FilterList filterList = new FilterList();
  11. filterList.addFilter(singleColumnValueFilter);
  12. filterList.addFilter(pageFilter);
  13. scan.setFilter(filterList);
  14. ResultScanner resultScanner = hBaseDao.queryByScan(tableName, scan);
  15. try {
  16. resultScanner.forEach(result -> {
  17. result.listCells().forEach(cell -> {
  18. System.out.println(
  19. " row:" + Bytes.toString(CellUtil.cloneRow(cell)) +
  20. " family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
  21. " qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell))+
  22. " value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  23. });
  24. });
  25. } finally {
  26. if (resultScanner != null) {
  27. resultScanner.close();
  28. }
  29. }

多过滤器需要用到FilterList,也是直接设置到Scan对象中。多过滤器的时候需要注意过滤器的顺序问题,例如上面代码中如果将两个过滤器调换顺序,查询的结果也是不一样的。

结果集的映射

在HBase中,默认所有的顺序都是按照字母序排列,例如CF1列族下有多个列:col1、col2、col3,那么在遍历结果集时,listCells()中的cell的顺序总是按照列名的字母序来排列的。

所以cellList.get(0)就是对应col1中的数据,cellList.get(1)就是对应col2中的数据,cellList.get(2)就是对应col3中的数据。

如果列名为a、b、c那分别对应的下标为cellList.get(0)、cellList.get(1)、cellList.get(2)