HBase Java API 基本操作
学完hbase shell API的基本操作之后,可以通过Java API 对hbase基本操作实现一把。基本概念java类对应数据模型HBaseConfigurationHBase配置类HBaseAdminHBase管理Admin类TableHBase Table操作类PutHBase添加操作数据模型GetHBase单个查询操作数据模...
学完hbase shell API的基本操作之后,可以通过Java API 对hbase基本操作实现一把。
- 基本概念
java类 | 对应数据模型 |
---|---|
HBaseConfiguration | HBase配置类 |
HBaseAdmin | HBase管理Admin类 |
Table | HBase Table操作类 |
Put | HBase添加操作数据模型 |
Get | HBase单个查询操作数据模型 |
Scan | HBase Scan检索操作数据模型 |
Result | HBase单个查询的结果模型 |
ResultScanner | HBase检索结果模型 |
2.首先封装一个hbase connection工具类,这是一个单例的类,用于获取获取连接,关闭连接,获取hbase table对象。
package com.zsk.hbase.api;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;
public class HBaseConn {
private static final HBaseConn INSTANCE = new HBaseConn();
private static Configuration configuration; //hbase配置
private static Connection connection; //hbase connection
private HBaseConn(){
try{
if (configuration==null){
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","192.168.217.129:2181");
}
}catch (Exception e){
e.printStackTrace();
}
}
private Connection getConnection(){
if (connection==null || connection.isClosed()){
try{
connection = ConnectionFactory.createConnection(configuration);
}catch (Exception e){
e.printStackTrace();
}
}
return connection;
}
public static Connection getHBaseConn(){
return INSTANCE.getConnection();
}
public static Table getTable(String tableName) throws IOException {
return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
}
public static void closeConn(){
if (connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.具体操作hbase 工具类
package com.zsk.hbase.api;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Arrays;
import java.util.List;
public class HBaseUtil {
/**
* 创建表
* @param tableName 创建表的表名称
* @param cfs 列簇的集合
* @return
*/
public static boolean createTable(String tableName, String[] cfs) {
try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
if (admin.tableExists(tableName)) {
return false;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
Arrays.stream(cfs).forEach(cf -> {
HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
columnDescriptor.setMaxVersions(1);
tableDescriptor.addFamily(columnDescriptor);
});
admin.createTable(tableDescriptor);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 删除表
* @param tableName 表名称
* @return
*/
public static boolean deleteTable(String tableName){
try(HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConn().getAdmin()){
if (!admin.tableExists(tableName)){
return false;
}
admin.disableTable(tableName);
admin.deleteTable(tableName);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
* 插入数据
* @param tableName
* @param rowkey
* @param cfName
* @param qualifer
* @param data
* @return
*/
public static boolean putRow(String tableName,String rowkey,String cfName,String qualifer,String data){
try(Table table = HBaseConn.getTable(tableName)){
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifer),Bytes.toBytes(data));
table.put(put);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
*批量出入数据
* @param tableName
* @param puts
* @return
*/
public static boolean putRows(String tableName, List<Put> puts){
try(Table table = HBaseConn.getTable(tableName)){
table.put(puts);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
* 查询单条数据
* @param tableName
* @param rowkey
* @return
*/
public static Result getRow(String tableName,String rowkey){
try( Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 带有过滤器的插入数据
* @param tableName
* @param rowkey
* @param filterList
* @return
*/
public static Result getRow(String tableName, String rowkey, FilterList filterList){
try( Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowkey));
get.setFilter(filterList);
Result result = table.get(get);
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
return result;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
*scan扫描数据,
* @param tableName
* @return
*/
public static ResultScanner getScanner(String tableName){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
*can 检索数据,控制startrow,stoprow 注意包括startrow 不包括stoprow,
* @param tableName
* @param startKey
* @param stopKey
* @return
*/
public static ResultScanner getScanner(String tableName,String startKey,String stopKey){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startKey));
scan.setStopRow(Bytes.toBytes(stopKey));
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* scan 检索数据,控制startrow,stoprow 注意包括startrow 不包括stoprow,filterList对查询过滤
* @param tableName
* @param startKey
* @param stopKey
* @param filterList
* @return
*/
public static ResultScanner getScanner(String tableName,String startKey,String stopKey,FilterList filterList){
try( Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setFilter(filterList);
scan.setStartRow(Bytes.toBytes(startKey));
scan.setStopRow(Bytes.toBytes(stopKey));
scan.setCaching(1000);
ResultScanner results = table.getScanner(scan);
results.forEach(result -> {
System.out.println("rowkey == "+Bytes.toString(result.getRow()));
System.out.println("basic:name == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))));
System.out.println("basic:age == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age"))));
System.out.println("basic:sex == "+Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("sex"))));
System.out.println("basic:salary == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("salary"))));
System.out.println("basic:job == "+Bytes.toString(result.getValue(Bytes.toBytes("extend"), Bytes.toBytes("job"))));
});
return results;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 删除行
* @param tableName
* @param rowkey
* @return
*/
public static boolean deleteRow(String tableName,String rowkey){
try( Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
*删除列簇
* @param tableName
* @param cfName
* @return
*/
public static boolean deleteColumnFamily(String tableName,String cfName){
try(HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConn().getAdmin()){
admin.deleteColumn(tableName,cfName);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
/**
* 删除列
* @param tableName
* @param cfName
* @return
*/
public static boolean deleteQualifier(String tableName,String rowkey,String cfName,String qualiferName){
try(Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowkey));
delete.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualiferName));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
}
return true;
}
}
总结:
-
hbase 数据类型只有bytes类型,而关系型数据库 可以有多种多样的数据类型。所以我们根据rowkey,cf,qualifer等条件进行查询的时候,需要将对应的字段通过Bytes.toBytes()方法进行封装。同样,对于返回的result,也需要通过Bytes.toString()方法转为string对象。
-
hbase 不支持更新操作,要实现更新操作,只能够先删除该数据 然后重新插入。
-
我们可以看到某些情况下只需要Table实例,即可实现响应操作,但是某些情况需要HBaseAdmin实例,才能进行修改,删除等操作。具体什么时候,使用那种操作呢?通过前面文章HBase读写流程https://blog.csdn.net/zhangshk_/article/details/83663642应该不难明白。对于修改删除列簇,还有涉及表的创建删除等操作,是需要HBaseAdmin操作的,因为这些操作需要涉及meta表的更新。而其他的操作,只需要通过zookeeper获取数据的regionserver,通过regionserver就可完成读写请求。
更多推荐
所有评论(0)