1. 首页 > Hadoop教程 > 正文

大数据教程FG075-HBase Java API开发实战

本文档风哥主要介绍HBase Java API开发实战,包括API概述、连接管理、DDL操作、DML操作等内容,风哥教程参考HBase官方文档Java API、Client Architecture等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 HBase Java API概述

HBase Java API是HBase提供的原生Java客户端接口,用于开发HBase应用程序。学习交流加群风哥微信: itpux-com

HBase Java API核心类:

  • Connection:连接对象,管理与HBase集群的连接
  • Admin:管理对象,用于DDL操作
  • Table:表对象,用于DML操作
  • Put:插入操作对象
  • Get:查询单行操作对象
  • Scan:扫描操作对象
  • Delete:删除操作对象

1.2 API架构设计

HBase Java API架构设计详解:

# API架构层次

1. Connection层
– 管理与集群的连接
– 维护连接池
– 处理故障转移

2. Admin层
– 表管理操作
– 集群管理操作
– 命名空间管理

3. Table层
– 数据读写操作
– 批量操作
– 事务操作

4. Result层
– 结果处理
– 数据解析

# API调用流程
Connection -> Admin/Table -> Operation -> Result

# 核心接口
1. Table接口
– put(Put put)
– get(Get get)
– scan(Scan scan)
– delete(Delete delete)

2. Admin接口
– createTable(TableDescriptor desc)
– disableTable(TableName tableName)
– deleteTable(TableName tableName)

3. Result接口
– getValue(byte[] family, byte[] qualifier)
– getRow()
– isEmpty()

1.3 连接管理机制

HBase连接管理机制详解:

# 连接管理机制

1. 连接创建
– ConnectionFactory.createConnection()
– 重量级操作,建议单例

2. 连接池
– 内置连接池
– 管理多个RegionServer连接

3. 连接复用
– 同一JVM共享Connection
– Table对象轻量级

4. 连接关闭
– 关闭Connection释放资源
– 关闭Table不影响Connection

# 连接配置
Configuration config = HBaseConfiguration.create();
config.set(“hbase.zookeeper.quorum”, “192.168.1.60,192.168.1.61,192.168.1.62”);
config.set(“hbase.zookeeper.property.clientPort”, “2181”);
config.set(“hbase.client.retries.number”, “3”);
config.set(“hbase.client.pause”, “100”);
config.set(“hbase.rpc.timeout”, “60000”);

# 连接生命周期
创建Connection -> 获取Table -> 执行操作 -> 关闭Table -> 关闭Connection

# 最佳实践
1. 应用启动时创建Connection
2. 每次操作获取Table
3. 操作完成后关闭Table
4. 应用关闭时关闭Connection

风哥提示:Connection是重量级对象,创建成本高,建议在应用启动时创建并复用。Table是轻量级对象,可以频繁创建和关闭。

Part02-生产环境规划与建议

2.1 项目结构规划

项目结构规划建议:

# Maven项目结构

fgedu-hbase-demo/
├── pom.xml
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/fgedu/hbase/
│ │ │ ├── config/
│ │ │ │ └── HBaseConfig.java
│ │ │ ├── connection/
│ │ │ │ └── HBaseConnectionFactory.java
│ │ │ ├── dao/
│ │ │ │ └── UserDao.java
│ │ │ ├── service/
│ │ │ │ └── UserService.java
│ │ │ └── util/
│ │ │ └── HBaseUtil.java
│ │ └── resources/
│ │ └── hbase-site.xml
│ └── test/
│ └── java/
│ └── com/fgedu/hbase/
│ └── HBaseTest.java
└── README.md

# Maven依赖


org.apache.hbase
hbase-client
2.5.5


org.apache.hbase
hbase-common
2.5.5


org.apache.hadoop
hadoop-common
3.3.6

2.2 连接池规划

连接池规划建议:

# 连接池配置

