This document introduces Spark GraphX graph processing in practice, covering GraphX core concepts, the property graph model, graph operators, and graph algorithms. The material follows the official Spark GraphX Programming Guide and the GraphX API documentation. It is intended for big-data development and operations engineers to use for learning and testing; verify everything yourself before applying it to a production environment.
Part 01 - Core Concepts and Theory
1.1 GraphX Core Concepts
GraphX is Spark's graph processing framework. It provides a graph abstraction plus a set of graph operators, and supports graph-parallel computation.
- Vertex: a node in the graph
- Edge: a relationship connecting two vertices
- Property Graph: a graph whose vertices and edges both carry attributes
- Triplet: an edge together with the attributes of its source and destination vertices
- VertexRDD: the RDD of vertices
- EdgeRDD: the RDD of edges
1.2 The Property Graph Model
The property graph model in detail:
1. Vertices
- VertexId: the vertex ID (a Long)
- VD: the vertex attribute (generic type)
2. Edges
- srcId: source vertex ID
- dstId: destination vertex ID
- ED: the edge attribute (generic type)
3. Property graph
- vertices: VertexRDD[VD]
- edges: EdgeRDD[ED]
- triplets: RDD[EdgeTriplet[VD, ED]]
# Example graph
A user social network:
- Vertices: users (ID, name, age)
- Edges: follow relationships (source user, target user, follow time)
# The Graph class (simplified)
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
}
# Create a graph
val vertices = sc.parallelize(Seq(
  (1L, ("fgedu01", 25)),
  (2L, ("fgedu02", 30)),
  (3L, ("fgedu03", 28))
))
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, "follow"),
  Edge(2L, 3L, "follow"),
  Edge(1L, 3L, "follow")
))
val graph = Graph(vertices, edges)
1.3 Graph Operators
GraphX graph operators in detail:
1. Property operators
- mapVertices: transform vertex attributes
- mapEdges: transform edge attributes
- mapTriplets: transform triplet attributes
2. Structural operators
- reverse: reverse the direction of every edge
- subgraph: extract a subgraph
- mask: restrict a graph to the structure of another graph
- groupEdges: merge parallel edges
3. Join operators
- joinVertices: join vertex attributes (keeps the vertex attribute type)
- outerJoinVertices: outer-join vertex attributes (may change the vertex attribute type)
4. Aggregation operators
- aggregateMessages: aggregate messages sent along edges
- collectNeighborIds: collect neighbor IDs
- collectNeighbors: collect neighbor IDs together with their attributes
5. Graph algorithms
- PageRank: link-based ranking
- ConnectedComponents: connected components
- TriangleCount: triangle counting
- ShortestPaths: shortest paths
Part 02 - Production Planning and Recommendations
2.1 Graph Data Design
Graph data design recommendations:
1. Vertex design
- choose a suitable vertex ID
- design the vertex attributes
- consider vertex types
2. Edge design
- decide the edge direction
- design the edge attributes
- consider edge weights
3. Graph type
- directed graph: edges have a direction
- undirected graph: edges have no direction
- multigraph: multiple edges between the same pair of vertices
# Example: social network graph design
Vertices:
- ID: user ID
- attributes: (name, age, region)
Edges:
- source ID: follower's user ID
- destination ID: followee's user ID
- attributes: (follow time, interaction count)
# Example: knowledge graph design
Vertices:
- ID: entity ID
- attributes: (name, type, property list)
Edges:
- source ID: head entity ID
- destination ID: tail entity ID
- attributes: (relation type, confidence)
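The two designs above can be written down as Scala types before any data is loaded; the case class and field names below are illustrative assumptions, not part of GraphX itself:

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Social network design: vertex = user, edge = follow relationship
case class User(name: String, age: Int, region: String)
case class Follow(followTime: Long, interactions: Int)

// Knowledge graph design: vertex = entity, edge = typed relation
case class Entity(name: String, kind: String, props: Map[String, String])
case class Relation(relType: String, confidence: Double)

// Either design yields a strongly typed property graph, e.g.:
// val social: Graph[User, Follow]     = Graph(userRDD, followRDD)
// val kg:     Graph[Entity, Relation] = Graph(entityRDD, relationRDD)
```

Declaring the attribute types up front makes the VD/ED type parameters explicit and keeps later operators (mapVertices, aggregateMessages, joins) type-checked.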
2.2 Graph Partitioning
Graph partitioning recommendations:
1. EdgePartition2D
- two-dimensional edge partitioning
- reduces vertex replication
- suited to large graphs
2. RandomVertexCut
- random vertex cut
- simple and efficient
- suited to uniformly distributed graphs
3. CanonicalRandomVertexCut
- canonical random vertex cut
- co-locates edges between the same vertex pair regardless of direction
- suited to undirected graphs
# Partitioning configuration
val graph = Graph(vertices, edges, defaultVertexAttr)
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 100)
# Suggested partition counts
- small graph (<1M edges): partitions = number of cores
- medium graph (1M-100M edges): partitions = cores * 2-4
- large graph (>100M edges): partitions = cores * 4-8
# Check partition info
scala> graph.edges.partitions.size
res0: Int = 100
2.3 Choosing a Graph Algorithm
Algorithm selection recommendations:
1. Importance analysis
- PageRank: ranking by link structure
- typical use: influence analysis in social networks
2. Community detection
- ConnectedComponents: connected components
- LabelPropagation: label propagation
- typical use: community detection, user segmentation
3. Path analysis
- ShortestPaths: shortest paths
- typical use: route optimization, path recommendation
4. Structural analysis
- TriangleCount: triangle counting
- typical use: clustering coefficient computation
5. Relationship analysis
- SVDPlusPlus: SVD++ recommendation algorithm
- typical use: collaborative filtering recommendation
# Algorithm complexity
- PageRank: O(iterations * edges)
- ConnectedComponents: O(iterations * edges)
- ShortestPaths: O(iterations * vertices)
- TriangleCount: roughly O(edges^1.5) (adjacency-set intersections)
Part 03 - Production Implementation
3.1 Graph Creation in Practice
3.1.1 Creating a Graph from RDDs
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--executor-memory 4g
# Import the GraphX classes
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
# Create the vertex RDD
scala> val users: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
     |   (1L, ("fgedu01", 25)),
     |   (2L, ("fgedu02", 30)),
     |   (3L, ("fgedu03", 28)),
     |   (4L, ("fgedu04", 35)),
     |   (5L, ("fgedu05", 27))
     | ))
users: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, Int))] = ParallelCollectionRDD[0] at parallelize at
# Create the edge RDD
scala> val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
     |   Edge(1L, 2L, "friend"),
     |   Edge(1L, 3L, "friend"),
     |   Edge(2L, 3L, "friend"),
     |   Edge(3L, 4L, "follow"),
     |   Edge(4L, 5L, "friend"),
     |   Edge(5L, 1L, "follow")
     | ))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at parallelize at
# Create the graph
scala> val graph = Graph(users, relationships)
graph: org.apache.spark.graphx.Graph[(String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@7b5a12ae
# Inspect the graph
scala> graph.numVertices
res0: Long = 5
scala> graph.numEdges
res1: Long = 6
# View the vertices
scala> graph.vertices.collect.foreach(println)
(1,(fgedu01,25))
(2,(fgedu02,30))
(3,(fgedu03,28))
(4,(fgedu04,35))
(5,(fgedu05,27))
# View the edges
scala> graph.edges.collect.foreach(println)
Edge(1,2,friend)
Edge(1,3,friend)
Edge(2,3,friend)
Edge(3,4,follow)
Edge(4,5,friend)
Edge(5,1,follow)
3.1.2 Creating a Graph from a File
$ cat > /bigdata/spark-data/fgedu-edges.txt << 'EOF'
1 2
1 3
2 3
3 4
4 5
5 1
EOF
$ hdfs dfs -put /bigdata/spark-data/fgedu-edges.txt /spark-data/
# Load the graph from the file
scala> val edges = sc.textFile("hdfs://192.168.1.60:9000/spark-data/fgedu-edges.txt")
     |   .map(line => {
     |     val parts = line.split(" ")
     |     Edge(parts(0).toLong, parts(1).toLong, 1)
     |   })
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[5] at map at
scala> val graph = Graph.fromEdges(edges, defaultValue = 1)
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@5b37e0d2
# Inspect the graph
scala> graph.numVertices
res2: Long = 5
scala> graph.numEdges
res3: Long = 6
3.2 Graph Operations in Practice
3.2.1 Property Operators
# mapVertices
scala> val graphWithAge = graph.mapVertices((id, attr) => attr._2)
graphWithAge: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@3a2b1c0d
scala> graphWithAge.vertices.collect.foreach(println)
(1,25)
(2,30)
(3,28)
(4,35)
(5,27)
# mapEdges
scala> val graphWithWeight = graph.mapEdges(edge => edge.attr.length)
graphWithWeight: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@4b3c2d1e
scala> graphWithWeight.edges.collect.foreach(println)
Edge(1,2,6)
Edge(1,3,6)
Edge(2,3,6)
Edge(3,4,6)
Edge(4,5,6)
Edge(5,1,6)
# mapTriplets
scala> val graphWithTriplets = graph.mapTriplets(triplet => {
     |   s"${triplet.srcAttr._1} -> ${triplet.dstAttr._1}: ${triplet.attr}"
     | })
graphWithTriplets: org.apache.spark.graphx.Graph[(String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@5c4d3e2f
scala> graphWithTriplets.edges.collect.foreach(println)
Edge(1,2,fgedu01 -> fgedu02: friend)
Edge(1,3,fgedu01 -> fgedu03: friend)
Edge(2,3,fgedu02 -> fgedu03: friend)
Edge(3,4,fgedu03 -> fgedu04: follow)
Edge(4,5,fgedu04 -> fgedu05: friend)
Edge(5,1,fgedu05 -> fgedu01: follow)
3.2.2 Structural Operators
# reverse
scala> val reversedGraph = graph.reverse
reversedGraph: org.apache.spark.graphx.Graph[(String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@6d5e4f30
scala> reversedGraph.edges.collect.foreach(println)
Edge(2,1,friend)
Edge(3,1,friend)
Edge(3,2,friend)
Edge(4,3,follow)
Edge(5,4,friend)
Edge(1,5,follow)
# subgraph (extract a subgraph)
scala> val subGraph = graph.subgraph(
     |   vpred = (id, attr) => attr._2 > 26
     | )
subGraph: org.apache.spark.graphx.Graph[(String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@7e6f5a4b
scala> subGraph.vertices.collect.foreach(println)
(2,(fgedu02,30))
(3,(fgedu03,28))
(4,(fgedu04,35))
(5,(fgedu05,27))
# mask (restrict graph to the structure of subGraph)
scala> val maskedGraph = graph.mask(subGraph)
maskedGraph: org.apache.spark.graphx.Graph[(String, Int),String] = org.apache.spark.graphx.impl.GraphImpl@8f7a6b5c
# Degree statistics
scala> graph.degrees.collect.foreach(println)
(1,3)
(2,2)
(3,3)
(4,2)
(5,2)
scala> graph.inDegrees.collect.foreach(println)
(1,1)
(2,1)
(3,2)
(4,1)
(5,1)
scala> graph.outDegrees.collect.foreach(println)
(1,2)
(2,1)
(3,1)
(4,1)
(5,1)
3.2.3 Aggregation Operators
# aggregateMessages (compute in-degrees)
scala> val inDegree: VertexRDD[Int] = graph.aggregateMessages[Int](
| ctx => ctx.sendToDst(1),
| _ + _
| )
inDegree: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[9]
scala> inDegree.collect.foreach(println)
(1,1)
(2,1)
(3,2)
(4,1)
(5,1)
# collectNeighborIds
scala> val neighbors = graph.collectNeighborIds(EdgeDirection.Out)
neighbors: org.apache.spark.graphx.VertexRDD[Array[org.apache.spark.graphx.VertexId]] = VertexRDDImpl[10]
scala> neighbors.collect.foreach { case (id, nbrs) =>
     |   println(s"Vertex $id neighbors: ${nbrs.mkString(", ")}")
     | }
Vertex 1 neighbors: 2, 3
Vertex 2 neighbors: 3
Vertex 3 neighbors: 4
Vertex 4 neighbors: 5
Vertex 5 neighbors: 1
# outerJoinVertices (joinVertices cannot change the vertex attribute type, so use outerJoinVertices here)
scala> val updatedGraph = graph.outerJoinVertices(inDegree)((id, attr, degOpt) => {
     |   (attr._1, attr._2, degOpt.getOrElse(0))
     | })
updatedGraph: org.apache.spark.graphx.Graph[(String, Int, Int),String] = org.apache.spark.graphx.impl.GraphImpl@0a9b8c7d
scala> updatedGraph.vertices.collect.foreach(println)
(1,(fgedu01,25,1))
(2,(fgedu02,30,1))
(3,(fgedu03,28,2))
(4,(fgedu04,35,1))
(5,(fgedu05,27,1))
3.3 Graph Algorithms in Practice
# PageRank
scala> import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.graphx.lib.PageRank
scala> val pageRankGraph = graph.pageRank(0.001)
pageRankGraph: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@1f0e9d8c
scala> pageRankGraph.vertices.collect.foreach(println)
(1,0.9999999999999999)
(2,0.9999999999999999)
(3,1.4999999999999998)
(4,0.9999999999999999)
(5,0.9999999999999999)
# ConnectedComponents
scala> import org.apache.spark.graphx.lib.ConnectedComponents
import org.apache.spark.graphx.lib.ConnectedComponents
scala> val ccGraph = graph.connectedComponents()
ccGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,String] = org.apache.spark.graphx.impl.GraphImpl@2e1d0c9b
scala> ccGraph.vertices.collect.foreach(println)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
# TriangleCount
scala> import org.apache.spark.graphx.lib.TriangleCount
import org.apache.spark.graphx.lib.TriangleCount
scala> val tcGraph = graph.triangleCount()
tcGraph: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@3d2c1b0a
scala> tcGraph.vertices.collect.foreach(println)
(1,1)
(2,1)
(3,1)
(4,0)
(5,0)
# ShortestPaths
scala> import org.apache.spark.graphx.lib.ShortestPaths
import org.apache.spark.graphx.lib.ShortestPaths
scala> val spGraph = ShortestPaths.run(graph, Seq(1L))
spGraph: org.apache.spark.graphx.Graph[scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int],String] = org.apache.spark.graphx.impl.GraphImpl@4c3b2a10
scala> spGraph.vertices.collect.foreach(println)
(1,Map(1 -> 0))
(2,Map(1 -> 1))
(3,Map(1 -> 1))
(4,Map(1 -> 2))
(5,Map(1 -> 2))
Part 04 - Production Cases and Walkthroughs
4.1 Social Network Analysis Case
# Create the social network data
scala> val users = sc.parallelize(Seq(
     |   (1L, ("Zhang San", "Beijing")),
     |   (2L, ("Li Si", "Shanghai")),
     |   (3L, ("Wang Wu", "Beijing")),
     |   (4L, ("Zhao Liu", "Guangzhou")),
     |   (5L, ("Qian Qi", "Shanghai")),
     |   (6L, ("Sun Ba", "Beijing"))
     | ))
users: org.apache.spark.rdd.RDD[(Long, (String, String))] = ParallelCollectionRDD[20] at parallelize at
scala> val relationships = sc.parallelize(Seq(
     |   Edge(1L, 2L, "friend"),
     |   Edge(1L, 3L, "friend"),
     |   Edge(2L, 3L, "friend"),
     |   Edge(2L, 5L, "friend"),
     |   Edge(3L, 4L, "friend"),
     |   Edge(3L, 6L, "friend"),
     |   Edge(4L, 5L, "friend"),
     |   Edge(5L, 6L, "friend")
     | ))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[21] at parallelize at
scala> val socialGraph = Graph(users, relationships)
socialGraph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@5b4a3c2d
# Compute user influence
scala> val influence = socialGraph.pageRank(0.001).vertices
influence: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[22]
scala> influence.join(users).collect().foreach { case (id, (rank, (name, city))) =>
     |   println(s"$name ($city): influence = $rank")
     | }
Zhang San (Beijing): influence = 1.0
Li Si (Shanghai): influence = 1.5
Wang Wu (Beijing): influence = 2.0
Zhao Liu (Guangzhou): influence = 1.0
Qian Qi (Shanghai): influence = 1.5
Sun Ba (Beijing): influence = 1.0
# Community detection
scala> val communities = socialGraph.connectedComponents().vertices
communities: org.apache.spark.graphx.VertexRDD[Long] = VertexRDDImpl[23]
scala> communities.join(users).collect().foreach { case (id, (community, (name, city))) =>
     |   println(s"$name -> community $community")
     | }
Zhang San -> community 1
Li Si -> community 1
Wang Wu -> community 1
Zhao Liu -> community 1
Qian Qi -> community 1
Sun Ba -> community 1
# Friend recommendation (common friends)
scala> val recommendations = socialGraph.aggregateMessages[Set[Long]](
| ctx => {
| ctx.sendToSrc(Set(ctx.dstId))
| ctx.sendToDst(Set(ctx.srcId))
| },
| _ ++ _
| )
recommendations: org.apache.spark.graphx.VertexRDD[Set[Long]] = VertexRDDImpl[24]
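The VertexRDD above only gathers each user's direct-friend set; to actually recommend friends you still need to score non-adjacent pairs by their number of common friends. A minimal sketch of that step (the cartesian self-join and the variable names are my own, not from the original transcript):

```scala
// Score every non-adjacent pair (u, v) by |friends(u) ∩ friends(v)|.
val friendSets = recommendations  // VertexRDD[Set[Long]] built above

val candidates = friendSets.cartesian(friendSets)
  .filter { case ((u, fu), (v, fv)) => u < v && !fu.contains(v) }
  .map { case ((u, fu), (v, fv)) => ((u, v), (fu & fv).size) }
  .filter(_._2 > 0)
  .sortBy(-_._2)

candidates.collect.foreach { case ((u, v), common) =>
  println(s"Recommend $u <-> $v: $common common friends")
}
```

Note that `cartesian` is quadratic in the number of vertices, so this only suits small demo graphs; at production scale you would self-join on friend IDs instead of materializing all pairs.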
4.2 PageRank in Practice
# Create a web link graph
scala> val pages = sc.parallelize(Seq(
     |   (1L, "Home"),
     |   (2L, "Products"),
     |   (3L, "About"),
     |   (4L, "Contact"),
     |   (5L, "Blog")
     | ))
pages: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[25] at parallelize at
scala> val links = sc.parallelize(Seq(
     |   Edge(1L, 2L, 1.0),
     |   Edge(1L, 3L, 1.0),
     |   Edge(1L, 4L, 1.0),
     |   Edge(2L, 1L, 1.0),
     |   Edge(2L, 5L, 1.0),
     |   Edge(3L, 1L, 1.0),
     |   Edge(4L, 1L, 1.0),
     |   Edge(5L, 2L, 1.0),
     |   Edge(5L, 3L, 1.0)
     | ))
links: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[26] at parallelize at
scala> val webGraph = Graph(pages, links)
webGraph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@6e5f4a3b
# Run PageRank
scala> val prGraph = webGraph.pageRank(0.001)
prGraph: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@7f6e5d4c
# Print the ranking
scala> prGraph.vertices.join(pages).collect()
     |   .sortBy(-_._2._1)
     |   .foreach { case (id, (rank, name)) =>
     |     println(f"$name: PageRank = $rank%.4f")
     |   }
Home: PageRank = 1.8750
Products: PageRank = 1.2500
About: PageRank = 1.1250
Blog: PageRank = 0.8750
Contact: PageRank = 0.8750
4.3 Common Problems
4.3.1 Out-of-Memory Problems
# Diagnosis steps
# 1. Check the graph size
scala> graph.numVertices
scala> graph.numEdges
# 2. Check the number of partitions
scala> graph.edges.partitions.size
# Remedies
# 1. Increase executor memory
--executor-memory 16g
# 2. Increase the number of partitions
scala> graph.partitionBy(PartitionStrategy.EdgePartition2D, 200)
# 3. Cache the graph
scala> graph.cache()
# 4. Shrink the vertex attributes
scala> graph.mapVertices((id, attr) => attr._1)
4.3.2 Slow Computation
# Diagnosis steps
# 1. Check the number of iterations
# pageRank(tol) iterates until convergence
# 2. Check partition balance
scala> graph.edges.mapPartitions(iter => Iterator(iter.size)).collect()
# Remedies
# 1. Cap the number of iterations. Note: pageRank's second argument is the
#    reset probability, not a maximum iteration count; use staticPageRank
#    to run a fixed number of iterations.
scala> graph.staticPageRank(10) // run exactly 10 iterations
# 2. Optimize partitioning
scala> graph.partitionBy(PartitionStrategy.EdgePartition2D, 100)
# 3. Increase parallelism
spark.default.parallelism=200
Part 05 - Summary and Lessons Learned
5.1 GraphX Best Practices
GraphX best-practice recommendations:
1. Design the graph model carefully
- choose the right vertices and edges
- design the attribute structure
2. Optimize the partition strategy
- use EdgePartition2D
- set a sensible partition count
3. Reduce data replication
- use an efficient partition strategy
- avoid repeated attribute transformations
4. Cache frequently used graphs
- cache derived graphs you reuse
- unpersist graphs you no longer need
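The caching advice above in code form (a sketch; `workGraph` is a placeholder for whatever derived graph you reuse):

```scala
// Cache a graph you will traverse several times, then release it explicitly.
val workGraph = graph.subgraph(vpred = (id, attr) => attr._2 > 26).cache()

val count = workGraph.numVertices       // first action materializes the cache
val degrees = workGraph.degrees.count() // later actions reuse the cached graph

// Unpersist both backing RDDs (vertices and edges) when done.
workGraph.unpersist(blocking = false)
```

Calling `cache()` before the first action avoids recomputing the vertex and edge RDDs on every iteration of a graph algorithm; calling `unpersist` promptly frees executor memory for the next graph.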
5.2 Performance Tuning Tips
Performance tuning tips:
- use the EdgePartition2D partition strategy
- set a sensible partition count
- cache frequently used graph data
- reduce vertex replication
5.3 Recommended Tools
Recommended tools around GraphX:
- GraphFrames: graph processing on the DataFrame API
- NetworkX: a Python graph library (for small graphs)
- Gephi: a graph visualization tool
- Neo4j: a graph database
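For reference, GraphFrames (a separate Spark package, not bundled with core Spark) builds a graph from two DataFrames. This sketch assumes spark-shell was started with a GraphFrames package version matching your Spark build; the package coordinate shown is only an example:

```scala
// spark-shell --packages graphframes:graphframes:0.8.2-spark3.0-s_2.12
import org.graphframes.GraphFrame

// GraphFrames expects an "id" column on vertices and "src"/"dst" on edges.
val v = spark.createDataFrame(Seq(
  (1L, "fgedu01", 25), (2L, "fgedu02", 30), (3L, "fgedu03", 28)
)).toDF("id", "name", "age")
val e = spark.createDataFrame(Seq(
  (1L, 2L, "follow"), (2L, 3L, "follow"), (1L, 3L, "follow")
)).toDF("src", "dst", "relationship")

val g = GraphFrame(v, e)
g.inDegrees.show()   // DataFrame-style degree query
val ranks = g.pageRank.resetProbability(0.15).tol(0.001).run()
```

GraphFrames is convenient when the rest of the pipeline already lives in Spark SQL, since vertices and edges stay queryable as DataFrames.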
This document was compiled and published by the fgedu tutorial series for learning and testing use only; when reproducing it, credit the source: http://www.fgedu.net.cn/10327.html
