Java开发Hbase示例

使用Hbase操作数据

    package com.sunteng.clickidc.test;

    import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes; /*
* 不需要实现数据库连接池,内置
* MAVEN依赖错误,使用另外的客户端包
* http://blog.sina.com.cn/s/blog_6a67b5c50100zbrx.html
*
* <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-shaded-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>1.2.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency> * */
public class HbaseExample { /*
* 不强制性创建表
*
* @tableName 表名
* @family 列族列表
* @config 配置信息
*/
public static void creatTable(String tableName, String[] family, Configuration config) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (int i = 0; i < family.length; i++) {
desc.addFamily(new HColumnDescriptor(family[i]));
}
if (admin.tableExists(desc.getTableName())) {
System.out.println("table Exists!");
throw new Exception("table Exists!");
} else {
admin.createTable(desc);
System.out.println("create table Success!");
}
}
} /*
* 强制性创建表
*
* @tableName 表名
* @family 列族列表
* @config 配置信息
*/
public static void creatTableForce(String tableName, String[] family, Configuration config) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (int i = 0; i < family.length; i++) {
desc.addFamily(new HColumnDescriptor(family[i]));
}
if (admin.tableExists(desc.getTableName())) {
admin.disableTable(desc.getTableName());
admin.deleteTable(desc.getTableName());
}
admin.createTable(desc);
System.out.println("create table Success!"); }
} /*
* 删表
* @tableName 表名
* @config 配置信息
*
*/
public static void deleteTable(String tableName, Configuration config) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
TableName tn = TableName.valueOf(tableName);
if (admin.tableExists(tn)) {
admin.disableTable(tn);
admin.deleteTable(tn);
}
}
} /*
* 查看已有表
*
* @config 配置信息
*
*/
public static HTableDescriptor[] listTables(Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor hTableDescriptors[] = admin.listTables();
return hTableDescriptors;
}
} /*
* 插入数据
*
* @tableName 表名
* @config 配置信息
* @rowkey 行key
* @colFamily 列族
* @col 子列
* @val 值
*
* */
public static void instertRow(String tableName, Configuration config, String rowkey, String colFamily, String col, String val) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config)) {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));
table.put(put); //批量插入
/* List<Put> putList = new ArrayList<Put>();
puts.add(put);
table.put(putList);*/
table.close();
System.out.printf("adding success!!Table:%s,Row:%s,Column=%s:%s,Value=%s\n", tableName, rowkey, colFamily, col, val);
}
} /*
* 删除数据
*
* @tableName 表名
* @config 配置信息
* @rowkey 行key
* @colFamily 列族
* @col 子列
*
* */
public static void deleRow(String tableName, Configuration config, String rowkey, String colFamily, String col) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config)) {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowkey));
//删除指定列族
if (colFamily != null && col == null)
delete.addFamily(Bytes.toBytes(colFamily));
//删除指定列
if (colFamily != null && col != null)
delete.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
table.delete(delete);
//批量删除
/* List<Delete> deleteList = new ArrayList<Delete>();
deleteList.add(delete);
table.delete(deleteList);*/
table.close();
}
} public static void deleRow(String tableName, Configuration config, String rowkey, String colFamily) throws Exception {
deleRow(tableName, config, rowkey, colFamily, null);
} public static void deleRow(String tableName, Configuration config, String rowkey) throws Exception {
deleRow(tableName, config, rowkey, null, null);
} /*
* 根据rowkey查找数据
*
* @tableName 表名
* @config 配置信息
* @rowkey 行key
* @colFamily 列族
* @col 子列
*
* */
public static Result getData(String tableName, Configuration config, String rowkey, String colFamily, String col) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config)) {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
if (colFamily != null && col == null)
get.addFamily(Bytes.toBytes(colFamily));
if (colFamily != null && col != null)
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
Result result = table.get(get);
table.close();
return result;
}
} public static Result getData(String tableName, Configuration config, String rowkey, String colFamily) throws Exception {
return getData(tableName, config, rowkey, colFamily, null);
} public static Result getData(String tableName, Configuration config, String rowkey) throws Exception {
return getData(tableName, config, rowkey, null, null);
} /*
* 批量查找数据
* @table 表名
* @config配置文件
* @startRow 开始的行key
* @stopRow 停止的行key
*
* hbase会将自己的元素按照key的ASCII码排序
* 找出5193开头的元素
*
* 5193:1
* 5193:2
* 5194:1
* 51939:1
* 51942:1
*
* scan.setStartRow("5193:#");
* scan.setStopRow("5193::");
*
* 原因:ASCII排序中:"#" < "0-9" < ":"
* 取出来的将是5193:后面跟着数字的元素
* */
public static List<Result> scanData(String tableName, Configuration config, String startRow, String stopRow, int limit) throws Exception {
try (Connection connection = ConnectionFactory.createConnection(config)) {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
if (startRow != null && stopRow != null) {
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
}
scan.setBatch(limit);
List<Result> result = new ArrayList<Result>();
ResultScanner resultScanner = table.getScanner(scan);
for (Result r : resultScanner) {
result.add(r);
}
table.close();
return result;
}
} public static List<Result> scanData(String tableName, Configuration config, int limit) throws Exception {
return scanData(tableName, config, null, null, limit);
} /*
* 打印表
* @tables 打印的表描述对象
*
* */
public static void printTables(HTableDescriptor[] tables) {
for (HTableDescriptor t : tables) {
HColumnDescriptor[] columns = t.getColumnFamilies();
System.out.printf("tables:%s,columns-family:\n", t.getTableName());
for (HColumnDescriptor column : columns) {
System.out.printf("\t%s\n", column.getNameAsString());
}
}
} /*
* 格式化输出
* @result 结果
*
* */ public static void showCell(Result result) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
System.out.println("Timetamp:" + cell.getTimestamp() + " ");
System.out.println("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
System.out.println("row Name:" + new String(CellUtil.cloneQualifier(cell)) + " ");
System.out.println("value:" + new String(CellUtil.cloneValue(cell)) + " ");
System.out.println("---------------");
}
} public static void main(String... args) {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.zookeeper.quorum", "192.168.11.73");
config.set("hbase.master", "192.168.11.73:60000");
String tablename = "visitor";
String[] column_family = {"value"};
try {
//创建表
// creatTableForce(tablename, column_family, config); //列出表信息
HTableDescriptor[] tables = listTables(config);
printTables(tables); //插入行
for (int i = 1; i < 5; i++)
instertRow(tablename, config, "row1", column_family[0], i + "", "value"); //获取单行值
Result result = getData(tablename, config, "row1", column_family[0]);
showCell(result); //扫描表,获取前20行
List<Result> results = scanData(tablename, config, 20);
for (Result r : results) {
showCell(r);
} } catch (Exception e) {
e.printStackTrace();
}
}
}
05-28 10:02