1. 连接数配置
hbase.client.ipc.pool.type=thread-local
hbase.client.ipc.pool.size=10

2. 超时配置
hbase.rpc.timeout=60000
hbase.client.operation.timeout=120000
hbase.client.scanner.timeout.period=60000

3. 重试配置
hbase.client.retries.number=3
hbase.client.pause=100
hbase.client.retries.longer.multiplier=10

# 连接池实现
public class HBaseConnectionPool {
private static final Logger LOG = LoggerFactory.getLogger(HBaseConnectionPool.class);
private static volatile Connection connection;

public static Connection getConnection() throws IOException {
if (connection == null || connection.isClosed()) {
synchronized (HBaseConnectionPool.class) {
if (connection == null || connection.isClosed()) {
Configuration config = HBaseConfiguration.create();
connection = ConnectionFactory.createConnection(config);
LOG.info(“HBase connection created successfully”);
}
}
}
return connection;
}

public static void closeConnection() {
if (connection != null) {
try {
connection.close();
LOG.info(“HBase connection closed successfully”);
} catch (IOException e) {
LOG.error(“Failed to close HBase connection”, e);
}
}
}
}

2.3 异常处理规划

异常处理规划建议:

# 异常类型

1. IOException
– 连接异常
– 网络异常

2. RetriesExhaustedException
– 重试次数耗尽

3. RegionOfflineException
– Region离线

4. TableNotFoundException
– 表不存在

5. DoNotRetryIOException
– 不重试的异常

# 异常处理策略
try {
// HBase操作
} catch (RetriesExhaustedException e) {
// 重试失败,记录日志并报警
LOG.error(“HBase operation failed after retries”, e);
throw new ServiceException(“服务暂时不可用,请稍后重试”);
} catch (RegionOfflineException e) {
// Region离线,等待或重试
LOG.warn(“Region is offline, waiting for recovery”, e);
Thread.sleep(5000);
// 重试操作
} catch (IOException e) {
// IO异常,记录日志
LOG.error(“HBase IO exception”, e);
throw new ServiceException(“数据操作失败”);
}

生产环境建议:生产环境建议使用连接池管理Connection,配置合理的超时和重试参数。异常处理需要区分可恢复和不可恢复异常。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 连接管理实现

3.1.1 配置类实现

# HBaseConfig.java
package com.fgedu.hbase.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseConfig {

private static final String ZOOKEEPER_QUORUM = “192.168.1.60,192.168.1.61,192.168.1.62”;
private static final String ZOOKEEPER_PORT = “2181”;
private static final String HBASE_RPC_TIMEOUT = “60000”;
private static final String HBASE_CLIENT_TIMEOUT = “120000”;
private static final String HBASE_RETRIES = “3”;

public static Configuration getConfiguration() {
Configuration config = HBaseConfiguration.create();

config.set(“hbase.zookeeper.quorum”, ZOOKEEPER_QUORUM);
config.set(“hbase.zookeeper.property.clientPort”, ZOOKEEPER_PORT);
config.set(“hbase.rpc.timeout”, HBASE_RPC_TIMEOUT);
config.set(“hbase.client.operation.timeout”, HBASE_CLIENT_TIMEOUT);
config.set(“hbase.client.retries.number”, HBASE_RETRIES);

return config;
}
}

# HBaseConnectionFactory.java
package com.fgedu.hbase.connection;

import com.fgedu.hbase.config.HBaseConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class HBaseConnectionFactory {

private static final Logger LOG = LoggerFactory.getLogger(HBaseConnectionFactory.class);

private static volatile Connection connection;

private HBaseConnectionFactory() {}

public static Connection getConnection() throws IOException {
if (connection == null || connection.isClosed()) {
synchronized (HBaseConnectionFactory.class) {
if (connection == null || connection.isClosed()) {
Configuration config = HBaseConfig.getConfiguration();
connection = ConnectionFactory.createConnection(config);
LOG.info(“HBase connection created successfully”);
}
}
}
return connection;
}

public static void closeConnection() {
if (connection != null) {
synchronized (HBaseConnectionFactory.class) {
if (connection != null) {
try {
connection.close();
connection = null;
LOG.info(“HBase connection closed successfully”);
} catch (IOException e) {
LOG.error(“Failed to close HBase connection”, e);
}
}
}
}
}
}

