学完hbase shell API的基本操作之后,可以通过Java API 对hbase基本操作实现一把。
  1. 基本概念
java类 对应数据模型
HBaseConfiguration HBase配置类
HBaseAdmin HBase管理Admin类
Table HBase Table操作类
Put HBase添加操作数据模型
Get HBase单个查询操作数据模型
Scan HBase Scan检索操作数据模型
Result HBase单个查询的结果模型
ResultScanner HBase检索结果模型

2.首先封装一个hbase connection工具类,这是一个单例的类,用于获取连接、关闭连接以及获取hbase table对象。

package com.zsk.hbase.api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/**
 * Singleton holder for one shared HBase {@link Connection}.
 *
 * <p>The connection is created lazily on first use and re-created if it has
 * been closed. Creation and closing are synchronized so that concurrent
 * callers cannot race and open more than one connection.
 */
public class HBaseConn {
    /** ZooKeeper quorum the HBase client connects to. */
    private static final String ZOOKEEPER_QUORUM = "192.168.217.129:2181";

    private static final HBaseConn INSTANCE = new HBaseConn();

    /** HBase client configuration; built on first use. */
    private Configuration configuration;
    /** Shared connection; created on first use and after it has been closed. */
    private Connection connection;

    private HBaseConn() {
        // Nothing to do here: configuration and connection are created lazily so
        // that a failure surfaces to the caller as an ordinary exception instead
        // of being swallowed during class initialization.
    }

    /**
     * Returns the shared connection, creating it when absent or closed.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be created; the
     *         underlying {@link IOException} is kept as the cause
     */
    private synchronized Connection getConnection() {
        if (configuration == null) {
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        }
        if (connection == null || connection.isClosed()) {
            try {
                connection = ConnectionFactory.createConnection(configuration);
            } catch (IOException e) {
                // Fail here with context rather than handing back null and
                // letting the caller hit a NullPointerException later.
                throw new IllegalStateException(
                        "Unable to create HBase connection (quorum " + ZOOKEEPER_QUORUM + ")", e);
            }
        }
        return connection;
    }

    /**
     * Returns the shared HBase connection.
     *
     * @return an open connection
     * @throws IllegalStateException if the connection cannot be created
     */
    public static Connection getHBaseConn() {
        return INSTANCE.getConnection();
    }

    /**
     * Obtains a {@link Table} handle for the given table from the shared connection.
     * The caller is responsible for closing the returned table.
     *
     * @param tableName name of the table
     * @return a table handle
     * @throws IOException if the table handle cannot be obtained
     * @throws IllegalStateException if the connection cannot be created
     */
    public static Table getTable(String tableName) throws IOException {
        return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
    }

    /**
     * Closes the shared connection if one has been created. A later call to
     * {@link #getHBaseConn()} or {@link #getTable(String)} opens a new one.
     */
    public static void closeConn() {
        INSTANCE.close();
    }

    /** Closes the current connection, if any, under the same lock used to create it. */
    private synchronized void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            // Closing is best-effort: report the failure but do not propagate it.
            e.printStackTrace();
        }
    }

}

3.具体操作hbase 工具类


package com.zsk.hbase.api;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;
import java.util.List;

/**
 * Static helpers for common HBase operations (table administration, put, get,
 * scan and delete) built on the shared connection provided by {@code HBaseConn}.
 *
 * <p>Failures are printed to standard error. Methods returning {@code boolean}
 * report {@code false} when the operation failed or was skipped; methods
 * returning an object report {@code null} on failure.
 */
public class HBaseUtil {

    /** Number of rows fetched per scanner round trip. */
    private static final int SCAN_CACHING = 1000;

