
Big Data Tutorial FG081 - HBase and Spark Integration in Practice

This document by 风哥 covers HBase-Spark integration in practice, including an integration overview, configuration, data reads and writes, and ETL processing. It draws on the HBase official documentation (Spark Integration) and the HBase-Spark Connector materials. It is intended for big data developers and operators to use for learning and testing; validate everything yourself before applying it to production.

Part01 - Basic Concepts and Theory

1.1 Integration Overview

Integrating HBase with Spark combines Spark's compute power with HBase's storage, enabling high-performance data processing.

Advantages of HBase-Spark integration:

  • High-performance computing: leverages Spark's in-memory engine
  • Batch processing: handles large-scale datasets
  • Real-time analytics: processes data as it arrives
  • Machine learning: combines MLlib with HBase data

# HBase-Spark integration scenarios

1. Data ingestion
– Bulk-loading data into HBase
– Data cleansing and transformation
– Incremental data synchronization

2. Data analytics
– Real-time data analysis
– Complex aggregations
– Feature extraction for machine learning

3. Data export
– Exporting HBase data
– Data migration
– Data backup

4. Stream processing (see the sketch after this list)
– Writing to HBase from Spark Streaming
– Real-time data warehousing
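
Scenario 4 is the only one not demonstrated later in this tutorial, so here is a minimal Structured Streaming sketch. Assumptions not from the original: a socket source on fgedu-node1:9999 and the fgedu_user table with an info column family; swap in your real source and schema. The write pattern inside foreachBatch is the same one used in Part03.

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val spark = SparkSession.builder().appName("StreamToHBase").getOrCreate()
import spark.implicits._

// the socket source yields one string column named "value" per record
val lines = spark.readStream.format("socket")
  .option("host", "fgedu-node1").option("port", 9999).load()

// foreachBatch hands over each micro-batch as a normal Dataset,
// so the batch-write pattern from Part03 applies unchanged
val query = lines.as[String].writeStream
  .foreachBatch { (batch: Dataset[String], batchId: Long) =>
    batch.foreachPartition { (iter: Iterator[String]) =>
      val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
      val table = conn.getTable(TableName.valueOf("fgedu_user"))
      iter.foreach { line =>
        val put = new Put(Bytes.toBytes(line)) // row key = raw line, purely illustrative
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("raw"), Bytes.toBytes(line))
        table.put(put)
      }
      table.close(); conn.close()
    }
  }
  .start()

query.awaitTermination()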

# HBase-Spark integration methods

1. HBase RDD
– Uses newAPIHadoopRDD
– Built on the MapReduce API underneath
– Well suited to batch processing

2. HBase Connector
– The Spark HBase Connector
– Optimized read/write performance
– Supports the DataFrame API

3. Phoenix Spark
– Uses Phoenix JDBC
– SQL-based access
– Simple and easy to use

# Integration architecture

┌─────────────┐
│    Spark    │
│  (compute)  │
└──────┬──────┘
       │
       ├── HBase RDD / Connector
       │
┌──────┴──────┐
│    HBase    │
│  (storage)  │
└─────────────┘

Data flow:
Spark RDD -> HBase Connector -> HBase API -> HBase RegionServer

1.2 Integration Methods in Detail

Integration methods in detail:

# Method 1: HBase RDD

Characteristics:
– Uses a Hadoop InputFormat
– Supports Scan operations
– Filter conditions can be applied
– Returns RDD[(ImmutableBytesWritable, Result)]

Example:
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "fgedu-node1,fgedu-node2,fgedu-node3")
conf.set(TableInputFormat.INPUT_TABLE, "fgedu_user")

val hbaseRDD = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

# Method 2: HBase Connector

Characteristics:
– Optimized performance
– Supports the DataFrame API
– Supports batch operations
– Supports predicate pushdown

Example:
import org.apache.hadoop.hbase.spark._

val df = spark.read
  .option("hbase.columns.mapping",
    "key STRING :key, name STRING info:name, age INT info:age")
  .option("hbase.table", "fgedu_user")
  .format("org.apache.hadoop.hbase.spark")
  .load()
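
Writing goes through the same options. A minimal sketch, assuming the same fgedu_user table and column mapping as the read above; note that depending on the connector version you may also need to instantiate an HBaseContext before using the data source, and the sample rows here are made up for illustration:

import spark.implicits._

val out = Seq(("user_00000003", "fgedu03", 28)).toDF("key", "name", "age")

out.write
  .option("hbase.columns.mapping",
    "key STRING :key, name STRING info:name, age INT info:age")
  .option("hbase.table", "fgedu_user")
  .format("org.apache.hadoop.hbase.spark")
  .save()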

# Method 3: Phoenix Spark

Characteristics:
– SQL interface
– Simple and easy to use
– Supports DataFrames
– Good performance

Example:
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:phoenix:fgedu-node1:2181")
  .option("dbtable", "FGEDU_USER")
  .load()
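
Plain JDBC reads through a single connection; the dedicated phoenix-spark data source reads in parallel across Phoenix splits. A minimal sketch, assuming the phoenix-spark (or phoenix5-spark) JAR matching your Phoenix version is on the classpath; "table" and "zkUrl" are the options documented by the Phoenix project:

val phoenixDF = spark.read
  .format("org.apache.phoenix.spark")
  .option("table", "FGEDU_USER")
  .option("zkUrl", "fgedu-node1,fgedu-node2,fgedu-node3:2181")
  .load()

phoenixDF.show()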

# Method comparison

Method            Performance  Ease of use  Features
HBase RDD         Medium       Medium       Basic
HBase Connector   High         Medium       Rich
Phoenix Spark     High         Simple       SQL

# Selection advice

1. Simple queries: Phoenix Spark
2. Complex processing: HBase Connector
3. Compatibility requirements: HBase RDD

1.3 Architecture Design

Architecture design in detail:

# HBase-Spark integration architecture

1. Read architecture
Spark Driver
├── SparkContext
├── HBaseConfiguration
└── RDD Partitions
    ├── Region 1 -> Executor 1
    ├── Region 2 -> Executor 2
    └── Region 3 -> Executor 3

2. Write architecture
Spark Executor
├── Partition 1 -> HBase Connection 1
├── Partition 2 -> HBase Connection 2
└── Partition 3 -> HBase Connection 3
    └── HBase RegionServer

3. Bulk-write architecture
Spark Executor
├── Partition
│   └── HFile generation
└── Bulk Load
    └── HBase RegionServer

# Data partitioning strategies

1. Default partitioning
– One partition per Region
– Partition count follows the table's Region count

2. Custom partitioning
– Partition by RowKey range
– Controls parallelism

3. Pre-splitting
– Create Regions in advance
– Avoids hotspots

# Performance tuning points

1. Read tuning
– Use Scan caching
– Apply filter conditions
– Specify column families and columns
– Read in parallel

2. Write tuning
– Batch writes
– Use Bulk Load
– Pre-split tables
– Tune the MemStore

3. Resource tuning
– Number of Executors
– Memory configuration
– Parallelism settings

# Data type mapping

Spark type      HBase storage
StringType      String
IntegerType     Integer
LongType        Long
DoubleType      Double
BooleanType     Boolean
BinaryType      Bytes
TimestampType   Long
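
HBase stores everything as byte arrays, so each mapping above comes down to a Bytes conversion. A quick sketch of the round trips, one line per row of the table (storing TimestampType as epoch milliseconds in a Long is an assumption, not the only convention):

import org.apache.hadoop.hbase.util.Bytes
import java.sql.Timestamp

val s = Bytes.toBytes("fgedu01"); Bytes.toString(s)   // StringType
val i = Bytes.toBytes(25);        Bytes.toInt(i)      // IntegerType
val l = Bytes.toBytes(100L);      Bytes.toLong(l)     // LongType
val d = Bytes.toBytes(3.14);      Bytes.toDouble(d)   // DoubleType
val b = Bytes.toBytes(true);      Bytes.toBoolean(b)  // BooleanType

// TimestampType is commonly stored as epoch milliseconds (a Long)
val ts = new Timestamp(System.currentTimeMillis())
val t = Bytes.toBytes(ts.getTime); new Timestamp(Bytes.toLong(t))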

风哥's tip: integrating HBase with Spark enables high-performance data processing. Choose the integration method for the scenario: Phoenix Spark for simple queries, the HBase Connector for complex processing.

Part02 - Production Environment Planning and Recommendations

2.1 Environment Planning

Environment planning recommendations:

# Version compatibility

Spark version  HBase version  Hadoop version
3.5.x          2.5.x          3.3.x
3.4.x          2.4.x          3.2.x

# Recommended version combination
Spark 3.5.0 + HBase 2.5.5 + Hadoop 3.3.6

# Environment configuration

1. Spark configuration
spark-defaults.conf:
spark.jars /bigdata/app/hbase/lib/hbase-client.jar,/bigdata/app/hbase/lib/hbase-common.jar,/bigdata/app/hbase/lib/hbase-server.jar
spark.executor.extraClassPath /bigdata/app/hbase/lib/*
spark.driver.extraClassPath /bigdata/app/hbase/lib/*

2. HBase configuration
hbase-site.xml: hbase.zookeeper.quorum
fgedu-node1,fgedu-node2,fgedu-node3

# Maven dependencies

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.5.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>2.5.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.5.5</version>
  </dependency>
  <!-- The hbase-spark connector is released from the separate hbase-connectors
       project; its version (e.g. 1.0.0) does not track the HBase version. -->
  <dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
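
For sbt-based projects, the equivalent declarations are sketched below (same coordinates and versions as the Maven block above; verify them against your artifact repository):

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.hbase" % "hbase-client" % "2.5.5",
  "org.apache.hbase" % "hbase-common" % "2.5.5",
  "org.apache.hbase" % "hbase-server" % "2.5.5",
  // the connector is versioned independently of HBase
  "org.apache.hbase.connectors.spark" % "hbase-spark" % "1.0.0"
)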

# Starting the Spark shell

$ spark-shell --master yarn \
  --jars /bigdata/app/hbase/lib/hbase-client-2.5.5.jar,/bigdata/app/hbase/lib/hbase-common-2.5.5.jar,/bigdata/app/hbase/lib/hbase-server-2.5.5.jar \
  --driver-class-path /bigdata/app/hbase/conf

2.2 Data Processing Planning

Data processing planning recommendations:

# Read planning

1. Scan configuration
– Set the caching size
– Set the batch size
– Apply filter conditions
– Specify column families and columns

2. Partitioning strategy
– Partition by Region
– Customize the partition count
– Control parallelism

3. Resource configuration
– Number of Executors
– Executor memory
– Executor cores

# Write planning

1. Write method
– Single Put
– Batched Puts
– Bulk Load

2. Performance tuning
– Batch size
– Pre-splitting
– Asynchronous writes

3. Resource configuration
– Parallelism
– Memory configuration
– Connection pooling

# Processing pipelines

1. Data ingestion pipeline
Source data -> Spark read -> Cleansing -> Transformation -> HBase write

2. Analytics pipeline
HBase read -> Spark processing -> Result output

3. ETL pipeline
Source system -> Spark ETL -> HBase storage -> Downstream systems

# Example configuration

# Read configuration
val scan = new Scan()
scan.setCaching(1000)
scan.setBatch(100)
scan.setFilter(new PageFilter(10000))

# Write configuration
val conf = HBaseConfiguration.create()
conf.set("hbase.client.write.buffer", "2097152") // 2 MB
conf.set("hbase.client.retries.number", "3")

2.3 Performance Planning

Performance planning recommendations:

# Performance optimization strategy

1. Read tuning
– Use Scan caching
– Set filters
– Specify columns
– Read in parallel

2. Write tuning
– Batch writes
– Bulk Load
– Pre-splitting
– Asynchronous writes

3. Resource tuning
– Size Executors sensibly
– Adjust parallelism
– Tune memory

# Read performance tuning

1. Scan caching
scan.setCaching(1000) // return 1000 rows per RPC

2. Batch size
scan.setBatch(100) // return up to 100 columns per row

3. Filters
scan.setFilter(new PrefixFilter(Bytes.toBytes("user_")))

4. Specific columns
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"))

# Write performance tuning

1. Batch writes
val puts = new java.util.ArrayList[Put]()
for (i <- 1 to 1000) {
  val put = new Put(Bytes.toBytes(s"user_$i"))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(s"fgedu$i"))
  puts.add(put)
}
table.put(puts)

2. Bulk Load
// Generate the HFile key/value pairs
val hfiles = rdd.map(row => {
  val put = new Put(Bytes.toBytes(row.getAs[String]("key")))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"),
    Bytes.toBytes(row.getAs[String]("value")))
  (new ImmutableBytesWritable(put.getRow), put)
})

// Save as HFiles
hfiles.saveAsNewAPIHadoopFile(
  "/tmp/hfiles",
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[HFileOutputFormat2],
  conf
)

// Load the HFiles
LoadIncrementalHFiles.doBulkLoad(…)

# Resource configuration tuning

# Spark configuration
spark.executor.instances=10
spark.executor.memory=8g
spark.executor.cores=4
spark.executor.memoryOverhead=2g

spark.sql.shuffle.partitions=200
spark.default.parallelism=200

# HBase configuration
hbase.client.write.buffer=2097152
hbase.client.scanner.caching=1000
Production advice: use the HBase Connector for integration in production and size resource parameters sensibly. For large batch writes, use Bulk Load to improve write throughput.

Part03 - Production Implementation

3.1 Integration Setup in Practice

3.1.1 Environment Preparation

# 1. Copy the HBase JARs into Spark
$ cp /bigdata/app/hbase/lib/hbase-client-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-common-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-server-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-hadoop-compat-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-hadoop2-compat-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-protocol-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-spark-2.5.5.jar /bigdata/app/spark/jars/

# 2. Copy the HBase config file
$ cp /bigdata/app/hbase/conf/hbase-site.xml /bigdata/app/spark/conf/

# 3. Configure spark-defaults.conf
$ cat >> /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.jars /bigdata/app/spark/jars/hbase-client-2.5.5.jar,/bigdata/app/spark/jars/hbase-common-2.5.5.jar,/bigdata/app/spark/jars/hbase-server-2.5.5.jar
spark.executor.extraClassPath /bigdata/app/spark/jars/*
spark.driver.extraClassPath /bigdata/app/spark/jars/*
EOF

# 4. Start the Spark shell to test
$ spark-shell --master yarn \
  --jars /bigdata/app/spark/jars/hbase-*.jar \
  --driver-class-path /bigdata/app/hbase/conf

Spark context available as 'sc' (master = yarn, app id = application_1680940800000_0001).
Spark session available as 'spark'.

scala> import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HBaseConfiguration

scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes

scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, hbase-site.xml

scala> conf.get("hbase.zookeeper.quorum")
res0: String = fgedu-node1,fgedu-node2,fgedu-node3

3.1.2 Creating a Test Table

# Create an HBase test table
$ hbase shell

hbase(main):001:0> create 'fgedu_spark_user', 'info'

Created table fgedu_spark_user

hbase(main):002:0> put 'fgedu_spark_user', 'user_00000001', 'info:name', 'fgedu01'

Took 0.0123 seconds.

hbase(main):003:0> put 'fgedu_spark_user', 'user_00000001', 'info:age', '25'

Took 0.0123 seconds.

hbase(main):004:0> put 'fgedu_spark_user', 'user_00000001', 'info:city', '北京'

Took 0.0123 seconds.

hbase(main):005:0> put 'fgedu_spark_user', 'user_00000002', 'info:name', 'fgedu02'

Took 0.0123 seconds.

hbase(main):006:0> put 'fgedu_spark_user', 'user_00000002', 'info:age', '30'

Took 0.0123 seconds.

hbase(main):007:0> put 'fgedu_spark_user', 'user_00000002', 'info:city', '上海'

Took 0.0123 seconds.

hbase(main):008:0> scan 'fgedu_spark_user'

ROW COLUMN+CELL
user_00000001 column=info:age, timestamp=1680940800001, value=25
user_00000001 column=info:city, timestamp=1680940800002, value=北京
user_00000001 column=info:name, timestamp=1680940800000, value=fgedu01
user_00000002 column=info:age, timestamp=1680940800011, value=30
user_00000002 column=info:city, timestamp=1680940800012, value=上海
user_00000002 column=info:name, timestamp=1680940800010, value=fgedu02
2 row(s)

3.2 Reading Data in Practice

3.2.1 Reading with HBase RDD

# Start the Spark shell
$ spark-shell --master yarn --jars /bigdata/app/spark/jars/hbase-*.jar

# Read data from HBase
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.{Result, Scan}
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.hbase.util.Bytes

scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration: …

scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_spark_user")

scala> conf.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age info:city")

scala> val hbaseRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )
hbaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at :30

scala> hbaseRDD.count()
res1: Long = 2

scala> hbaseRDD.take(10).foreach { case (_, result) =>
     |   val key = Bytes.toString(result.getRow)
     |   val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
     |   val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
     |   val city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
     |   println(s"key=$key, name=$name, age=$age, city=$city")
     | }
key=user_00000001, name=fgedu01, age=25, city=北京
key=user_00000002, name=fgedu02, age=30, city=上海

# Convert to a DataFrame
scala> import spark.implicits._

scala> case class User(userId: String, name: String, age: Int, city: String)
defined class User

scala> val userDF = hbaseRDD.map { case (_, result) =>
     |   val userId = Bytes.toString(result.getRow)
     |   val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
     |   // the shell stored age as a string, so parse it rather than calling Bytes.toInt
     |   val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))).toInt
     |   val city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
     |   User(userId, name, age, city)
     | }.toDF()
userDF: org.apache.spark.sql.DataFrame = [userId: string, name: string … 2 more fields]

scala> userDF.show()
+-------------+-------+---+----+
|       userId|   name|age|city|
+-------------+-------+---+----+
|user_00000001|fgedu01| 25| 北京|
|user_00000002|fgedu02| 30| 上海|
+-------------+-------+---+----+

3.2.2 Using Scan Filters

# Read with a Scan filter
scala> import org.apache.hadoop.hbase.filter.{PrefixFilter, PageFilter}

scala> val scan = new Scan()
scan: org.apache.hadoop.hbase.client.Scan = {"limit":-1,"timeRange":[0,9223372036854775807],"loadColumnFamiliesOnDemand":null,"startRow":"","stopRow":"","batch":-1,"maxResultSize":-1,"cacheBlocks":true,"reversed":false,"consistency":"STRONG","targetReplicaId":-1,"mvccReadPoint":"-1","isolationLevel":"READ_COMMITTED"}

scala> scan.setFilter(new PrefixFilter(Bytes.toBytes("user_00000001")))

scala> scan.setCaching(100)

scala> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_spark_user")

scala> conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

scala> val filteredRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )
filteredRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[1] at newAPIHadoopRDD at :35

scala> filteredRDD.count()
res2: Long = 1

scala> filteredRDD.take(10).foreach { case (_, result) =>
     |   val key = Bytes.toString(result.getRow)
     |   val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
     |   println(s"key=$key, name=$name")
     | }
key=user_00000001, name=fgedu01

3.3 Writing Data in Practice

3.3.1 Writing Data in Batches

# Batch-write data into HBase
scala> import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
scala> import org.apache.hadoop.hbase.TableName

scala> val conf = HBaseConfiguration.create()

scala> val connection = ConnectionFactory.createConnection(conf)
connection: org.apache.hadoop.hbase.client.Connection = …

scala> val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))
table: org.apache.hadoop.hbase.client.Table = fgedu_spark_user

# Create test data
scala> val testData = (1 to 1000).map(i => (s"user_${"%08d".format(i)}", s"fgedu$i", 20 + i % 40, "北京"))
testData: scala.collection.immutable.IndexedSeq[(String, String, Int, String)] = Vector((user_00000001,fgedu1,21,北京), (user_00000002,fgedu2,22,北京), …

scala> val testRDD = sc.parallelize(testData, 10)
testRDD: org.apache.spark.rdd.RDD[(String, String, Int, String)] = ParallelCollectionRDD[2] at parallelize at :37

# Batch write
scala> testRDD.foreachPartition { iter =>
     |   val conf = HBaseConfiguration.create()
     |   val connection = ConnectionFactory.createConnection(conf)
     |   val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))
     |
     |   val puts = new java.util.ArrayList[Put]()
     |   iter.foreach { case (userId, name, age, city) =>
     |     val put = new Put(Bytes.toBytes(userId))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
     |     puts.add(put)
     |
     |     // flush every 100 Puts to bound memory usage
     |     if (puts.size() >= 100) {
     |       table.put(puts)
     |       puts.clear()
     |     }
     |   }
     |
     |   if (puts.size() > 0) {
     |     table.put(puts)
     |   }
     |
     |   table.close()
     |   connection.close()
     | }
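
As an alternative to managing the Put list by hand, HBase's BufferedMutator buffers and flushes writes for you. A minimal sketch of the same partition loop (same table and column layout as above; the buffering threshold follows hbase.client.write.buffer rather than an explicit count):

scala> testRDD.foreachPartition { iter =>
     |   val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
     |   val mutator = connection.getBufferedMutator(TableName.valueOf("fgedu_spark_user"))
     |   iter.foreach { case (userId, name, age, city) =>
     |     val put = new Put(Bytes.toBytes(userId))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
     |     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
     |     mutator.mutate(put)  // buffered; flushed when the write buffer fills
     |   }
     |   mutator.close()        // flushes any remaining buffered mutations
     |   connection.close()
     | }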

# Verify the data (the two rows inserted in 3.1.2 were overwritten,
# since their row keys fall within the generated key range)
$ hbase shell

hbase(main):009:0> count 'fgedu_spark_user'

1000 row(s)

3.3.2 Writing with Bulk Load

# Bulk-write using Bulk Load
scala> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
scala> import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
scala> import org.apache.hadoop.mapreduce.Job

# Prepare the data
scala> val bulkData = (1 to 10000).map(i => (s"user_${"%08d".format(i + 10000)}", s"fgedu_bulk_$i", 20 + i % 40, "广州"))
bulkData: scala.collection.immutable.IndexedSeq[(String, String, Int, String)] = Vector((user_00010001,fgedu_bulk_1,21,广州), …

scala> val bulkRDD = sc.parallelize(bulkData, 20)
bulkRDD: org.apache.spark.rdd.RDD[(String, String, Int, String)] = ParallelCollectionRDD[3] at parallelize at :37

# Convert to HFile key/value pairs
scala> val hfileRDD = bulkRDD.map { case (userId, name, age, city) =>
     |   val put = new Put(Bytes.toBytes(userId))
     |   put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
     |   put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
     |   put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
     |   (new ImmutableBytesWritable(Bytes.toBytes(userId)), put)
     | }
hfileRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Put)] = MapPartitionsRDD[4] at map at :35

# Configure the HFile output
scala> val job = Job.getInstance(conf)
job: org.apache.hadoop.mapreduce.Job = job_local1234567890_0001

scala> val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))

scala> val regionLocator = connection.getRegionLocator(TableName.valueOf("fgedu_spark_user"))

scala> HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

# Save the HFiles (rows must already be sorted by key; the keys generated above are ascending)
scala> hfileRDD.saveAsNewAPIHadoopFile(
     |   "/tmp/hfiles/fgedu_spark_user",
     |   classOf[ImmutableBytesWritable],
     |   classOf[Put],
     |   classOf[HFileOutputFormat2],
     |   job.getConfiguration
     | )

# Load the HFiles into HBase (in HBase 2.x the tool lives in the tool package)
$ hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles \
  /tmp/hfiles/fgedu_spark_user \
  fgedu_spark_user

SLF4J: Class path contains multiple SLF4J bindings.

2026-04-08 15:00:00,123 INFO [main] tool.LoadIncrementalHFiles: Loading HFile from /tmp/hfiles/fgedu_spark_user to fgedu_spark_user

2026-04-08 15:05:00,123 INFO [main] tool.LoadIncrementalHFiles: Loaded 10000 rows
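
The same load can be triggered from code instead of the CLI. A sketch using the BulkLoadHFiles API of HBase 2.x (verify availability against your exact HBase version):

// Programmatic bulk load (sketch); equivalent to the CLI invocation above
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.tool.BulkLoadHFiles

BulkLoadHFiles.create(conf)
  .bulkLoad(TableName.valueOf("fgedu_spark_user"), new Path("/tmp/hfiles/fgedu_spark_user"))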

# Verify the data (1000 existing rows + 10000 bulk-loaded rows, with no key overlap)
$ hbase shell

hbase(main):010:0> count 'fgedu_spark_user'

11000 row(s)

风哥's tip: for large batch writes, prefer Bulk Load; it performs better and does not disturb HBase's online service. For small batches, Put writes are fine.

Part04 - Production Cases and Walkthroughs

4.1 ETL Processing Case

# Scenario: ETL from a CSV file into HBase

# 1. Create the Spark application
$ cat > /tmp/HBaseETL.scala << 'EOF'
// HBaseETL.scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes

object HBaseETL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HBaseETL")
      .getOrCreate()
    import spark.implicits._

    // Read the CSV file
    val csvPath = args(0)
    val hbaseTable = args(1)

    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(csvPath)

    // Data cleansing
    val cleanedDF = df.filter($"age" > 0 && $"age" < 150)
      .filter($"name".isNotNull)

    // Write to HBase (the explicit Iterator[Row] type avoids the
    // foreachPartition overload ambiguity on Dataset[Row])
    cleanedDF.foreachPartition { (iter: Iterator[Row]) =>
      val conf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf(hbaseTable))

      val puts = new java.util.ArrayList[Put]()

      iter.foreach { row =>
        val userId = row.getAs[String]("user_id")
        val name = row.getAs[String]("name")
        val age = row.getAs[Int]("age")
        val city = row.getAs[String]("city")

        val put = new Put(Bytes.toBytes(userId))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
        puts.add(put)

        if (puts.size() >= 1000) {
          table.put(puts)
          puts.clear()
        }
      }

      if (puts.size() > 0) {
        table.put(puts)
      }

      table.close()
      connection.close()
    }

    spark.stop()
  }
}
EOF

# 2. Compile and package
$ scalac -classpath "/bigdata/app/spark/jars/*:/bigdata/app/hbase/lib/*" /tmp/HBaseETL.scala
$ jar -cvf hbase-etl.jar HBaseETL*.class

# 3. Submit the job
$ spark-submit --master yarn \
  --class HBaseETL \
  --jars /bigdata/app/spark/jars/hbase-*.jar \
  hbase-etl.jar \
  /data/user.csv \
  fgedu_user

# 4. Verify the result
$ hbase shell

hbase(main):011:0> count 'fgedu_user'

100000 row(s)

4.2 Data Analytics Case

# Scenario: analyzing user behavior data stored in HBase

scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.{Result, Scan}
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.hbase.util.Bytes

scala> val spark = SparkSession.builder().appName("HBaseAnalysis").getOrCreate()

scala> val conf = HBaseConfiguration.create()
scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_behavior")

scala> val behaviorRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )

scala> case class Behavior(userId: String, action: String, itemId: String, actionTime: String)
defined class Behavior

scala> val behaviorDF = behaviorRDD.map { case (_, result) =>
     |   val userId = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("user_id")))
     |   val action = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("action")))
     |   val itemId = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("item_id")))
     |   val actionTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("action_time")))
     |   Behavior(userId, action, itemId, actionTime)
     | }.toDF()

scala> behaviorDF.createOrReplaceTempView("behavior")

# Per-action behavior counts
scala> spark.sql("""
     |   SELECT action, COUNT(*) as cnt
     |   FROM behavior
     |   GROUP BY action
     |   ORDER BY cnt DESC
     | """).show()

+------+-----+
|action|  cnt|
+------+-----+
|  view|50000|
| click|30000|
|   buy|10000|
+------+-----+

# User activity analysis (the DataFrame column is userId, per the case class above)
scala> spark.sql("""
     |   SELECT userId, COUNT(*) as action_count
     |   FROM behavior
     |   GROUP BY userId
     |   ORDER BY action_count DESC
     |   LIMIT 10
     | """).show()

+-------------+------------+
|       userId|action_count|
+-------------+------------+
|user_00000001|         156|
|user_00000002|         142|
|user_00000003|         138|
+-------------+------------+

# Per-day analysis (using the actionTime column from the case class)
scala> spark.sql("""
     |   SELECT
     |     SUBSTR(actionTime, 1, 10) as dt,
     |     COUNT(*) as cnt
     |   FROM behavior
     |   GROUP BY SUBSTR(actionTime, 1, 10)
     |   ORDER BY dt
     | """).show()

+----------+-----+
|        dt|  cnt|
+----------+-----+
|2026-04-01|15000|
|2026-04-02|16000|
|2026-04-03|14500|
+----------+-----+

4.3 Common Problems and Fixes

4.3.1 Connection Timeouts

# Symptom: connections to HBase time out

scala> val hbaseRDD = sc.newAPIHadoopRDD(…)
java.net.ConnectException: Connection timed out

# Resolution
# 1. Check network connectivity
$ ping fgedu-node1

# 2. Check the HBase service
$ hbase shell
hbase(main):012:0> status

# 3. Increase the timeout settings
scala> conf.set("hbase.rpc.timeout", "120000")
scala> conf.set("hbase.client.operation.timeout", "180000")

# 4. Check the ZooKeeper connection
scala> conf.set("hbase.zookeeper.recoverable.waittime", "10000")

4.3.2 Out of Memory

# Symptom: Executors run out of memory

java.lang.OutOfMemoryError: Java heap space

# Resolution
# 1. Increase Executor memory
$ spark-submit --master yarn \
  --executor-memory 8g \
  --executor-cores 4 \

# 2. Increase off-heap overhead
$ spark-submit --master yarn \
  --conf spark.executor.memoryOverhead=2g \

# 3. Reduce the data volume per partition
scala> val hbaseRDD = sc.newAPIHadoopRDD(…)
scala> val repartitionedRDD = hbaseRDD.repartition(100)

# 4. Use Scan caching
scala> scan.setCaching(1000)

Part05 - 风哥's Experience and Takeaways

5.1 Integration Best Practices

Best-practice recommendations for HBase-Spark integration:

# Integration best practices
1. Use Bulk Load for large batch writes
2. Use Scan caching and filters when reading
3. Size resource parameters sensibly
4. Manage connections carefully (see the sketch below)
5. Monitor job execution
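
For point 4, the usual pattern is one shared Connection per executor JVM rather than one per partition. A minimal sketch; the holder object is illustrative, not from the original tutorial, and testRDD/fgedu_spark_user are the ones from Part03:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseConnectionHolder {
  // created lazily, once per executor JVM, on first use
  lazy val connection: Connection =
    ConnectionFactory.createConnection(HBaseConfiguration.create())
}

// Share the JVM-wide connection across partitions; close only the (cheap) Table handle.
testRDD.foreachPartition { iter =>
  val table = HBaseConnectionHolder.connection.getTable(TableName.valueOf("fgedu_spark_user"))
  iter.foreach { case (userId, name, age, city) =>
    val put = new Put(Bytes.toBytes(userId))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
    table.put(put)
  }
  table.close()
}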

5.2 Usage Recommendations

Usage recommendations:

HBase-Spark integration usage recommendations:

  • Avoid full-table scans
  • Use batch operations
  • Set parallelism sensibly
  • Monitor resource usage

5.3 Recommended Tools

Integration tooling:

  • HBase RDD: the basic integration method
  • HBase Connector: the optimized integration method
  • Phoenix Spark: the SQL integration method
  • Bulk Load: the bulk-write tool

风哥's tip: integrating HBase with Spark enables high-performance data processing. Choose the write path by data volume: Bulk Load for large batches, Put for small ones.

This article was compiled and published by 风哥教程 for learning and testing purposes only. When republishing, credit the source: http://www.fgedu.net.cn/10327.html