3.1.2 连接测试

# 连接测试代码
package com.fgedu.hbase.test;

import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.Test;

import java.io.IOException;

public class ConnectionTest {

@Test
public void testConnection() throws IOException {
Connection connection = HBaseConnectionFactory.getConnection();

System.out.println(“Connection is closed: ” + connection.isClosed());
System.out.println(“Connection class: ” + connection.getClass().getName());

Admin admin = connection.getAdmin();
System.out.println(“Cluster status: ” + admin.getClusterMetrics());
System.out.println(“Master: ” + admin.getClusterMetrics().getMasterName());

admin.close();
HBaseConnectionFactory.closeConnection();
}
}

# 执行结果
Connection is closed: false
Connection class: org.apache.hadoop.hbase.client.ConnectionImpl
Cluster status: ClusterMetrics{liveNodes=3, deadNodes=0, master=fgedu-node1:16000, …}
Master: fgedu-node1:16000
HBase connection closed successfully

3.2 DDL操作实现

3.2.1 表管理实现

# HBaseTableManager.java
package com.fgedu.hbase.manager;

import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseTableManager {

public void createTable(String tableName, String[] columnFamilies) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Admin admin = connection.getAdmin()) {

TableName table = TableName.valueOf(tableName);

if (admin.tableExists(table)) {
System.out.println(“Table ” + tableName + ” already exists”);
return;
}

TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);

for (String cf : columnFamilies) {
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.setCompressionType(Compression.Algorithm.SNAPPY)
.setBloomFilterType(BloomType.ROW)
.setMaxVersions(1)
.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
.build();
tableBuilder.setColumnFamily(familyDescriptor);
}

admin.createTable(tableBuilder.build());
System.out.println(“Table ” + tableName + ” created successfully”);
}
}

public void createTableWithSplits(String tableName, String[] columnFamilies,
byte[][] splitKeys) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Admin admin = connection.getAdmin()) {

TableName table = TableName.valueOf(tableName);

if (admin.tableExists(table)) {
System.out.println(“Table ” + tableName + ” already exists”);
return;
}

TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);

for (String cf : columnFamilies) {
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.setCompressionType(Compression.Algorithm.SNAPPY)
.setBloomFilterType(BloomType.ROW)
.setMaxVersions(1)
.build();
tableBuilder.setColumnFamily(familyDescriptor);
}

admin.createTable(tableBuilder.build(), splitKeys);
System.out.println(“Table ” + tableName + ” created with splits successfully”);
}
}

public void disableTable(String tableName) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Admin admin = connection.getAdmin()) {

TableName table = TableName.valueOf(tableName);

if (!admin.tableExists(table)) {
System.out.println(“Table ” + tableName + ” does not exist”);
return;
}

if (admin.isTableDisabled(table)) {
System.out.println(“Table ” + tableName + ” is already disabled”);
return;
}

admin.disableTable(table);
System.out.println(“Table ” + tableName + ” disabled successfully”);
}
}

public void dropTable(String tableName) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Admin admin = connection.getAdmin()) {

TableName table = TableName.valueOf(tableName);

if (!admin.tableExists(table)) {
System.out.println(“Table ” + tableName + ” does not exist”);
return;
}

if (!admin.isTableDisabled(table)) {
admin.disableTable(table);
}

admin.deleteTable(table);
System.out.println(“Table ” + tableName + ” dropped successfully”);
}
}

public void listTables() throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Admin admin = connection.getAdmin()) {

TableName[] tableNames = admin.listTableNames();

System.out.println(“Total tables: ” + tableNames.length);
for (TableName tableName : tableNames) {
System.out.println(” ” + tableName.getNameAsString());
}
}
}
}