    /**
     * Creates a table with the given column families, each keeping one version.
     *
     * @param tableName name of the table to create
     * @param cfs column family names
     * @return {@code true} if the table was created; {@code false} if it already
     *         exists or the operation failed
     */
    public static boolean createTable(String tableName, String[] cfs) {
        try (Admin admin = HBaseConn.getHBaseConn().getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            for (String cf : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(1);
                tableDescriptor.addFamily(columnDescriptor);
            }
            admin.createTable(tableDescriptor);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Disables (when necessary) and deletes a table.
     *
     * @param tableName name of the table to delete
     * @return {@code true} if the table was deleted; {@code false} if it does not
     *         exist or the operation failed
     */
    public static boolean deleteTable(String tableName) {
        try (Admin admin = HBaseConn.getHBaseConn().getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                return false;
            }
            // Disabling a table that is already disabled throws, so check first.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName table to write to
     * @param rowkey row key
     * @param cfName column family
     * @param qualifer column qualifier
     * @param data cell value
     * @return {@code true} if the put succeeded, {@code false} otherwise
     */
    public static boolean putRow(String tableName, String rowkey, String cfName, String qualifer, String data) {
        try (Table table = HBaseConn.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifer), Bytes.toBytes(data));
            table.put(put);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a batch of puts.
     *
     * @param tableName table to write to
     * @param puts puts to apply
     * @return {@code true} if the batch succeeded, {@code false} otherwise
     */
    public static boolean putRows(String tableName, List<Put> puts) {
        try (Table table = HBaseConn.getTable(tableName)) {
            table.put(puts);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Fetches a single row.
     *
     * @param tableName table to read from
     * @param rowkey row key
     * @return the row's result, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowkey) {
        try (Table table = HBaseConn.getTable(tableName)) {
            return table.get(new Get(Bytes.toBytes(rowkey)));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Fetches a single row with a filter applied, and prints it via
     * {@link #printResult(Result)}.
     *
     * @param tableName table to read from
     * @param rowkey row key
     * @param filterList filters applied to the get
     * @return the row's result, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowkey, FilterList filterList) {
        try (Table table = HBaseConn.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.setFilter(filterList);
            Result result = table.get(get);
            printResult(result);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Opens a scanner over the whole table.
     *
     * <p>The scanner is returned unconsumed; the caller iterates it and must
     * close it. Use {@link #printResult(Result)} to print rows while iterating.
     *
     * @param tableName table to scan
     * @return an open scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName) {
        return openScanner(tableName, new Scan());
    }

    /**
     * Opens a scanner over a row-key range; the start row is included and the
     * stop row is excluded.
     *
     * <p>The scanner is returned unconsumed; the caller iterates it and must close it.
     *
     * @param tableName table to scan
     * @param startKey first row key (inclusive)
     * @param stopKey last row key (exclusive)
     * @return an open scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startKey, String stopKey) {
        return getScanner(tableName, startKey, stopKey, null);
    }

    /**
     * Opens a filtered scanner over a row-key range; the start row is included
     * and the stop row is excluded.
     *
     * <p>The scanner is returned unconsumed; the caller iterates it and must close it.
     *
     * @param tableName table to scan
     * @param startKey first row key (inclusive)
     * @param stopKey last row key (exclusive)
     * @param filterList filters applied to the scan; may be {@code null} for none
     * @return an open scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startKey, String stopKey, FilterList filterList) {
        try {
            Scan scan = new Scan();
            scan.setFilter(filterList);
            scan.setStartRow(Bytes.toBytes(startKey));
            scan.setStopRow(Bytes.toBytes(stopKey));
            return openScanner(tableName, scan);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deletes an entire row.
     *
     * @param tableName table to delete from
     * @param rowkey row key
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteRow(String tableName, String rowkey) {
        try (Table table = HBaseConn.getTable(tableName)) {
            table.delete(new Delete(Bytes.toBytes(rowkey)));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Removes a column family from a table.
     *
     * @param tableName table to alter
     * @param cfName column family to remove
     * @return {@code true} if the request succeeded, {@code false} otherwise
     */
    public static boolean deleteColumnFamily(String tableName, String cfName) {
        try (Admin admin = HBaseConn.getHBaseConn().getAdmin()) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(cfName));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes one column of one row.
     *
     * <p>NOTE(review): {@code Delete.addColumn} targets only the latest version of
     * the cell; switch to {@code addColumns} if all versions should be removed.
     *
     * @param tableName table to delete from
     * @param rowkey row key
     * @param cfName column family
     * @param qualiferName column qualifier
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteQualifier(String tableName, String rowkey, String cfName, String qualiferName) {
        try (Table table = HBaseConn.getTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowkey));
            delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualiferName));
            table.delete(delete);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Prints the row key and the demo columns ({@code basic:name}, {@code basic:age},
     * {@code basic:sex}, {@code extend:salary}, {@code extend:job}) of a result to
     * standard output. Columns absent from the result print as {@code null}.
     *
     * @param result row to print
     */
    public static void printResult(Result result) {
        System.out.println("rowkey == " + Bytes.toString(result.getRow()));
        printCell(result, "basic", "name");
        printCell(result, "basic", "age");
        printCell(result, "basic", "sex");
        printCell(result, "extend", "salary");
        printCell(result, "extend", "job");
    }

    /** Prints one cell as {@code family:qualifier == value}. */
    private static void printCell(Result result, String family, String qualifier) {
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        System.out.println(family + ":" + qualifier + " == " + Bytes.toString(value));
    }

    /** Applies the shared caching setting and opens a scanner for the given scan. */
    private static ResultScanner openScanner(String tableName, Scan scan) {
        scan.setCaching(SCAN_CACHING);
        try (Table table = HBaseConn.getTable(tableName)) {
            return table.getScanner(scan);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

总结:

  • hbase 数据类型只有bytes类型,而关系型数据库 可以有多种多样的数据类型。所以我们根据rowkey,cf,qualifer等条件进行查询的时候,需要将对应的字段通过Bytes.toBytes()方法进行封装。同样,对于返回的result,也需要通过Bytes.toString()方法转为string对象。

  • hbase 没有单独的更新(update)操作:对同一 rowkey、列簇、列再次执行 put 会写入一个带新时间戳的新版本,读取时默认返回最新版本,效果上等同于更新,并不需要先删除该数据再重新插入。

  • 我们可以看到某些情况下只需要Table实例,即可实现相应操作,但是某些情况需要HBaseAdmin实例,才能进行修改,删除等操作。具体什么时候,使用哪种操作呢?通过前面文章HBase读写流程 https://blog.csdn.net/zhangshk_/article/details/83663642 应该不难明白。对于修改删除列簇,还有涉及表的创建删除等操作,是需要HBaseAdmin操作的,因为这些操作需要涉及meta表的更新。而其他的操作,只需要通过zookeeper获取数据的regionserver,通过regionserver就可完成读写请求。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