This document by 风哥 covers hands-on HBase Java API development: an API overview, connection management, DDL operations, and DML operations. It draws on the official HBase documentation (Java API, Client Architecture) and is intended for big-data developers and operators to use in study and test environments; verify everything yourself before applying it to production. More video tutorials: www.fgedu.net.cn
Part01 - Basic Concepts and Theory
1.1 HBase Java API Overview
The HBase Java API is the native Java client interface that HBase provides for building HBase applications. Its core classes are listed below; a minimal end-to-end sketch follows the list.
- Connection: connection object; manages the connection to the HBase cluster
- Admin: administration object, used for DDL operations
- Table: table object, used for DML operations
- Put: single-row insert/update operation object
- Get: single-row read operation object
- Scan: multi-row scan operation object
- Delete: delete operation object
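A minimal end-to-end sketch of these classes working together (the table name "demo" and column family "cf" are assumptions for illustration; the table must already exist):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class ApiOverviewDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("demo"))) {
            // Put: write one cell
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);
            // Get: read the row back and extract the cell value
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            System.out.println(Bytes.toString(
                    result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))));
        }
    }
}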
1.2 API Architecture
The HBase Java API architecture, layer by layer:
1. Connection layer
- Manages the connection to the cluster
- Maintains the connection pool
- Handles failover
2. Admin layer
- Table management operations
- Cluster management operations
- Namespace management
3. Table layer
- Data read/write operations
- Batch operations
- Single-row atomic operations (e.g. checkAndMutate)
4. Result layer
- Result handling
- Data parsing
# API call flow
Connection -> Admin/Table -> Operation -> Result
# Core interfaces
1. Table interface
- put(Put put)
- get(Get get)
- getScanner(Scan scan)
- delete(Delete delete)
2. Admin interface
- createTable(TableDescriptor desc)
- disableTable(TableName tableName)
- deleteTable(TableName tableName)
3. Result class
- getValue(byte[] family, byte[] qualifier)
- getRow()
- isEmpty()
1.3 Connection Management
How HBase connection management works:
1. Connection creation
- ConnectionFactory.createConnection()
- A heavyweight operation; use a singleton
2. Connection pooling
- Pooling is built into the client
- Manages connections to multiple RegionServers
3. Connection reuse
- Share one Connection per JVM
- Table objects are lightweight
4. Connection shutdown
- Closing the Connection releases its resources
- Closing a Table does not affect the Connection
# Connection configuration
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "192.168.1.60,192.168.1.61,192.168.1.62");
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.client.retries.number", "3");
config.set("hbase.client.pause", "100");
config.set("hbase.rpc.timeout", "60000");
# Connection lifecycle
Create Connection -> Get Table -> Execute operations -> Close Table -> Close Connection
# Best practices (see the sketch after this list)
1. Create the Connection at application startup
2. Get a Table for each operation
3. Close the Table when the operation completes
4. Close the Connection at application shutdown
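A minimal sketch of this lifecycle (the table name "demo" is an assumption; closing the Connection at the end of main stands in for application shutdown):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class LifecycleSketch {
    public static void main(String[] args) throws Exception {
        // 1. Create the Connection once at application startup
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        // 2./3. Get a Table per operation and close it when done
        try (Table table = connection.getTable(TableName.valueOf("demo"))) {
            table.get(new Get(Bytes.toBytes("row1")));
        } // the Table is closed here; the shared Connection stays open
        // 4. Close the Connection at application shutdown
        connection.close();
    }
}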
Part02 - Production Planning and Recommendations
2.1 Project Structure
Suggested project layout:
fgedu-hbase-demo/
├── pom.xml
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/fgedu/hbase/
│ │ │ ├── config/
│ │ │ │ └── HBaseConfig.java
│ │ │ ├── connection/
│ │ │ │ └── HBaseConnectionFactory.java
│ │ │ ├── dao/
│ │ │ │ └── UserDao.java
│ │ │ ├── service/
│ │ │ │ └── UserService.java
│ │ │ └── util/
│ │ │ └── HBaseUtil.java
│ │ └── resources/
│ │ └── hbase-site.xml
│ └── test/
│ └── java/
│ └── com/fgedu/hbase/
│ └── HBaseTest.java
└── README.md
# Maven dependency
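The minimal dependency is hbase-client; a sketch, assuming an HBase 2.x cluster (the version shown is an assumption, match it to your cluster):
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <!-- assumption: pick the release that matches your cluster version -->
    <version>2.4.17</version>
</dependency>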
2.2 Connection Pool Planning
Connection pool recommendations:
1. Pool sizing
hbase.client.ipc.pool.type=thread-local
hbase.client.ipc.pool.size=10
2. Timeouts
hbase.rpc.timeout=60000
hbase.client.operation.timeout=120000
hbase.client.scanner.timeout.period=60000
3. Retries
hbase.client.retries.number=3
hbase.client.pause=100
hbase.client.retries.longer.multiplier=10
# Connection pool implementation
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseConnectionPool {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseConnectionPool.class);
    private static volatile Connection connection;
    public static Connection getConnection() throws IOException {
        // Double-checked locking: create the shared Connection lazily, only once
        if (connection == null || connection.isClosed()) {
            synchronized (HBaseConnectionPool.class) {
                if (connection == null || connection.isClosed()) {
                    Configuration config = HBaseConfiguration.create();
                    connection = ConnectionFactory.createConnection(config);
                    LOG.info("HBase connection created successfully");
                }
            }
        }
        return connection;
    }
    public static void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
                LOG.info("HBase connection closed successfully");
            } catch (IOException e) {
                LOG.error("Failed to close HBase connection", e);
            }
        }
    }
}
2.3 Exception Handling Planning
Exceptions to plan for:
1. IOException
- Connection failures
- Network failures
2. RetriesExhaustedException
- All retries used up
3. RegionOfflineException
- Region is offline
4. TableNotFoundException
- Table does not exist
5. DoNotRetryIOException
- Failures that must not be retried
# Exception handling strategy (ServiceException is an assumed application-level exception)
try {
    // HBase operation
} catch (RetriesExhaustedException e) {
    // Retries exhausted: log and raise an alert
    LOG.error("HBase operation failed after retries", e);
    throw new ServiceException("Service temporarily unavailable, please retry later");
} catch (RegionOfflineException e) {
    // Region offline: wait, then retry
    LOG.warn("Region is offline, waiting for recovery", e);
    Thread.sleep(5000); // the enclosing method must declare or handle InterruptedException
    // retry the operation
} catch (IOException e) {
    // Other IO failure: log it
    LOG.error("HBase IO exception", e);
    throw new ServiceException("Data operation failed");
}
Part03 - Production Implementation
3.1 Connection Management Implementation
3.1.1 Configuration Class
package com.fgedu.hbase.config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
public class HBaseConfig {
    private static final String ZOOKEEPER_QUORUM = "192.168.1.60,192.168.1.61,192.168.1.62";
    private static final String ZOOKEEPER_PORT = "2181";
    private static final String HBASE_RPC_TIMEOUT = "60000";
    private static final String HBASE_CLIENT_TIMEOUT = "120000";
    private static final String HBASE_RETRIES = "3";
    public static Configuration getConfiguration() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);
        config.set("hbase.rpc.timeout", HBASE_RPC_TIMEOUT);
        config.set("hbase.client.operation.timeout", HBASE_CLIENT_TIMEOUT);
        config.set("hbase.client.retries.number", HBASE_RETRIES);
        return config;
    }
}
# HBaseConnectionFactory.java
package com.fgedu.hbase.connection;
import com.fgedu.hbase.config.HBaseConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class HBaseConnectionFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseConnectionFactory.class);
    private static volatile Connection connection;
    private HBaseConnectionFactory() {}
    public static Connection getConnection() throws IOException {
        // Double-checked locking: create the shared Connection lazily, only once
        if (connection == null || connection.isClosed()) {
            synchronized (HBaseConnectionFactory.class) {
                if (connection == null || connection.isClosed()) {
                    Configuration config = HBaseConfig.getConfiguration();
                    connection = ConnectionFactory.createConnection(config);
                    LOG.info("HBase connection created successfully");
                }
            }
        }
        return connection;
    }
    public static void closeConnection() {
        if (connection != null) {
            synchronized (HBaseConnectionFactory.class) {
                if (connection != null) {
                    try {
                        connection.close();
                        connection = null;
                        LOG.info("HBase connection closed successfully");
                    } catch (IOException e) {
                        LOG.error("Failed to close HBase connection", e);
                    }
                }
            }
        }
    }
}
3.1.2 Connection Test
package com.fgedu.hbase.test;
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.Test;
import java.io.IOException;
public class ConnectionTest {
    @Test
    public void testConnection() throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        System.out.println("Connection is closed: " + connection.isClosed());
        System.out.println("Connection class: " + connection.getClass().getName());
        Admin admin = connection.getAdmin();
        System.out.println("Cluster status: " + admin.getClusterMetrics());
        System.out.println("Master: " + admin.getClusterMetrics().getMasterName());
        admin.close();
        HBaseConnectionFactory.closeConnection();
    }
}
# Execution result
Connection is closed: false
Connection class: org.apache.hadoop.hbase.client.ConnectionImplementation
Cluster status: ClusterMetrics{liveNodes=3, deadNodes=0, master=fgedu-node1:16000, …}
Master: fgedu-node1:16000
HBase connection closed successfully
3.2 DDL Operation Implementation
3.2.1 Table Manager Implementation
package com.fgedu.hbase.manager;
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseTableManager {
    public void createTable(String tableName, String[] columnFamilies) throws IOException {
        // Reuse the shared Connection; only the Admin handle is closed here
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (admin.tableExists(table)) {
                System.out.println("Table " + tableName + " already exists");
                return;
            }
            TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);
            for (String cf : columnFamilies) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(cf))
                        .setCompressionType(Compression.Algorithm.SNAPPY)
                        .setBloomFilterType(BloomType.ROW)
                        .setMaxVersions(1)
                        .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
                        .build();
                tableBuilder.setColumnFamily(familyDescriptor);
            }
            admin.createTable(tableBuilder.build());
            System.out.println("Table " + tableName + " created successfully");
        }
    }
    public void createTableWithSplits(String tableName, String[] columnFamilies,
                                      byte[][] splitKeys) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (admin.tableExists(table)) {
                System.out.println("Table " + tableName + " already exists");
                return;
            }
            TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);
            for (String cf : columnFamilies) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(cf))
                        .setCompressionType(Compression.Algorithm.SNAPPY)
                        .setBloomFilterType(BloomType.ROW)
                        .setMaxVersions(1)
                        .build();
                tableBuilder.setColumnFamily(familyDescriptor);
            }
            // Pre-split the table so initial writes spread across regions
            admin.createTable(tableBuilder.build(), splitKeys);
            System.out.println("Table " + tableName + " created with splits successfully");
        }
    }
    public void disableTable(String tableName) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (!admin.tableExists(table)) {
                System.out.println("Table " + tableName + " does not exist");
                return;
            }
            if (admin.isTableDisabled(table)) {
                System.out.println("Table " + tableName + " is already disabled");
                return;
            }
            admin.disableTable(table);
            System.out.println("Table " + tableName + " disabled successfully");
        }
    }
    public void dropTable(String tableName) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (!admin.tableExists(table)) {
                System.out.println("Table " + tableName + " does not exist");
                return;
            }
            // A table must be disabled before it can be deleted
            if (!admin.isTableDisabled(table)) {
                admin.disableTable(table);
            }
            admin.deleteTable(table);
            System.out.println("Table " + tableName + " dropped successfully");
        }
    }
    public void listTables() throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Admin admin = connection.getAdmin()) {
            TableName[] tableNames = admin.listTableNames();
            System.out.println("Total tables: " + tableNames.length);
            for (TableName tableName : tableNames) {
                System.out.println("  " + tableName.getNameAsString());
            }
        }
    }
}
3.2.2 Table Manager Test
package com.fgedu.hbase.test;
import com.fgedu.hbase.manager.HBaseTableManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TableManagerTest {
    private HBaseTableManager tableManager;
    @Before
    public void setUp() {
        tableManager = new HBaseTableManager();
    }
    @Test
    public void testCreateTable() throws IOException {
        tableManager.createTable("fgedu_user_api", new String[]{"info", "detail"});
    }
    @Test
    public void testCreateTableWithSplits() throws IOException {
        // 9 split keys "10000000".."90000000" pre-create 10 regions
        byte[][] splitKeys = new byte[9][];
        for (int i = 1; i <= 9; i++) {
            splitKeys[i - 1] = Bytes.toBytes(String.valueOf(i) + "0000000");
        }
        tableManager.createTableWithSplits(
                "fgedu_order_api",
                new String[]{"basic", "status"},
                splitKeys
        );
    }
    @Test
    public void testListTables() throws IOException {
        tableManager.listTables();
    }
    @Test
    public void testDropTable() throws IOException {
        tableManager.dropTable("fgedu_user_api");
    }
}
# Execution result
Table fgedu_user_api created successfully
Table fgedu_order_api created with splits successfully
Total tables: 2
fgedu_order_api
fgedu_user_api
Table fgedu_user_api dropped successfully
3.3 DML Operation Implementation
3.3.1 Data Insertion
package com.fgedu.hbase.dao;
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HBaseDao {
    private static final byte[] CF_INFO = Bytes.toBytes("info");
    private static final byte[] COL_NAME = Bytes.toBytes("name");
    private static final byte[] COL_AGE = Bytes.toBytes("age");
    private static final byte[] COL_CITY = Bytes.toBytes("city");
    public void put(String tableName, String rowKey, String family,
                    String qualifier, String value) throws IOException {
        // Reuse the shared Connection; only the lightweight Table is closed here
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("Put data successfully: " + rowKey);
        }
    }
    public void putUser(String tableName, String userId, String name,
                        int age, String city) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(userId));
            put.addColumn(CF_INFO, COL_NAME, Bytes.toBytes(name));
            // Store age as a string so it round-trips through Bytes.toString on read
            put.addColumn(CF_INFO, COL_AGE, Bytes.toBytes(String.valueOf(age)));
            put.addColumn(CF_INFO, COL_CITY, Bytes.toBytes(city));
            table.put(put);
            System.out.println("Put user successfully: " + userId);
        }
    }
    public void batchPut(String tableName, List<Put> puts) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
            System.out.println("Batch put successfully: " + puts.size() + " rows");
        }
    }
    public List<Put> generateUsers(int count) {
        List<Put> puts = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            String userId = String.format("user_%08d", i);
            String name = "fgedu" + String.format("%02d", i % 100);
            int age = 20 + (i % 40);
            String[] cities = {"北京", "上海", "广州", "深圳", "杭州"};
            String city = cities[i % cities.length];
            Put put = new Put(Bytes.toBytes(userId));
            put.addColumn(CF_INFO, COL_NAME, Bytes.toBytes(name));
            put.addColumn(CF_INFO, COL_AGE, Bytes.toBytes(String.valueOf(age)));
            put.addColumn(CF_INFO, COL_CITY, Bytes.toBytes(city));
            puts.add(put);
        }
        return puts;
    }
}
3.3.2 Data Query Implementation
package com.fgedu.hbase.dao;
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class HBaseQueryDao {
    public void get(String tableName, String rowKey) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            if (result.isEmpty()) {
                System.out.println("No data found for rowKey: " + rowKey);
                return;
            }
            System.out.println("Row: " + rowKey);
            for (Cell cell : result.rawCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                long timestamp = cell.getTimestamp();
                System.out.println("  " + family + ":" + qualifier + " = " + value +
                        " (timestamp: " + timestamp + ")");
            }
        }
    }
    public Map<String, String> getAsMap(String tableName, String rowKey) throws IOException {
        Map<String, String> result = new HashMap<>();
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Result rs = table.get(get);
            for (Cell cell : rs.rawCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                result.put(family + ":" + qualifier, value);
            }
        }
        return result;
    }
    public void scan(String tableName, String startRow, String stopRow,
                     int limit) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRow));
            scan.withStopRow(Bytes.toBytes(stopRow));
            scan.setLimit(limit);
            try (ResultScanner scanner = table.getScanner(scan)) {
                int count = 0;
                for (Result result : scanner) {
                    String rowKey = Bytes.toString(result.getRow());
                    System.out.println("Row: " + rowKey);
                    for (Cell cell : result.rawCells()) {
                        String family = Bytes.toString(CellUtil.cloneFamily(cell));
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        System.out.println("  " + family + ":" + qualifier + " = " + value);
                    }
                    count++;
                    if (count >= limit) {
                        break;
                    }
                }
                System.out.println("Total rows scanned: " + count);
            }
        }
    }
    public void scanWithFilter(String tableName, String prefix, int limit)
            throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.setRowPrefixFilter(Bytes.toBytes(prefix));
            scan.setLimit(limit);
            try (ResultScanner scanner = table.getScanner(scan)) {
                int count = 0;
                for (Result result : scanner) {
                    String rowKey = Bytes.toString(result.getRow());
                    System.out.println("Row: " + rowKey);
                    for (Cell cell : result.rawCells()) {
                        String family = Bytes.toString(CellUtil.cloneFamily(cell));
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        System.out.println("  " + family + ":" + qualifier + " = " + value);
                    }
                    count++;
                }
                System.out.println("Total rows matched: " + count);
            }
        }
    }
}
Part04 - Production Cases in Practice
4.1 User Service Case
package com.fgedu.hbase.service;
import com.fgedu.hbase.dao.HBaseDao;
import com.fgedu.hbase.dao.HBaseQueryDao;
import org.apache.hadoop.hbase.client.Put;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class UserService {
    private static final Logger LOG = LoggerFactory.getLogger(UserService.class);
    private static final String TABLE_NAME = "fgedu_user_profile";
    private HBaseDao hbaseDao;
    private HBaseQueryDao queryDao;
    public UserService() {
        this.hbaseDao = new HBaseDao();
        this.queryDao = new HBaseQueryDao();
    }
    public void addUser(String userId, String name, int age, String city) {
        try {
            hbaseDao.putUser(TABLE_NAME, userId, name, age, city);
            LOG.info("User added successfully: {}", userId);
        } catch (IOException e) {
            LOG.error("Failed to add user: {}", userId, e);
            throw new RuntimeException("Failed to add user", e);
        }
    }
    public Map<String, String> getUser(String userId) {
        try {
            return queryDao.getAsMap(TABLE_NAME, userId);
        } catch (IOException e) {
            LOG.error("Failed to get user: {}", userId, e);
            throw new RuntimeException("Failed to get user", e);
        }
    }
    public void batchAddUsers(int count) {
        try {
            List<Put> puts = hbaseDao.generateUsers(count);
            hbaseDao.batchPut(TABLE_NAME, puts);
            LOG.info("Batch add users successfully: {} users", count);
        } catch (IOException e) {
            LOG.error("Failed to batch add users", e);
            throw new RuntimeException("Failed to batch add users", e);
        }
    }
    public void listUsers(String prefix, int limit) {
        try {
            queryDao.scanWithFilter(TABLE_NAME, prefix, limit);
        } catch (IOException e) {
            LOG.error("Failed to list users", e);
            throw new RuntimeException("Failed to list users", e);
        }
    }
}
# UserServiceTest.java
package com.fgedu.hbase.test;
import com.fgedu.hbase.service.UserService;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
public class UserServiceTest {
    private UserService userService;
    @Before
    public void setUp() {
        userService = new UserService();
    }
    @Test
    public void testAddUser() {
        userService.addUser("user_00000001", "fgedu01", 25, "北京");
    }
    @Test
    public void testGetUser() {
        Map<String, String> user = userService.getUser("user_00000001");
        System.out.println("User info: " + user);
    }
    @Test
    public void testBatchAddUsers() {
        userService.batchAddUsers(100);
    }
    @Test
    public void testListUsers() {
        userService.listUsers("user_", 10);
    }
}
# Execution result
User added successfully: user_00000001
User info: {info:name=fgedu01, info:age=25, info:city=北京}
Batch add users successfully: 100 users
Total rows matched: 10
4.2 Batch Operations Case
package com.fgedu.hbase.batch;
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class BatchOperations {
    private static final int BATCH_SIZE = 1000;
    public void batchPut(String tableName, List<Put> puts) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            int total = puts.size();
            // Split the puts into chunks so a single RPC batch stays bounded
            int batches = (total + BATCH_SIZE - 1) / BATCH_SIZE;
            for (int i = 0; i < batches; i++) {
                int start = i * BATCH_SIZE;
                int end = Math.min(start + BATCH_SIZE, total);
                List<Put> batch = puts.subList(start, end);
                table.put(batch);
                System.out.println("Batch " + (i + 1) + "/" + batches +
                        " completed: " + batch.size() + " rows");
            }
            System.out.println("Total rows inserted: " + total);
        }
    }
    public void batchGet(String tableName, List<String> rowKeys) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Get> gets = new ArrayList<>();
            for (String rowKey : rowKeys) {
                gets.add(new Get(Bytes.toBytes(rowKey)));
            }
            Result[] results = table.get(gets);
            int count = 0;
            for (Result result : results) {
                if (!result.isEmpty()) {
                    count++;
                    String rowKey = Bytes.toString(result.getRow());
                    System.out.println("Found: " + rowKey);
                }
            }
            System.out.println("Total rows found: " + count);
        }
    }
    public void batchDelete(String tableName, List<String> rowKeys) throws IOException {
        Connection connection = HBaseConnectionFactory.getConnection();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Delete> deletes = new ArrayList<>();
            for (String rowKey : rowKeys) {
                deletes.add(new Delete(Bytes.toBytes(rowKey)));
            }
            int requested = deletes.size();
            // Note: Table.delete may modify the list (applied Deletes are removed),
            // so capture the size before the call
            table.delete(deletes);
            System.out.println("Delete requested for " + requested + " rows");
        }
    }
}
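A usage sketch tying this to the earlier DAO (BatchDemo is a hypothetical driver class; it assumes the fgedu_user_profile table from section 4.1 exists):
package com.fgedu.hbase.batch;
import com.fgedu.hbase.dao.HBaseDao;
public class BatchDemo {
    public static void main(String[] args) throws Exception {
        // 5000 generated user rows go out in five chunks of BATCH_SIZE (1000)
        new BatchOperations().batchPut("fgedu_user_profile",
                new HBaseDao().generateUsers(5000));
    }
}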
4.3 Common Problems
4.3.1 Connection Timeouts
# Troubleshooting steps
# 1. Check network connectivity
$ ping 192.168.1.60
# 2. Check the ZooKeeper and Master ports (the client bootstraps via ZooKeeper)
$ telnet 192.168.1.60 2181
$ telnet 192.168.1.60 16000
# Fixes
# 1. Increase timeouts
config.set("hbase.rpc.timeout", "120000");
config.set("hbase.client.operation.timeout", "180000");
# 2. Increase the retry count
config.set("hbase.client.retries.number", "5");
# 3. Check the ZooKeeper connection
config.set("hbase.zookeeper.recoverable.waittime", "10000");
4.3.2 Performance Problems
# Troubleshooting steps
# 1. Check the Scan configuration
# 2. Check cache settings
# 3. Check filter usage
# Fixes
# 1. Set the Scan cache
Scan scan = new Scan();
scan.setCaching(100); // fetch 100 rows per RPC
scan.setCacheBlocks(true); // use the BlockCache (set false for large one-off scans)
# 2. Limit columns per call
scan.setBatch(10); // return at most 10 columns of a row per call
# 3. Use filters
Filter filter = new PageFilter(100);
scan.setFilter(filter);
# 4. Restrict to specific families and columns
scan.addFamily(Bytes.toBytes("info"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
# A combined, runnable version of these settings follows.
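A sketch that combines the tunings above: caching, column projection, and a PageFilter in one scan (assumes the fgedu_user_profile table from section 4.1 exists):
import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class TunedScanDemo {
    public static void main(String[] args) throws Exception {
        Connection conn = HBaseConnectionFactory.getConnection();
        try (Table table = conn.getTable(TableName.valueOf("fgedu_user_profile"))) {
            Scan scan = new Scan()
                    .setCaching(100)                                         // 100 rows per RPC
                    .addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")) // project one column
                    .setFilter(new PageFilter(100));                         // stop after ~100 rows
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    System.out.println(Bytes.toString(r.getRow()));
                }
            }
        }
        HBaseConnectionFactory.closeConnection();
    }
}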
Part05 - 风哥's Lessons and Takeaways
5.1 API Best Practices
Recommended HBase Java API practices:
1. Reuse the Connection object
2. Use try-with-resources
3. Set a sensible Scan cache size
4. Use batch operations
5. Handle exceptions properly
5.2 Performance Tuning Recommendations
Performance recommendations (an async sketch follows the list):
- Set a reasonable Scan cache size
- Use batch operations to cut RPC round trips
- Request only the columns you need to reduce data transfer
- Use the async API for higher concurrency
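A hedged sketch of the asynchronous client in HBase 2.x (assumes the fgedu_user_profile table from section 4.1; async calls return CompletableFuture, so many requests can be in flight per thread):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class AsyncGetSketch {
    public static void main(String[] args) throws Exception {
        try (AsyncConnection conn =
                     ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
            AsyncTable<AdvancedScanResultConsumer> table =
                    conn.getTable(TableName.valueOf("fgedu_user_profile"));
            // Fire the request without blocking; handle the Result in a callback
            table.get(new Get(Bytes.toBytes("user_00000001")))
                 .thenAccept(result -> System.out.println(
                         Bytes.toString(result.getValue(
                                 Bytes.toBytes("info"), Bytes.toBytes("name")))))
                 .get(); // block only so this demo waits for the callback to run
        }
    }
}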
5.3 Recommended Tools
Development tools:
- IDEA: Java IDE
- Maven: build tool
- JUnit: unit-testing framework
- HBase Testing Utility: runs an in-process mini-cluster for tests
This tutorial was compiled and published by 风哥教程 for study and testing use only. When reposting, credit the source: http://www.fgedu.net.cn/10327.html