3.2.2 表管理测试

# 表管理测试
package com.fgedu.hbase.test;

import com.fgedu.hbase.manager.HBaseTableManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class TableManagerTest {

private HBaseTableManager tableManager;

@Before
public void setUp() {
tableManager = new HBaseTableManager();
}

@Test
public void testCreateTable() throws IOException {
tableManager.createTable(“fgedu_user_api”, new String[]{“info”, “detail”});
}

@Test
public void testCreateTableWithSplits() throws IOException {
byte[][] splitKeys = new byte[9][];
for (int i = 1; i <= 9; i++) { splitKeys[i - 1] = Bytes.toBytes(String.valueOf(i) + "0000000"); } tableManager.createTableWithSplits( "fgedu_order_api", new String[]{"basic", "status"}, splitKeys ); } @Test public void testListTables() throws IOException { tableManager.listTables(); } @Test public void testDropTable() throws IOException { tableManager.dropTable("fgedu_user_api"); } } # 执行结果 Table fgedu_user_api created successfully Table fgedu_order_api created with splits successfully Total tables: 2 fgedu_order_api fgedu_user_api Table fgedu_user_api dropped successfully

3.3 DML操作实现

3.3.1 数据插入实现

# HBaseDao.java
package com.fgedu.hbase.dao;

import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseDao {

private static final byte[] CF_INFO = Bytes.toBytes(“info”);
private static final byte[] COL_NAME = Bytes.toBytes(“name”);
private static final byte[] COL_AGE = Bytes.toBytes(“age”);
private static final byte[] COL_CITY = Bytes.toBytes(“city”);

public void put(String tableName, String rowKey, String family,
String qualifier, String value) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));

table.put(put);
System.out.println(“Put data successfully: ” + rowKey);
}
}

public void putUser(String tableName, String userId, String name,
int age, String city) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Put put = new Put(Bytes.toBytes(userId));
put.addColumn(CF_INFO, COL_NAME, Bytes.toBytes(name));
put.addColumn(CF_INFO, COL_AGE, Bytes.toBytes(age));
put.addColumn(CF_INFO, COL_CITY, Bytes.toBytes(city));

table.put(put);
System.out.println(“Put user successfully: ” + userId);
}
}

public void batchPut(String tableName, List puts) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

table.put(puts);
System.out.println(“Batch put successfully: ” + puts.size() + ” rows”);
}
}

public List createPutList(int count) {
List puts = new ArrayList<>();

for (int i = 1; i <= count; i++) { String userId = String.format("user_%08d", i); String name = "fgedu" + String.format("%02d", i % 100); int age = 20 + (i % 40); String[] cities = {"北京", "上海", "广州", "深圳", "杭州"}; String city = cities[i % cities.length]; Put put = new Put(Bytes.toBytes(userId)); put.addColumn(CF_INFO, COL_NAME, Bytes.toBytes(name)); put.addColumn(CF_INFO, COL_AGE, Bytes.toBytes(age)); put.addColumn(CF_INFO, COL_CITY, Bytes.toBytes(city)); puts.add(put); } return puts; } }

3.3.2 数据查询实现

# HBaseQueryDao.java
package com.fgedu.hbase.dao;

import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class HBaseQueryDao {

public void get(String tableName, String rowKey) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);

if (result.isEmpty()) {
System.out.println(“No data found for rowKey: ” + rowKey);
return;
}

System.out.println(“Row: ” + rowKey);
for (Cell cell : result.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();

System.out.println(” ” + family + “:” + qualifier + ” = ” + value +
” (timestamp: ” + timestamp + “)”);
}
}
}

public Map getAsMap(String tableName, String rowKey) throws IOException {
Map result = new HashMap<>();

try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Get get = new Get(Bytes.toBytes(rowKey));
Result rs = table.get(get);

for (Cell cell : rs.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));

