本文档由风哥整理,主要介绍HBase与Spark集成实战,包括集成概述、配置方法、数据读写、ETL处理等内容,参考HBase官方文档Spark Integration、HBase-Spark Connector等资料,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 集成概述
HBase与Spark集成可以实现高性能的数据处理,结合Spark的计算能力和HBase的存储能力。
- 高性能计算:利用Spark内存计算
- 批量处理:大规模数据处理
- 实时分析:实时数据处理
- 机器学习:MLlib与HBase结合
1. 数据导入场景
– 批量导入数据到HBase
– 数据清洗和转换
– 增量数据同步
2. 数据分析场景
– 实时数据分析
– 复杂聚合计算
– 机器学习特征提取
3. 数据导出场景
– HBase数据导出
– 数据迁移
– 数据备份
4. 流处理场景
– Spark Streaming写入HBase
– 实时数据仓库
# HBase与Spark集成方式
1. HBase RDD
– 使用newAPIHadoopRDD
– 底层使用MapReduce API
– 适合批量处理
2. HBase Connector
– Spark HBase Connector
– 优化的读写性能
– 支持DataFrame API
3. Phoenix Spark
– 使用Phoenix JDBC
– SQL方式访问
– 简单易用
# 集成架构
┌─────────────┐
│ Spark │
│ (计算层) │
└──────┬──────┘
│
├── HBase RDD / Connector
│
┌──────┴──────┐
│ HBase │
│ (存储层) │
└─────────────┘
数据流:
Spark RDD -> HBase Connector -> HBase API -> HBase RegionServer
1.2 集成方式详解
集成方式详解:
# 方式一:HBase RDD
特点:
– 使用Hadoop InputFormat
– 支持Scan操作
– 可以设置过滤条件
– 返回RDD[(ImmutableBytesWritable, Result)]
示例:
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "fgedu-node1,fgedu-node2,fgedu-node3")
conf.set(TableInputFormat.INPUT_TABLE, "fgedu_user")
val hbaseRDD = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
# 方式二:HBase Connector
特点:
– 优化的性能
– 支持DataFrame API
– 支持批量操作
– 支持Predicate下推
示例:
import org.apache.hadoop.hbase.spark._
val df = spark.read
.option("hbase.columns.mapping",
"key STRING :key, name STRING info:name, age INT info:age")
.option("hbase.table", "fgedu_user")
.format("org.apache.hadoop.hbase.spark")
.load()
# 方式三:Phoenix Spark
特点:
– SQL接口
– 简单易用
– 支持DataFrame
– 性能较好
示例:
val df = spark.read
.format("jdbc")
.option("url", "jdbc:phoenix:fgedu-node1:2181")
.option("dbtable", "FGEDU_USER")
.load()
# 方式对比
方式 性能 易用性 功能
HBase RDD 中等 中等 基础
HBase Connector 高 中等 丰富
Phoenix Spark 高 简单 SQL
# 选择建议
1. 简单查询:Phoenix Spark
2. 复杂处理:HBase Connector
3. 兼容性要求:HBase RDD
1.3 架构设计
架构设计详解:
1. 数据读取架构
Spark Driver
├── SparkContext
├── HBaseConfiguration
└── RDD Partition
├── Region 1 -> Executor 1
├── Region 2 -> Executor 2
└── Region 3 -> Executor 3
2. 数据写入架构
Spark Executor
├── Partition 1 -> HBase Connection 1
├── Partition 2 -> HBase Connection 2
└── Partition 3 -> HBase Connection 3
└── HBase RegionServer
3. 批量写入架构
Spark Executor
├── Partition
│ └── HFile生成
└── Bulk Load
└── HBase RegionServer
# 数据分区策略
1. 默认分区
– 按Region分区
– 每个Region一个分区
2. 自定义分区
– 按RowKey范围分区
– 控制并行度
3. 预分区优化
– 预先创建Region
– 避免热点
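预分区的关键是按RowKey的实际分布预先计算分裂点。下面是一个纯Scala的分裂点计算示意(假设RowKey形如user_00000000的8位数字后缀,键空间大小与Region数均为示例参数,非固定规范):

```scala
// 预分区分裂点计算示意:把 user_00000000 ~ user_99999999 的 RowKey 空间
// 均匀切成 regions 份,返回 regions-1 个分裂点,建表时作为 SPLITS 使用
object SplitKeys {
  def splitKeys(regions: Int): Seq[String] = {
    val keySpace = 100000000L // 8 位数字后缀的取值空间
    (1 until regions).map(i => f"user_${keySpace * i / regions}%08d")
  }

  def main(args: Array[String]): Unit =
    println(splitKeys(4).mkString(", ")) // 4 个 Region 对应 3 个分裂点
}
```

生成的分裂点可在hbase shell中用于类似 create 'fgedu_user', 'info', SPLITS => ['user_25000000', 'user_50000000', 'user_75000000'] 的建表语句。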
# 性能优化点
1. 读取优化
– 使用Scan缓存
– 设置过滤条件
– 指定列族和列
– 并行读取
2. 写入优化
– 批量写入
– 使用Bulk Load
– 预分区
– 调整MemStore
3. 资源优化
– Executor数量
– 内存配置
– 并行度设置
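写入优化中除了预分区,避免热点还常用RowKey加盐。下面是一个加盐函数的最小示意(盐桶数16、两位数字前缀均为假设取值,非固定规范):

```scala
// RowKey 加盐示意:按 userId 哈希取模得到盐桶编号,加两位数字前缀打散写入热点
// 代价是读取时需要对每个盐桶分别发起 Scan,再合并结果
object SaltedKey {
  def saltedKey(userId: String, buckets: Int = 16): String = {
    val salt = math.abs(userId.hashCode % buckets)
    f"$salt%02d_$userId"
  }
}
```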
# 数据类型映射
Spark类型 HBase类型
StringType String
IntegerType Integer
LongType Long
DoubleType Double
BooleanType Boolean
BinaryType Bytes
TimestampType Long
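上表的映射依赖HBase Bytes工具类的大端序编码。下面用java.nio.ByteBuffer示意Int与字节数组的互转,效果与Bytes.toBytes(Int)/Bytes.toInt一致(仅为示意实现,非HBase源码):

```scala
import java.nio.ByteBuffer

// 示意 Bytes.toBytes(Int) / Bytes.toInt 的大端序定长编码
object IntBytes {
  def intToBytes(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()
  def bytesToInt(b: Array[Byte]): Int = ByteBuffer.wrap(b).getInt()
}
```

注意hbase shell的put写入的是字符串字节(例如"25"占2字节),与4字节的Int编码互不兼容,读取时的解码方式必须与写入端一致。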
Part02-生产环境规划与建议
2.1 环境规划建议
环境规划建议:
Spark版本 HBase版本 Hadoop版本
3.5.x 2.5.x 3.3.x
3.4.x 2.4.x 3.2.x
# 推荐版本组合
Spark 3.5.0 + HBase 2.5.5 + Hadoop 3.3.6
# 环境配置
1. Spark配置
spark-defaults.conf:
spark.jars /bigdata/app/hbase/lib/hbase-client.jar,/bigdata/app/hbase/lib/hbase-common.jar,/bigdata/app/hbase/lib/hbase-server.jar
spark.executor.extraClassPath /bigdata/app/hbase/lib/*
spark.driver.extraClassPath /bigdata/app/hbase/lib/*
2. HBase配置
hbase-site.xml:(复制到Spark的conf目录,至少包含ZooKeeper地址,示例如下)
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>fgedu-node1,fgedu-node2,fgedu-node3</value>
</property>
# Maven依赖(示例,版本与实际环境保持一致)
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.5.5</version>
</dependency>
# 启动Spark Shell
$ spark-shell --master yarn \
--jars /bigdata/app/hbase/lib/hbase-client-2.5.5.jar,/bigdata/app/hbase/lib/hbase-common-2.5.5.jar,/bigdata/app/hbase/lib/hbase-server-2.5.5.jar \
--driver-class-path /bigdata/app/hbase/conf
2.2 数据处理规划
数据处理规划建议:
1. Scan配置
– 设置缓存大小
– 设置批量大小
– 设置过滤条件
– 指定列族和列
2. 分区策略
– 按Region分区
– 自定义分区数
– 控制并行度
3. 资源配置
– Executor数量
– Executor内存
– Executor核心数
# 数据写入规划
1. 写入方式
– 单条Put
– 批量Put
– Bulk Load
2. 性能优化
– 批量大小
– 预分区
– 异步写入
3. 资源配置
– 并行度
– 内存配置
– 连接池
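上面的"批量大小"可以用攒批提交来落实。下面是一个纯Scala的攒批示意(batchSize与flush回调均为示例参数,实际场景中flush对应一次table.put(puts)):

```scala
// 攒批提交示意:把记录按 batchSize 切分,每满一批调用一次 flush,
// 对应 foreachPartition 内攒够一批 Put 再执行 table.put 的写法
object BatchFlush {
  def flushInBatches[T](records: Iterator[T], batchSize: Int)(flush: Seq[T] => Unit): Int = {
    var flushCount = 0
    records.grouped(batchSize).foreach { batch =>
      flush(batch)
      flushCount += 1
    }
    flushCount
  }
}
```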
# 数据处理流程
1. 数据导入流程
源数据 -> Spark读取 -> 数据清洗 -> 数据转换 -> HBase写入
2. 数据分析流程
HBase读取 -> Spark处理 -> 结果输出
3. ETL流程
源系统 -> Spark ETL -> HBase存储 -> 下游系统
# 示例配置
# 读取配置
val scan = new Scan()
scan.setCaching(1000)
// 注意:setBatch按列分批,不能与PageFilter等行级Filter同时使用,二者只能取其一
scan.setFilter(new PageFilter(10000))
# 写入配置
val conf = HBaseConfiguration.create()
conf.set("hbase.client.write.buffer", "2097152") // 2MB
conf.set("hbase.client.retries.number", "3")
2.3 性能规划建议
性能规划建议:
1. 读取优化
– 使用Scan缓存
– 设置过滤器
– 指定列
– 并行读取
2. 写入优化
– 批量写入
– Bulk Load
– 预分区
– 异步写入
3. 资源优化
– 合理配置Executor
– 调整并行度
– 内存优化
# 读取性能优化
1. Scan缓存
scan.setCaching(1000) // 每次RPC返回1000行
2. 批量大小
scan.setBatch(100) // 每行返回100列
3. 过滤器
scan.setFilter(new PrefixFilter(Bytes.toBytes("user_")))
4. 指定列
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"))
# 写入性能优化
1. 批量写入
val puts = new java.util.ArrayList[Put]()
for (i <- 1 to 1000) {
val put = new Put(Bytes.toBytes(s"user_$i"))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
Bytes.toBytes(s"fgedu$i"))
puts.add(put)
}
table.put(puts)
2. Bulk Load
// 生成HFile(注意:HFileOutputFormat2输出的是KeyValue,且必须按RowKey全局有序)
val hfileRDD = rdd.map(row => {
  val rowKey = Bytes.toBytes(row.getAs[String]("key"))
  val kv = new KeyValue(rowKey, Bytes.toBytes("info"), Bytes.toBytes("value"),
    Bytes.toBytes(row.getAs[String]("value")))
  (new ImmutableBytesWritable(rowKey), kv)
}).sortByKey()
// 保存HFile
hfileRDD.saveAsNewAPIHadoopFile(
  "/tmp/hfiles",
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  conf
)
// 加载HFile
LoadIncrementalHFiles.doBulkLoad(…)
# 资源配置优化
# Spark配置
spark.executor.instances=10
spark.executor.memory=8g
spark.executor.cores=4
spark.executor.memoryOverhead=2g
spark.sql.shuffle.partitions=200
spark.default.parallelism=200
# HBase配置
hbase.client.write.buffer=2097152
hbase.client.scanner.caching=1000
Part03-生产环境项目实施方案
3.1 配置集成实战
3.1.1 环境准备
$ cp /bigdata/app/hbase/lib/hbase-client-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-common-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-server-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-hadoop-compat-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-hadoop2-compat-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-protocol-2.5.5.jar /bigdata/app/spark/jars/
$ cp /bigdata/app/hbase/lib/hbase-spark-2.5.5.jar /bigdata/app/spark/jars/
# 2. 复制HBase配置文件
$ cp /bigdata/app/hbase/conf/hbase-site.xml /bigdata/app/spark/conf/
# 3. 配置spark-defaults.conf
$ cat >> /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.jars /bigdata/app/spark/jars/hbase-client-2.5.5.jar,/bigdata/app/spark/jars/hbase-common-2.5.5.jar,/bigdata/app/spark/jars/hbase-server-2.5.5.jar
spark.executor.extraClassPath /bigdata/app/spark/jars/*
spark.driver.extraClassPath /bigdata/app/spark/jars/*
EOF
# 4. 启动Spark Shell测试
$ spark-shell --master yarn \
--jars /bigdata/app/spark/jars/hbase-*.jar \
--driver-class-path /bigdata/app/hbase/conf
Spark context available as 'sc' (master = yarn, app id = application_1680940800000_0001).
Spark session available as 'spark'.
scala> import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes
scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, hbase-site.xml
scala> conf.get("hbase.zookeeper.quorum")
res0: String = fgedu-node1,fgedu-node2,fgedu-node3
3.1.2 创建测试表
$ hbase shell
hbase(main):001:0> create 'fgedu_spark_user', 'info'
Created table fgedu_spark_user
hbase(main):002:0> put 'fgedu_spark_user', 'user_00000001', 'info:name', 'fgedu01'
Took 0.0123 seconds.
hbase(main):003:0> put 'fgedu_spark_user', 'user_00000001', 'info:age', '25'
Took 0.0123 seconds.
hbase(main):004:0> put 'fgedu_spark_user', 'user_00000001', 'info:city', '北京'
Took 0.0123 seconds.
hbase(main):005:0> put 'fgedu_spark_user', 'user_00000002', 'info:name', 'fgedu02'
Took 0.0123 seconds.
hbase(main):006:0> put 'fgedu_spark_user', 'user_00000002', 'info:age', '30'
Took 0.0123 seconds.
hbase(main):007:0> put 'fgedu_spark_user', 'user_00000002', 'info:city', '上海'
Took 0.0123 seconds.
hbase(main):008:0> scan 'fgedu_spark_user'
ROW COLUMN+CELL
user_00000001 column=info:age, timestamp=1680940800001, value=25
user_00000001 column=info:city, timestamp=1680940800002, value=北京
user_00000001 column=info:name, timestamp=1680940800000, value=fgedu01
user_00000002 column=info:age, timestamp=1680940800011, value=30
user_00000002 column=info:city, timestamp=1680940800012, value=上海
user_00000002 column=info:name, timestamp=1680940800010, value=fgedu02
2 row(s)
3.2 数据读取实战
3.2.1 使用HBase RDD读取
$ spark-shell --master yarn --jars /bigdata/app/spark/jars/hbase-*.jar
# 读取HBase数据
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.{Result, Scan}
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.hbase.util.Bytes
scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration: …
scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_spark_user")
scala> conf.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age info:city")
scala> val hbaseRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )
hbaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at
scala> hbaseRDD.count()
res1: Long = 2
scala> hbaseRDD.take(10).foreach { case (_, result) =>
| val key = Bytes.toString(result.getRow)
| val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
| val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
| val city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
| println(s"key=$key, name=$name, age=$age, city=$city")
| }
key=user_00000001, name=fgedu01, age=25, city=北京
key=user_00000002, name=fgedu02, age=30, city=上海
# 转换为DataFrame
scala> import spark.implicits._
scala> case class User(userId: String, name: String, age: Int, city: String)
defined class User
scala> val userDF = hbaseRDD.map { case (_, result) =>
| val userId = Bytes.toString(result.getRow)
| val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
| // hbase shell写入的age是字符串,需先转String再toInt,不能直接Bytes.toInt
| val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))).toInt
| val city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
| User(userId, name, age, city)
| }.toDF()
userDF: org.apache.spark.sql.DataFrame = [userId: string, name: string … 2 more fields]
scala> userDF.show()
+-------------+-------+---+----+
|       userId|   name|age|city|
+-------------+-------+---+----+
|user_00000001|fgedu01| 25| 北京|
|user_00000002|fgedu02| 30| 上海|
+-------------+-------+---+----+
3.2.2 使用Scan过滤器
scala> import org.apache.hadoop.hbase.filter.{PrefixFilter, PageFilter}
scala> val scan = new Scan()
scan: org.apache.hadoop.hbase.client.Scan = {"limit":-1,"timeRange":[0,9223372036854775807],"loadColumnFamiliesOnDemand":null,"startRow":"","stopRow":"","batch":-1,"maxResultSize":-1,"cacheBlocks":true,"reversed":false,"consistency":"STRONG","targetReplicaId":-1,"mvccReadPoint":"-1","isolationLevel":"READ_COMMITTED"}
scala> scan.setFilter(new PrefixFilter(Bytes.toBytes("user_00000001")))
scala> scan.setCaching(100)
scala> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_spark_user")
scala> conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
scala> val filteredRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )
filteredRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[1] at newAPIHadoopRDD at
scala> filteredRDD.count()
res2: Long = 1
scala> filteredRDD.take(10).foreach { case (_, result) =>
| val key = Bytes.toString(result.getRow)
| val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
| println(s"key=$key, name=$name")
| }
key=user_00000001, name=fgedu01
3.3 数据写入实战
3.3.1 批量写入数据
scala> import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
scala> import org.apache.hadoop.hbase.TableName
scala> val conf = HBaseConfiguration.create()
scala> val connection = ConnectionFactory.createConnection(conf)
connection: org.apache.hadoop.hbase.client.Connection = …
scala> val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))
table: org.apache.hadoop.hbase.client.Table = fgedu_spark_user
# 创建测试数据
scala> val testData = (1 to 1000).map(i => (s"user_${"%08d".format(i)}", s"fgedu$i", 20 + i % 40, "北京"))
testData: scala.collection.immutable.IndexedSeq[(String, String, Int, String)] = Vector((user_00000001,fgedu1,21,北京), (user_00000002,fgedu2,22,北京), …
scala> val testRDD = sc.parallelize(testData, 10)
testRDD: org.apache.spark.rdd.RDD[(String, String, Int, String)] = ParallelCollectionRDD[2] at parallelize at
# 批量写入
scala> testRDD.foreachPartition { iter =>
| val conf = HBaseConfiguration.create()
| val connection = ConnectionFactory.createConnection(conf)
| val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))
|
| val puts = new java.util.ArrayList[Put]()
| iter.foreach { case (userId, name, age, city) =>
| val put = new Put(Bytes.toBytes(userId))
| put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
| // age按字符串写入,与hbase shell写入的编码保持一致
| put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age.toString))
| put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
| puts.add(put)
|
| if (puts.size() >= 100) {
| table.put(puts)
| puts.clear()
| }
| }
|
| if (puts.size() > 0) {
| table.put(puts)
| }
|
| table.close()
| connection.close()
| }
# 验证数据
$ hbase shell
hbase(main):009:0> count 'fgedu_spark_user'
1000 row(s)
3.3.2 使用Bulk Load写入
scala> import org.apache.hadoop.hbase.KeyValue
scala> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
scala> import org.apache.hadoop.mapreduce.Job
# 准备数据
scala> val bulkData = (1 to 10000).map(i => (s"user_${"%08d".format(i + 10000)}", s"fgedu_bulk_$i", 20 + i % 40, "广州"))
bulkData: scala.collection.immutable.IndexedSeq[(String, String, Int, String)] = Vector((user_00010001,fgedu_bulk_1,21,广州), …
scala> val bulkRDD = sc.parallelize(bulkData, 20)
bulkRDD: org.apache.spark.rdd.RDD[(String, String, Int, String)] = ParallelCollectionRDD[3] at parallelize at
# 转换为HFile格式(HFileOutputFormat2要求输出KeyValue,且按RowKey全局有序)
scala> val hfileRDD = bulkRDD.flatMap { case (userId, name, age, city) =>
| val rowKey = Bytes.toBytes(userId)
| val cf = Bytes.toBytes("info")
| Seq(("age", age.toString), ("city", city), ("name", name)).map { case (q, v) =>
| (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, cf, Bytes.toBytes(q), Bytes.toBytes(v)))
| }
| }.sortByKey()
hfileRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.KeyValue)] = ShuffledRDD[5] at sortByKey at
# 配置HFile输出
scala> val job = Job.getInstance(conf)
job: org.apache.hadoop.mapreduce.Job = job_local1234567890_0001
scala> val table = connection.getTable(TableName.valueOf("fgedu_spark_user"))
scala> val regionLocator = connection.getRegionLocator(TableName.valueOf("fgedu_spark_user"))
scala> HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
# 保存HFile
scala> hfileRDD.saveAsNewAPIHadoopFile(
| "/tmp/hfiles/fgedu_spark_user",
| classOf[ImmutableBytesWritable],
| classOf[KeyValue],
| classOf[HFileOutputFormat2],
| job.getConfiguration
| )
# 加载HFile到HBase(HBase 2.x中该工具类位于tool包)
$ hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles \
/tmp/hfiles/fgedu_spark_user \
fgedu_spark_user
SLF4J: Class path contains multiple SLF4J bindings.
…
2026-04-08 15:00:00,123 INFO [main] tool.LoadIncrementalHFiles: Loading HFile from /tmp/hfiles/fgedu_spark_user to fgedu_spark_user
…
2026-04-08 15:05:00,123 INFO [main] tool.LoadIncrementalHFiles: Loaded 10000 rows
# 验证数据
$ hbase shell
hbase(main):010:0> count 'fgedu_spark_user'
11000 row(s)
Part04-生产案例与实战讲解
4.1 ETL处理案例
# 1. 创建Spark应用
$ cat > /tmp/HBaseETL.scala << 'EOF'
// HBaseETL.scala
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
object HBaseETL {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HBaseETL")
.getOrCreate()
import spark.implicits._
// 读取CSV文件
val csvPath = args(0)
val hbaseTable = args(1)
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvPath)
// 数据清洗
val cleanedDF = df.filter($"age" > 0 && $"age" < 150)
.filter($"name".isNotNull)
// 写入HBase
// 显式标注Iterator[Row]类型,避免Scala 2.12下foreachPartition重载二义性
cleanedDF.foreachPartition { (iter: Iterator[org.apache.spark.sql.Row]) =>
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf(hbaseTable))
val puts = new java.util.ArrayList[Put]()
iter.foreach { row =>
val userId = row.getAs[String]("user_id")
val name = row.getAs[String]("name")
val age = row.getAs[Int]("age")
val city = row.getAs[String]("city")
val put = new Put(Bytes.toBytes(userId))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
puts.add(put)
if (puts.size() >= 1000) {
table.put(puts)
puts.clear()
}
}
if (puts.size() > 0) {
table.put(puts)
}
table.close()
connection.close()
}
spark.stop()
}
}
EOF
# 2. 编译打包
$ scalac -classpath "/bigdata/app/spark/jars/*:/bigdata/app/hbase/lib/*" /tmp/HBaseETL.scala
$ jar -cvf hbase-etl.jar HBaseETL*.class
# 3. 提交作业
$ spark-submit --master yarn \
--class HBaseETL \
--jars /bigdata/app/spark/jars/hbase-*.jar \
hbase-etl.jar \
/data/user.csv \
fgedu_user
# 4. 验证结果
$ hbase shell
hbase(main):011:0> count 'fgedu_user'
100000 row(s)
4.2 数据分析案例
scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> import org.apache.hadoop.hbase.client.{Result, Scan}
scala> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.hbase.util.Bytes
scala> val spark = SparkSession.builder().appName("HBaseAnalysis").getOrCreate()
scala> val conf = HBaseConfiguration.create()
scala> conf.set(TableInputFormat.INPUT_TABLE, "fgedu_behavior")
scala> val behaviorRDD = sc.newAPIHadoopRDD(
| conf,
| classOf[TableInputFormat],
| classOf[ImmutableBytesWritable],
| classOf[Result]
| )
scala> case class Behavior(userId: String, action: String, itemId: String, actionTime: String)
defined class Behavior
scala> val behaviorDF = behaviorRDD.map { case (_, result) =>
| val userId = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("user_id")))
| val action = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("action")))
| val itemId = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("item_id")))
| val actionTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("action_time")))
| Behavior(userId, action, itemId, actionTime)
| }.toDF()
scala> behaviorDF.createOrReplaceTempView("behavior")
# 用户行为统计
scala> spark.sql("""
| SELECT action, COUNT(*) as cnt
| FROM behavior
| GROUP BY action
| ORDER BY cnt DESC
| """).show()
+------+-----+
|action|  cnt|
+------+-----+
|  view|50000|
| click|30000|
|   buy|10000|
+------+-----+
# 用户活跃度分析
scala> spark.sql("""
| SELECT user_id, COUNT(*) as action_count
| FROM behavior
| GROUP BY user_id
| ORDER BY action_count DESC
| LIMIT 10
| """).show()
+-------------+------------+
|      user_id|action_count|
+-------------+------------+
|user_00000001|         156|
|user_00000002|         142|
|user_00000003|         138|
+-------------+------------+
# 时间段分析
scala> spark.sql("""
| SELECT
| SUBSTR(action_time, 1, 10) as dt,
| COUNT(*) as cnt
| FROM behavior
| GROUP BY SUBSTR(action_time, 1, 10)
| ORDER BY dt
| """).show()
+----------+-----+
|        dt|  cnt|
+----------+-----+
|2026-04-01|15000|
|2026-04-02|16000|
|2026-04-03|14500|
+----------+-----+
4.3 常见问题处理
4.3.1 连接超时
scala> val hbaseRDD = sc.newAPIHadoopRDD(…)
java.net.ConnectException: Connection timed out
# 解决方案
# 1. 检查网络连通性
$ ping fgedu-node1
# 2. 检查HBase服务
$ hbase shell
hbase(main):012:0> status
# 3. 增加超时配置
scala> conf.set("hbase.rpc.timeout", "120000")
scala> conf.set("hbase.client.operation.timeout", "180000")
# 4. 检查ZooKeeper连接
scala> conf.set("hbase.zookeeper.recoverable.waittime", "10000")
4.3.2 内存不足
java.lang.OutOfMemoryError: Java heap space
# 解决方案
# 1. 增加Executor内存
$ spark-submit --master yarn \
--executor-memory 8g \
--executor-cores 4 \
…
# 2. 增加堆外内存
$ spark-submit --master yarn \
--conf spark.executor.memoryOverhead=2g \
…
# 3. 减少分区数据量
scala> val hbaseRDD = sc.newAPIHadoopRDD(…)
scala> val repartitionedRDD = hbaseRDD.repartition(100)
# 4. 使用Scan缓存
scala> scan.setCaching(1000)
Part05-风哥经验总结与分享
5.1 集成最佳实践
HBase与Spark集成最佳实践建议:
1. 大批量写入使用Bulk Load
2. 读取时使用Scan缓存和过滤器
3. 合理配置资源参数
4. 处理连接池管理
5. 监控作业执行
5.2 使用建议
使用建议:
- 避免全表扫描
- 使用批量操作
- 合理设置并行度
- 监控资源使用
5.3 工具推荐
集成工具:
- HBase RDD:基础集成方式
- HBase Connector:优化集成方式
- Phoenix Spark:SQL集成方式
- Bulk Load:批量写入工具