result.put(family + “:” + qualifier, value);
}
}

return result;
}

public void scan(String tableName, String startRow, String stopRow,
int limit) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(stopRow));
scan.setLimit(limit);

try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
for (Result result : scanner) {
String rowKey = Bytes.toString(result.getRow());
System.out.println(“Row: ” + rowKey);

for (Cell cell : result.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));

System.out.println(” ” + family + “:” + qualifier + ” = ” + value);
}

count++;
if (count >= limit) {
break;
}
}

System.out.println(“Total rows scanned: ” + count);
}
}
}

public void scanWithFilter(String tableName, String prefix, int limit)
throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes(prefix));
scan.setLimit(limit);

try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
for (Result result : scanner) {
String rowKey = Bytes.toString(result.getRow());
System.out.println(“Row: ” + rowKey);

for (Cell cell : result.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));

System.out.println(” ” + family + “:” + qualifier + ” = ” + value);
}

count++;
}

System.out.println(“Total rows matched: ” + count);
}
}
}
}

风哥提示:HBase Java API开发需要注意资源管理,使用try-with-resources确保Connection、Table等资源正确关闭。更多学习教程公众号风哥教程itpux_com

Part04-生产案例与实战讲解

4.1 用户服务案例

# UserService.java
package com.fgedu.hbase.service;

import com.fgedu.hbase.dao.HBaseDao;
import com.fgedu.hbase.dao.HBaseQueryDao;
import org.apache.hadoop.hbase.client.Put;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class UserService {

private static final Logger LOG = LoggerFactory.getLogger(UserService.class);
private static final String TABLE_NAME = “fgedu_user_profile”;

private HBaseDao hbaseDao;
private HBaseQueryDao queryDao;

public UserService() {
this.hbaseDao = new HBaseDao();
this.queryDao = new HBaseQueryDao();
}

public void addUser(String userId, String name, int age, String city) {
try {
hbaseDao.putUser(TABLE_NAME, userId, name, age, city);
LOG.info(“User added successfully: {}”, userId);
} catch (IOException e) {
LOG.error(“Failed to add user: {}”, userId, e);
throw new RuntimeException(“Failed to add user”, e);
}
}

public Map getUser(String userId) {
try {
return queryDao.getAsMap(TABLE_NAME, userId);
} catch (IOException e) {
LOG.error(“Failed to get user: {}”, userId, e);
throw new RuntimeException(“Failed to get user”, e);
}
}

public void batchAddUsers(int count) {
try {
List puts = hbaseDao.createPutList(count);
hbaseDao.batchPut(TABLE_NAME, puts);
LOG.info(“Batch add users successfully: {} users”, count);
} catch (IOException e) {
LOG.error(“Failed to batch add users”, e);
throw new RuntimeException(“Failed to batch add users”, e);
}
}

public void listUsers(String prefix, int limit) {
try {
queryDao.scanWithFilter(TABLE_NAME, prefix, limit);
} catch (IOException e) {
LOG.error(“Failed to list users”, e);
throw new RuntimeException(“Failed to list users”, e);
}
}
}

# UserServiceTest.java
package com.fgedu.hbase.test;

import com.fgedu.hbase.service.UserService;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

public class UserServiceTest {

private UserService userService;

@Before
public void setUp() {
userService = new UserService();
}

@Test
public void testAddUser() {
userService.addUser(“user_00000001”, “fgedu01”, 25, “北京”);
}

@Test
public void testGetUser() {
Map user = userService.getUser(“user_00000001”);
System.out.println(“User info: ” + user);
}

@Test
public void testBatchAddUsers() {
userService.batchAddUsers(100);
}

@Test
public void testListUsers() {
userService.listUsers(“user_”, 10);
}
}

# 执行结果
User added successfully: user_00000001
User info: {info:name=fgedu01, info:age=25, info:city=北京}
Batch add users successfully: 100 users
Total rows matched: 10

4.2 批量操作案例

# 批量操作实现
package com.fgedu.hbase.batch;

import com.fgedu.hbase.connection.HBaseConnectionFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchOperations {

private static final int BATCH_SIZE = 1000;

public void batchPut(String tableName, List puts) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

int total = puts.size();
int batches = (total + BATCH_SIZE – 1) / BATCH_SIZE;

for (int i = 0; i < batches; i++) { int start = i * BATCH_SIZE; int end = Math.min(start + BATCH_SIZE, total); List batch = puts.subList(start, end);

table.put(batch);
System.out.println(“Batch ” + (i + 1) + “/” + batches +
” completed: ” + batch.size() + ” rows”);
}

System.out.println(“Total rows inserted: ” + total);
}
}

public void batchGet(String tableName, List rowKeys) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

List gets = new ArrayList<>();
for (String rowKey : rowKeys) {
gets.add(new Get(Bytes.toBytes(rowKey)));
}

Result[] results = table.get(gets);

int count = 0;
for (Result result : results) {
if (!result.isEmpty()) {
count++;
String rowKey = Bytes.toString(result.getRow());
System.out.println(“Found: ” + rowKey);
}
}

System.out.println(“Total rows found: ” + count);
}
}

public void batchDelete(String tableName, List rowKeys) throws IOException {
try (Connection connection = HBaseConnectionFactory.getConnection();
Table table = connection.getTable(TableName.valueOf(tableName))) {

List deletes = new ArrayList<>();
for (String rowKey : rowKeys) {
deletes.add(new Delete(Bytes.toBytes(rowKey)));
}

table.delete(deletes);
System.out.println(“Total rows deleted: ” + deletes.size());
}
}
}

4.3 常见问题处理

4.3.1 连接超时问题

# 问题现象:连接超时

# 排查步骤
# 1. 检查网络连通性
$ ping 192.168.1.60

# 2. 检查端口
$ telnet 192.168.1.60 16000

# 解决方案
# 1. 增加超时时间
config.set(“hbase.rpc.timeout”, “120000”);
config.set(“hbase.client.operation.timeout”, “180000”);

# 2. 增加重试次数
config.set(“hbase.client.retries.number”, “5”);

# 3. 检查ZooKeeper连接
config.set(“hbase.zookeeper.recoverable.waittime”, “10000”);

4.3.2 性能问题

# 问题现象:查询性能慢

# 排查步骤
# 1. 检查Scan配置
# 2. 检查缓存设置
# 3. 检查过滤器使用

# 解决方案
# 1. 设置Scan缓存
Scan scan = new Scan();
scan.setCaching(100); // 每次RPC返回100行
scan.setCacheBlocks(true); // 启用BlockCache

# 2. 使用批量获取
scan.setBatch(10); // 每行返回10列

# 3. 使用过滤器
Filter filter = new PageFilter(100);
scan.setFilter(filter);

# 4. 指定列族和列
scan.addFamily(Bytes.toBytes(“info”));
scan.addColumn(Bytes.toBytes(“info”), Bytes.toBytes(“name”));

Part05-风哥经验总结与分享

5.1 API最佳实践

HBase Java API最佳实践建议:

# API最佳实践
1. 复用Connection对象
2. 使用try-with-resources
3. 合理设置Scan缓存
4. 使用批量操作
5. 正确处理异常

5.2 性能优化建议

性能优化建议:

HBase Java API性能优化建议:

  • 设置合理的Scan缓存大小
  • 使用批量操作减少RPC
  • 指定查询列减少数据传输
  • 使用异步API提高并发

5.3 工具推荐

开发工具:

  • IDEA:Java开发IDE
  • Maven:项目构建工具
  • JUnit:单元测试框架
  • HBase Testing Utility:测试工具
风哥提示:HBase Java API开发需要注意资源管理和性能优化。建议使用连接池管理Connection,使用批量操作提高性能。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息