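In this document, Feng Ge covers hands-on machine learning with Spark MLlib: MLlib core concepts, the ML Pipeline mechanism, feature engineering, classification algorithms, clustering algorithms, and more. The tutorial is based on the official Spark documentation (MLlib Guide, ML Programming Guide) and is intended for big-data developers and operations staff to use for learning and testing. Before applying it in production, verify it yourself.
Part01-Basic Concepts and Theory
1.1 MLlib Core Concepts
MLlib is Spark's machine learning library. It provides common machine learning algorithms and feature engineering tools.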
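- DataFrame API: the DataFrame-based machine learning API
- Transformer: converts one DataFrame into another DataFrame
- Estimator: trains on a DataFrame and produces a Model
- Pipeline: chains multiple stages together
- Parameter: a uniform interface for configuring every stage
- Model: the result of training (itself a Transformer)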
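1.2 ML Pipeline Mechanism
The ML Pipeline workflow in detail:
1. Data preparation
– Read the data
– Clean the data
– Extract features
2. Feature engineering
– Transform features
– Select features
– Scale features
3. Model training
– Choose an algorithm
– Tune parameters
– Train the model
4. Model evaluation
– Predict
– Compute evaluation metrics
– Select a model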
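# Pipeline example
Tokenizer -> HashingTF -> LogisticRegression
# Pipeline components
– Transformer: transform() maps a DataFrame to a new DataFrame
– Estimator: fit() trains on a DataFrame and returns a Model
– Pipeline: chains multiple stages; Pipeline.fit() returns a PipelineModel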
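The following plain-Scala sketch shows how Pipeline.fit handles the Transformer/Estimator contract described above. It does not use Spark. MiniPipeline, Frame (a map from column name to values) and the simplified traits are hypothetical stand-ins for Spark's classes. Transformers are kept unchanged. Each Estimator is fit on the data as it exists at that point in the chain, and is then replaced by the Model it returns. The fitted PipelineModel replays all stages in order.

```scala
// Hypothetical, simplified model of Spark ML's Pipeline.fit semantics (no Spark needed).
object MiniPipeline {
  // A "DataFrame" here is just column name -> column values.
  type Frame = Map[String, Vector[Any]]

  sealed trait Stage
  trait Transformer extends Stage { def transform(df: Frame): Frame }
  trait Estimator extends Stage { def fit(df: Frame): Transformer } // returns a "Model"

  // The fitted pipeline replays every (now fitted) stage in order.
  final case class PipelineModel(stages: Vector[Transformer]) extends Transformer {
    def transform(df: Frame): Frame = stages.foldLeft(df)((cur, t) => t.transform(cur))
  }

  // Transformers are kept as-is; each Estimator is fit on the data produced by the
  // earlier stages and replaced by its Model; the data then flows on to the next stage.
  def fit(stages: Seq[Stage], df: Frame): PipelineModel = {
    val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], df)) {
      case ((acc, cur), t: Transformer) => (acc :+ t, t.transform(cur))
      case ((acc, cur), e: Estimator) =>
        val model = e.fit(cur)
        (acc :+ model, model.transform(cur))
    }
    PipelineModel(fitted)
  }
}
```

In Spark, Tokenizer and HashingTF play the Transformer role. LogisticRegression is the Estimator, and its fit() returns a LogisticRegressionModel.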
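# Pipeline code example
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
// Split the "text" column into words
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
// Hash the words into a 1000-dimensional term-frequency vector
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol("words")
  .setOutputCol("features")
// Logistic regression reads the "features" and "label" columns by default
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
// trainingData: a DataFrame with "text" and "label" columns (assumed to already exist)
val model = pipeline.fit(trainingData)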
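1.3 Types of Machine Learning Algorithms
Algorithm types supported by MLlib:
1. Classification
– Logistic regression
– Decision trees
– Random forests
– Gradient-boosted trees
– Naive Bayes
– Support vector machines (LinearSVC, linear kernel only)
2. Regression
– Linear regression
– Decision tree regression
– Random forest regression
– Gradient-boosted tree regression
3. Clustering
– K-Means
– Gaussian mixture models
– LDA topic models
4. Collaborative filtering
– ALS (alternating least squares)
5. Feature engineering
– Feature extraction
– Feature transformation
– Feature selection
6. Dimensionality reduction
– PCA (principal component analysis)
– SVD (singular value decomposition, via the RDD-based RowMatrix API)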
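Part02-Production Environment Planning and Recommendations
2.1 Feature Engineering Planning
Feature engineering recommendations:
1. Data exploration
– Distribution analysis
– Missing-value handling
– Outlier detection
2. Feature extraction
– Text features: TF-IDF, Word2Vec
– Categorical features: One-Hot, Label Encoding
– Numeric features: standardization, normalization
3. Feature selection
– Filter methods: variance, correlation coefficient
– Wrapper methods: recursive feature elimination
– Embedded methods: L1 regularization
4. Feature transformation
– Indexing: StringIndexer
– Vectorization: VectorAssembler
– Standardization: StandardScaler
# Feature engineering best practices
1. Keep features consistent between training and prediction
2. Handle missing values
3. Avoid data leakage
4. Save the feature-processing pipeline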
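Points 1, 3 and 4 share one rule. Learn preprocessing statistics from the training split only, then reuse them unchanged for validation, test and production data. Below is a plain-Scala sketch of that rule for a single numeric column. LeakageFreeScaling, fit and transform are illustrative names, not MLlib classes. In MLlib you get the same effect by fitting the Pipeline on the training DataFrame and saving the resulting PipelineModel.

```scala
// Illustrative only: learn statistics on the training split, reuse them everywhere else.
object LeakageFreeScaling {
  final case class ScalerStats(mean: Double, std: Double)

  // "fit": mean and sample standard deviation (n - 1 denominator) of the training values only
  def fit(train: Seq[Double]): ScalerStats = {
    val mean = train.sum / train.size
    val variance = train.map(x => (x - mean) * (x - mean)).sum / (train.size - 1)
    ScalerStats(mean, math.sqrt(variance))
  }

  // "transform": apply the training statistics unchanged to any split (test, new data, ...)
  def transform(stats: ScalerStats, xs: Seq[Double]): Seq[Double] =
    xs.map(x => (x - stats.mean) / stats.std)
}
```

Computing the mean and standard deviation over training and test data together would leak information about the test rows into the model. That is the leakage point 3 warns about.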
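2.2 Model Selection Planning
Model selection recommendations:
1. Problem type
– Classification: logistic regression, decision trees, random forests
– Regression: linear regression, GBDT
– Clustering: K-Means, GMM
2. Data size
– Small data: more complex models are affordable
– Large data: simpler models, distributed algorithms
3. Model complexity
– Simple models: fast to train, easy to interpret
– Complex models: often more accurate, slower to train
4. Evaluation metrics
– Classification: accuracy, precision, recall, F1
– Regression: MSE, RMSE, MAE, R2
– Clustering: silhouette coefficient, SSE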
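The classification and regression metrics listed above are simple arithmetic over (label, prediction) pairs. This plain-Scala sketch uses the textbook binary definitions, with 1.0 as the positive class. It is illustrative only. Spark's MulticlassClassificationEvaluator computes F1 (its default metric) as a weighted average over classes, so its value can differ from the binary F1 here. RegressionEvaluator's mse, rmse, mae and r2 follow the same formulas as below.

```scala
// Illustrative metric formulas; labels and predictions are paired by position.
object Metrics {
  private def pairs(labels: Seq[Double], preds: Seq[Double]) = labels.zip(preds)

  def accuracy(labels: Seq[Double], preds: Seq[Double]): Double =
    pairs(labels, preds).count { case (l, p) => l == p }.toDouble / labels.size

  // Binary precision / recall / F1 with label 1.0 treated as the positive class
  def precisionRecallF1(labels: Seq[Double], preds: Seq[Double]): (Double, Double, Double) = {
    val ps = pairs(labels, preds)
    val tp = ps.count { case (l, p) => l == 1.0 && p == 1.0 }.toDouble
    val fp = ps.count { case (l, p) => l == 0.0 && p == 1.0 }.toDouble
    val fn = ps.count { case (l, p) => l == 1.0 && p == 0.0 }.toDouble
    val precision = if (tp + fp == 0) 0.0 else tp / (tp + fp)
    val recall = if (tp + fn == 0) 0.0 else tp / (tp + fn)
    val f1 = if (precision + recall == 0) 0.0 else 2 * precision * recall / (precision + recall)
    (precision, recall, f1)
  }

  def mse(labels: Seq[Double], preds: Seq[Double]): Double =
    pairs(labels, preds).map { case (l, p) => (l - p) * (l - p) }.sum / labels.size
  def rmse(labels: Seq[Double], preds: Seq[Double]): Double = math.sqrt(mse(labels, preds))
  def mae(labels: Seq[Double], preds: Seq[Double]): Double =
    pairs(labels, preds).map { case (l, p) => math.abs(l - p) }.sum / labels.size

  // R2 = 1 - SS_res / SS_tot
  def r2(labels: Seq[Double], preds: Seq[Double]): Double = {
    val mean = labels.sum / labels.size
    val ssTot = labels.map(l => (l - mean) * (l - mean)).sum
    val ssRes = pairs(labels, preds).map { case (l, p) => (l - p) * (l - p) }.sum
    1.0 - ssRes / ssTot
  }
}
```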
# Cross-validation
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
// 3 x 3 = 9 parameter combinations for the lr stage of the pipeline above
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator()) // areaUnderROC by default
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
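The grid above has 3 regParam values x 3 elasticNetParam values, giving 9 parameter maps. With setNumFolds(5), CrossValidator trains 9 x 5 = 45 models, then refits the best parameter map once on the full dataset, so tuning cost grows multiplicatively. The sketch below illustrates k-fold splitting and that fit count in plain Scala. KFoldSketch is a hypothetical helper, and its equal-size folds are a simplification. Spark assigns rows to folds by random sampling, so real fold sizes vary slightly.

```scala
// Hypothetical helper illustrating k-fold cross-validation bookkeeping.
object KFoldSketch {
  // Shuffle row indices with a fixed seed, then send every k-th row to the same fold.
  // Returns (trainingRows, validationRows) for each of the k folds.
  def folds(n: Int, k: Int, seed: Long): Seq[(Seq[Int], Seq[Int])] = {
    val shuffled = new scala.util.Random(seed).shuffle((0 until n).toVector)
    (0 until k).map { f =>
      val validation = shuffled.zipWithIndex.collect { case (row, i) if i % k == f => row }
      val validationSet = validation.toSet
      (shuffled.filterNot(validationSet.contains), validation)
    }
  }

  // One fit per (parameter map, fold), plus one final refit of the best parameter map.
  def totalFits(numParamMaps: Int, numFolds: Int): Int = numParamMaps * numFolds + 1
}
```

For the grid in this section, `KFoldSketch.totalFits(9, 5)` gives 46 training runs.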
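2.3 Model Deployment Planning
Model deployment recommendations:
1. Save the model
model.write.overwrite().save("/model/fgedu-model")
2. Load the model
import org.apache.spark.ml.PipelineModel
val model = PipelineModel.load("/model/fgedu-model")
3. Online prediction
val predictions = model.transform(newData)
4. Model monitoring
– Prediction performance monitoring
– Model quality monitoring
– Data drift detection
5. Model updates
– Periodic retraining
– Incremental updates
– A/B testing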
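For the data drift item, one commonly used check is the Population Stability Index (PSI). PSI is not part of MLlib. It compares how a feature's values fall into fixed buckets at training time with how they fall in recent data. The plain-Scala sketch below rests on several assumptions. DriftCheck and psi are hypothetical names, bucket edges are assumed to be sorted in ascending order, and the eps floor for empty buckets is an arbitrary choice. A value of 0 means the two distributions match bucket for bucket, and larger values mean more drift. Calibrate any alert threshold on your own data.

```scala
// Hypothetical drift check: Population Stability Index over fixed bucket edges.
object DriftCheck {
  // Share of values per bucket; bucket i holds values with exactly i edges <= value
  // (edges assumed sorted ascending). Empty buckets get a floor of eps so ln() stays finite.
  private def proportions(xs: Seq[Double], edges: Seq[Double], eps: Double): Seq[Double] = {
    val counts = Array.fill(edges.size + 1)(0)
    xs.foreach(x => counts(edges.count(_ <= x)) += 1)
    counts.toSeq.map(c => math.max(c.toDouble / xs.size, eps))
  }

  // PSI = sum over buckets of (current - baseline) * ln(current / baseline)
  def psi(baseline: Seq[Double], current: Seq[Double], edges: Seq[Double], eps: Double = 1e-6): Double =
    proportions(baseline, edges, eps).zip(proportions(current, edges, eps))
      .map { case (b, c) => (c - b) * math.log(c / b) }
      .sum
}
```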
# Model version management
/model/fgedu-model/
├── v1.0/
│ ├── metadata/
│ └── stages/
├── v1.1/
│ ├── metadata/
│ └── stages/
└── current -> v1.1/
Part03-Production Project Implementation
3.1 Hands-on Feature Processing
3.1.1 Basic Feature Processing
$ /bigdata/app/spark/bin/spark-shell \
  --master spark://192.168.1.60:7077 \
  --executor-memory 4g
# Import the MLlib feature classes
scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._
# Create test data
scala> val data = Seq(
| (0, "hello world", 1.0, 25),
| (1, "spark mllib", 2.0, 30),
| (2, "machine learning", 3.0, 28),
| (3, "data science", 4.0, 35)
| ).toDF("id", "text", "label", "age")
data: org.apache.spark.sql.DataFrame = [id: int, text: string ... 2 more fields]
scala> data.show()
+---+----------------+-----+---+
| id|            text|label|age|
+---+----------------+-----+---+
|  0|     hello world|  1.0| 25|
|  1|     spark mllib|  2.0| 30|
|  2|machine learning|  3.0| 28|
|  3|    data science|  4.0| 35|
+---+----------------+-----+---+
# Tokenization
scala> val tokenizer = new Tokenizer()
| .setInputCol("text")
| .setOutputCol("words")
tokenizer: org.apache.spark.ml.feature.Tokenizer = tokenizer_8b5a12ae
scala> val wordsData = tokenizer.transform(data)
wordsData: org.apache.spark.sql.DataFrame = [id: int, text: string ... 3 more fields]
scala> wordsData.select("text", "words").show()
+----------------+-------------------+
|            text|              words|
+----------------+-------------------+
|     hello world|     [hello, world]|
|     spark mllib|     [spark, mllib]|
|machine learning|[machine, learning]|
|    data science|    [data, science]|
+----------------+-------------------+
# TF-IDF feature extraction
scala> val hashingTF = new HashingTF()
| .setInputCol("words")
| .setOutputCol("rawFeatures")
| .setNumFeatures(20)
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_7b5a12ae
scala> val featurizedData = hashingTF.transform(wordsData)
featurizedData: org.apache.spark.sql.DataFrame = [id: int, text: string ... 4 more fields]
scala> val idf = new IDF()
| .setInputCol("rawFeatures")
| .setOutputCol("features")
idf: org.apache.spark.ml.feature.IDF = idf_5b37e0d2
scala> val idfModel = idf.fit(featurizedData)
idfModel: org.apache.spark.ml.feature.IDFModel = idf_5b37e0d2
scala> val rescaledData = idfModel.transform(featurizedData)
rescaledData: org.apache.spark.sql.DataFrame = [id: int, text: string ... 5 more fields]
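Spark's documentation defines the IDF weight as log((m + 1) / (df(t) + 1)). Here m is the number of documents and df(t) is the number of documents that contain term t. The output column is TF x IDF. The plain-Scala sketch below applies that formula to raw words; TfIdfSketch is a hypothetical name. Unlike HashingTF, it does not hash terms into numFeatures buckets. With setNumFeatures(20) as above, different words that hash to the same bucket share a count in Spark's output.

```scala
// Hypothetical illustration of the TF-IDF weighting described in Spark's docs.
object TfIdfSketch {
  // IDF(t) = ln((m + 1) / (df(t) + 1)); a term present in every document gets 0
  def idf(numDocs: Long, docFreq: Long): Double =
    math.log((numDocs + 1).toDouble / (docFreq + 1).toDouble)

  // Raw term counts for one tokenized document (no hashing, unlike HashingTF)
  def termFreq(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (w, ws) => w -> ws.size }

  // TF-IDF weight of every term in every document
  def tfIdf(docs: Seq[Seq[String]]): Seq[Map[String, Double]] = {
    val m = docs.size.toLong
    val df = docs.flatMap(_.distinct).groupBy(identity).map { case (w, ws) => w -> ws.size.toLong }
    docs.map(d => termFreq(d).map { case (w, tf) => w -> tf * idf(m, df(w)) })
  }
}
```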
3.1.2 Feature Transformation and Assembly
# StringIndexer (map each string value to a numeric index)
scala> val indexer = new StringIndexer()
| .setInputCol("text")
| .setOutputCol("textIndex")
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_3a2b1c0d
scala> val indexed = indexer.fit(data).transform(data)
indexed: org.apache.spark.sql.DataFrame = [id: int, text: string ... 3 more fields]
# OneHotEncoder (one-hot encoding; an Estimator in Spark 3.x, so it is fit first)
scala> val encoder = new OneHotEncoder()
| .setInputCol("textIndex")
| .setOutputCol("textVec")
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHot_4b3c2d1e
scala> val encoded = encoder.fit(indexed).transform(indexed)
encoded: org.apache.spark.sql.DataFrame = [id: int, text: string ... 4 more fields]
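StringIndexer and OneHotEncoder follow simple rules that explain the columns produced above. By default, StringIndexer orders values by descending frequency (stringOrderType = frequencyDesc), and Spark 3.x breaks ties alphabetically. OneHotEncoder drops the last category by default (dropLast = true), so the last index maps to an all-zero vector. The plain-Scala sketch below reproduces those rules with dense vectors; IndexAndEncode is a hypothetical name. Spark itself emits sparse vectors.

```scala
// Hypothetical re-implementation of the default StringIndexer / OneHotEncoder rules.
object IndexAndEncode {
  // Most frequent value gets index 0; equal frequencies are ordered alphabetically
  def fitIndex(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .sortBy { case (v, occurrences) => (-occurrences.size, v) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // Dense one-hot vector; with dropLast the last category becomes all zeros
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```

In this section every one of the four text values occurs once. Under these rules the indices are alphabetical ("data science" = 0, "spark mllib" = 3), and "spark mllib" encodes to a vector of three zeros.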
# StandardScaler (standardization); it needs a vector column, so wrap "age" first.
# Build on rescaledData so the TF-IDF "features" column is still available later.
scala> val assembler = new VectorAssembler()
| .setInputCols(Array("age"))
| .setOutputCol("ageVec")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_5c4d3e2f
scala> val assembled = assembler.transform(rescaledData)
assembled: org.apache.spark.sql.DataFrame = [id: int, text: string ... 6 more fields]
scala> val scaler = new StandardScaler()
| .setInputCol("ageVec")
| .setOutputCol("scaledAge")
| .setWithStd(true)
| .setWithMean(false)
scaler: org.apache.spark.ml.feature.StandardScaler = stdScaler_6d5e4f3g
scala> val scalerModel = scaler.fit(assembled)
scalerModel: org.apache.spark.ml.feature.StandardScalerModel = stdScaler_6d5e4f3g
scala> val scaledData = scalerModel.transform(assembled)
scaledData: org.apache.spark.sql.DataFrame = [id: int, text: string ... 7 more fields]
# VectorAssembler (feature assembly): combine the TF-IDF "features" with "scaledAge"
scala> val finalAssembler = new VectorAssembler()
| .setInputCols(Array("features", "scaledAge"))
| .setOutputCol("finalFeatures")
finalAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_7e6f5g4h
scala> val finalData = finalAssembler.transform(scaledData)
finalData: org.apache.spark.sql.DataFrame = [id: int, text: string ... 8 more fields]
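Here is a worked check of the StandardScaler step in this section. With setWithStd(true) and setWithMean(false), each age is divided by the column's corrected sample standard deviation (n - 1 denominator) and is not centered. For the four ages 25, 30, 28 and 35 (values rounded):

```latex
\bar{x} = \frac{25 + 30 + 28 + 35}{4} = 29.5, \qquad
s = \sqrt{\frac{\sum_{i=1}^{4} (x_i - \bar{x})^2}{4 - 1}}
  = \sqrt{\frac{20.25 + 0.25 + 2.25 + 30.25}{3}}
  = \sqrt{17.667} \approx 4.2032

\text{scaledAge}_i = \frac{x_i}{s}
  \;\Rightarrow\;
  \left(\tfrac{25}{s}, \tfrac{30}{s}, \tfrac{28}{s}, \tfrac{35}{s}\right)
  \approx (5.948,\ 7.137,\ 6.662,\ 8.327)
```

Setting withMean(true) would also subtract 29.5 before dividing. That produces dense output, which is why it is off by default.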
3.2 Hands-on Classification Algorithms
3.2.1 Logistic Regression Classification
scala> val training = spark.createDataFrame(Seq(
| (0L, "a b c d e spark", 1.0),
| (1L, "b d", 0.0),
| (2L, "spark f g h", 1.0),
| (3L, "hadoop mapreduce", 0.0),
| (4L, "spark mllib", 1.0)
| )).toDF("id", "text", "label")
training: org.apache.spark.sql.DataFrame = [id: bigint, text: string ... 1 more field]
# Build the Pipeline
scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline
scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression
scala> val tokenizer = new Tokenizer()
| .setInputCol("text")
| .setOutputCol("words")
tokenizer: org.apache.spark.ml.feature.Tokenizer = tokenizer_8f7g6h5i
scala> val hashingTF = new HashingTF()
| .setNumFeatures(1000)
| .setInputCol("words")
| .setOutputCol("features")
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_9g8h7i6j
scala> val lr = new LogisticRegression()
| .setMaxIter(10)
| .setRegParam(0.01)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_0h9i8j7k
scala> val pipeline = new Pipeline()
| .setStages(Array(tokenizer, hashingTF, lr))
pipeline: org.apache.spark.ml.Pipeline = pipeline_1i0j9k8l
# Train the model
scala> val model = pipeline.fit(training)
model: org.apache.spark.ml.PipelineModel = pipeline_1i0j9k8l
# Predict
scala> val test = spark.createDataFrame(Seq(
| (5L, "spark test"),
| (6L, "hadoop test")
| )).toDF("id", "text")
test: org.apache.spark.sql.DataFrame = [id: bigint, text: string]
scala> val prediction = model.transform(test)
prediction: org.apache.spark.sql.DataFrame = [id: bigint, text: string ... 5 more fields]
scala> prediction.select("text", "prediction", "probability").show()
+-----------+----------+--------------------+
|       text|prediction|         probability|
+-----------+----------+--------------------+
| spark test|       1.0|[0.02584268054755...|
|hadoop test|       0.0|[0.97415731945244...|
+-----------+----------+--------------------+
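Each probability vector above is [P(label = 0), P(label = 1)]. It comes from passing the linear margin w·x + b through the sigmoid function. With the default threshold of 0.5, the prediction is 1.0 when P(label = 1) exceeds it. For "spark test", P(label = 0) ≈ 0.0258, so P(label = 1) ≈ 0.974 and the prediction is 1.0. The block below is a plain-Scala sketch of that calculation for binary logistic regression. LogisticSketch and the weights used in the test are hypothetical, not values from the fitted model.

```scala
// Hypothetical sketch of how a binary logistic regression turns features into output columns.
object LogisticSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // margin = w . x + b; returns (P(label = 0), P(label = 1)), matching the probability column order
  def probability(weights: Seq[Double], intercept: Double, x: Seq[Double]): (Double, Double) = {
    val margin = weights.zip(x).map { case (w, xi) => w * xi }.sum + intercept
    val p1 = sigmoid(margin)
    (1.0 - p1, p1)
  }

  // Predict 1.0 only when P(label = 1) is strictly above the threshold (0.5 by default)
  def predict(p1: Double, threshold: Double = 0.5): Double = if (p1 > threshold) 1.0 else 0.0
}
```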
3.2.2 Random Forest Classification
scala> import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.classification.RandomForestClassifier
scala> val data = Seq(
| (1.0, 2.0, 0.0),
| (2.0, 3.0, 0.0),
| (3.0, 4.0, 1.0),
| (4.0, 5.0, 1.0),
| (5.0, 6.0, 1.0)
| ).toDF("feature1", "feature2", "label")
data: org.apache.spark.sql.DataFrame = [feature1: double, feature2: double ... 1 more field]
scala> val assembler = new VectorAssembler()
| .setInputCols(Array("feature1", "feature2"))
| .setOutputCol("features")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_2j1k0l9m
scala> val assembledData = assembler.transform(data)
assembledData: org.apache.spark.sql.DataFrame = [feature1: double, feature2: double ... 2 more fields]
# Train the random forest
scala> val rf = new RandomForestClassifier()
| .setLabelCol("label")
| .setFeaturesCol("features")
| .setNumTrees(10)
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_3k2l1m0n
scala> val rfModel = rf.fit(assembledData)
rfModel: org.apache.spark.ml.classification.RandomForestClassificationModel = rfc_3k2l1m0n
# Predict
scala> val predictions = rfModel.transform(assembledData)
predictions: org.apache.spark.sql.DataFrame = [feature1: double, feature2: double ... 5 more fields]
scala> predictions.select("label", "prediction", "probability").show()
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.9,0.0999999999...|
|  0.0|       0.0|[0.9,0.0999999999...|
|  1.0|       1.0|[0.1,0.8999999999...|
|  1.0|       1.0|[0.1,0.8999999999...|
|  1.0|       1.0|[0.1,0.8999999999...|
+-----+----------+--------------------+
# Inspect feature importances
scala> rfModel.featureImportances
res0: org.apache.spark.ml.linalg.Vector = [0.5,0.5]
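RandomForestClassifier uses Gini impurity by default (impurity = "gini"). featureImportances is derived from how much each feature's splits reduce impurity, weighted by the number of rows that reach each node, and then normalized. The plain-Scala sketch below computes Gini impurity and the gain of one split on this example's labels; GiniSketch is a hypothetical name. In this data feature2 is always feature1 + 1, so both features separate the classes equally well. That fits the even [0.5,0.5] importances shown above.

```scala
// Hypothetical helpers for the impurity measure behind tree splits and feature importances.
object GiniSketch {
  // Gini impurity: 1 - sum over classes of p_k^2
  def gini(labels: Seq[Double]): Double = {
    val n = labels.size.toDouble
    1.0 - labels.groupBy(identity).values.map(g => math.pow(g.size / n, 2)).sum
  }

  // Impurity decrease of splitting `parent` into `left` and `right`, weighted by child size
  def gain(parent: Seq[Double], left: Seq[Double], right: Seq[Double]): Double = {
    val n = parent.size.toDouble
    gini(parent) - (left.size / n) * gini(left) - (right.size / n) * gini(right)
  }
}
```

For the five labels above (two 0.0, three 1.0), the impurity is 1 - 0.4² - 0.6² = 0.48. A split at feature1 <= 2.5 separates the classes perfectly, so its gain is the full 0.48.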
3.3 Hands-on Clustering Algorithms
scala> import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.clustering.KMeans
scala> val data = spark.createDataFrame(Seq(
| (0.0, 0.0, 0.0),
| (0.1, 0.1, 0.1),
| (0.2, 0.2, 0.2),
| (9.0, 9.0, 9.0),
| (9.1, 9.1, 9.1),
| (9.2, 9.2, 9.2)
| )).toDF("f1", "f2", "f3")
data: org.apache.spark.sql.DataFrame = [f1: double, f2: double ... 1 more field]
scala> val assembler = new VectorAssembler()
| .setInputCols(Array("f1", "f2", "f3"))
| .setOutputCol("features")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_4l3m2n1o
scala> val dataset = assembler.transform(data)
dataset: org.apache.spark.sql.DataFrame = [f1: double, f2: double ... 2 more fields]
# Train K-Means
scala> val kmeans = new KMeans()
| .setK(2)
| .setSeed(1L)
kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_5m4n3o2p
scala> val model = kmeans.fit(dataset)
model: org.apache.spark.ml.clustering.KMeansModel = kmeans_5m4n3o2p
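After initialization (Spark's default init mode is k-means||), KMeans alternates two steps. First, each point is assigned to its nearest center. Second, each center moves to the mean of its assigned points. This repeats until the centers stop moving (within tol) or maxIter is reached. The block below is a plain-Scala sketch of one such iteration using squared Euclidean distance. KMeansSketch is a hypothetical name, and the starting centers in the test are chosen by hand, not by k-means||.

```scala
// Hypothetical sketch of one Lloyd iteration (assign, then update) of k-means.
object KMeansSketch {
  type Point = Vector[Double]

  def sqDist(a: Point, b: Point): Double = a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: index of the nearest center
  def assign(p: Point, centers: Seq[Point]): Int =
    centers.indices.minBy(i => sqDist(p, centers(i)))

  // Update step: each center moves to the mean of its points (unchanged if it got none)
  def step(points: Seq[Point], centers: Seq[Point]): Seq[Point] = {
    val groups = points.groupBy(assign(_, centers))
    centers.indices.map { i =>
      groups.get(i) match {
        case Some(ps) => ps.transpose.map(col => col.sum / ps.size).toVector
        case None     => centers(i)
      }
    }
  }
}
```

Starting from (0,0,0) and (9,9,9), one step on this section's six points already yields (0.1,0.1,0.1) and (9.1,9.1,9.1), the same centers the model reports.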
# Predict
scala> val predictions = model.transform(dataset)
predictions: org.apache.spark.sql.DataFrame = [f1: double, f2: double … 3 more fields]
scala> predictions.show()
+---+---+---+-------------+----------+
| f1| f2| f3|     features|prediction|
+---+---+---+-------------+----------+
|0.0|0.0|0.0|[0.0,0.0,0.0]|         0|
|0.1|0.1|0.1|[0.1,0.1,0.1]|         0|
|0.2|0.2|0.2|[0.2,0.2,0.2]|         0|
|9.0|9.0|9.0|[9.0,9.0,9.0]|         1|
|9.1|9.1|9.1|[9.1,9.1,9.1]|         1|
|9.2|9.2|9.2|[9.2,9.2,9.2]|         1|
+---+---+---+-------------+----------+
# Evaluate clustering quality (silhouette)
scala> import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.evaluation.ClusteringEvaluator
scala> val evaluator = new ClusteringEvaluator()
evaluator: org.apache.spark.ml.evaluation.ClusteringEvaluator = ClusteringEvaluator: uid=clusteringEval_6n5o4p3q
scala> val silhouette = evaluator.evaluate(predictions)
silhouette: Double = 0.99975
# View the cluster centers
scala> model.clusterCenters
res1: Array[org.apache.spark.ml.linalg.Vector] = Array([0.1,0.1,0.1], [9.1,9.1,9.1])
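The silhouette of a point is (b - a) / max(a, b). Here a is the point's mean distance to the other points in its own cluster, and b is its smallest mean distance to any other cluster. ClusteringEvaluator averages this over all points, and its default distance measure is squaredEuclidean. Values near 1, like 0.99975 above, indicate compact, well-separated clusters. The plain-Scala sketch below implements the textbook definition with squared Euclidean distance; SilhouetteSketch is a hypothetical name. Spark's distributed computation may differ in details, so treat this as an illustration rather than a reproduction of Spark's number.

```scala
// Hypothetical, textbook silhouette score using squared Euclidean distance.
object SilhouetteSketch {
  type Point = Vector[Double]

  def sqDist(a: Point, b: Point): Double = a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Mean of s(i) = (b - a) / max(a, b) over all points; singleton clusters score 0.
  // Needs at least two clusters.
  def silhouette(points: Seq[Point], labels: Seq[Int]): Double = {
    val byCluster = points.zip(labels).groupBy(_._2).map { case (c, ps) => c -> ps.map(_._1) }
    val scores = points.zip(labels).map { case (p, c) =>
      val own = byCluster(c)
      if (own.size == 1) 0.0
      else {
        // distance to itself is 0, so dividing by (size - 1) averages over the other members
        val a = own.map(sqDist(p, _)).sum / (own.size - 1)
        val b = byCluster.collect { case (other, ps) if other != c => ps.map(sqDist(p, _)).sum / ps.size }.min
        (b - a) / math.max(a, b)
      }
    }
    scores.sum / scores.size
  }
}
```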
Part04-Production Cases and Hands-on Walkthroughs
4.1 Predictive Analytics Case Study
# Create user data
scala> val userData = Seq(
| (1, 25, 5000.0, 12, 0),
| (2, 30, 8000.0, 24, 0),
| (3, 35, 12000.0, 6, 1),
| (4, 28, 6000.0, 18, 0),
| (5, 40, 15000.0, 3, 1),
| (6, 32, 9000.0, 15, 0),
| (7, 45, 20000.0, 2, 1),
| (8, 27, 5500.0, 20, 0)
| ).toDF("user_id", "age", "income", "tenure", "churn")
userData: org.apache.spark.sql.DataFrame = [user_id: int, age: int ... 3 more fields]
# Feature engineering
scala> val assembler = new VectorAssembler()
| .setInputCols(Array("age", "income", "tenure"))
| .setOutputCol("features")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_7o6p5q4r
scala> val scaler = new StandardScaler()
| .setInputCol("features")
| .setOutputCol("scaledFeatures")
scaler: org.apache.spark.ml.feature.StandardScaler = stdScaler_8p7q6r5s
# Build the Pipeline
scala> val lr = new LogisticRegression()
| .setLabelCol("churn")
| .setFeaturesCol("scaledFeatures")
| .setMaxIter(20)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_9q8r7s6t
scala> val pipeline = new Pipeline()
| .setStages(Array(assembler, scaler, lr))
pipeline: org.apache.spark.ml.Pipeline = pipeline_0r9s8t7u
# Train the model
scala> val model = pipeline.fit(userData)
model: org.apache.spark.ml.PipelineModel = pipeline_0r9s8t7u
# Predict for new users
scala> val newUsers = Seq(
| (9, 38, 10000.0, 8),
| (10, 29, 7000.0, 22)
| ).toDF("user_id", "age", "income", "tenure")
newUsers: org.apache.spark.sql.DataFrame = [user_id: int, age: int ... 2 more fields]
scala> val predictions = model.transform(newUsers)
predictions: org.apache.spark.sql.DataFrame = [user_id: int, age: int ... 7 more fields]
scala> predictions.select("user_id", "prediction", "probability").show()
+-------+----------+-----------+
|user_id|prediction|probability|
+-------+----------+-----------+
|      9|       1.0|[0.35,0.65]|
|     10|       0.0|[0.72,0.28]|
+-------+----------+-----------+
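The churn example only makes predictions. To judge the model you would score a held-out split with an evaluator. BinaryClassificationEvaluator reports areaUnderROC by default; here it would need setLabelCol("churn"), because the label column is not named "label". AUC equals the probability that a randomly chosen positive row scores higher than a randomly chosen negative one. The plain-Scala sketch below computes it that way and counts ties as one half; AucSketch is a hypothetical name. With only eight training rows, any such number would be illustrative at best.

```scala
// Hypothetical AUC via the rank (Mann-Whitney) formulation; labels are 1.0 or 0.0.
object AucSketch {
  def auc(scores: Seq[Double], labels: Seq[Double]): Double = {
    val pos = scores.zip(labels).collect { case (s, 1.0) => s }
    val neg = scores.zip(labels).collect { case (s, 0.0) => s }
    // Each (positive, negative) pair: 1 if ranked correctly, 0.5 on a tie, 0 otherwise
    val wins = for (p <- pos; n <- neg) yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size * neg.size)
  }
}
```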
4.2 Recommendation System Case Study
scala> import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS
scala> val ratings = Seq(
| (1, 1, 5.0),
| (1, 2, 4.0),
| (1, 3, 3.0),
| (2, 1, 4.0),
| (2, 2, 5.0),
| (2, 4, 4.0),
| (3, 2, 3.0),
| (3, 3, 5.0),
| (3, 4, 4.0)
| ).toDF("userId", "movieId", "rating")
ratings: org.apache.spark.sql.DataFrame = [userId: int, movieId: int ... 1 more field]
# Train the ALS model
scala> val als = new ALS()
| .setUserCol("userId")
| .setItemCol("movieId")
| .setRatingCol("rating")
| .setRank(10)
| .setMaxIter(10)
| .setRegParam(0.01)
als: org.apache.spark.ml.recommendation.ALS = als_1s0t9u8v
scala> val model = als.fit(ratings)
model: org.apache.spark.ml.recommendation.ALSModel = als_1s0t9u8v
# Recommend movies for each user (top 3)
scala> val userRecs = model.recommendForAllUsers(3)
userRecs: org.apache.spark.sql.DataFrame = [userId: int, recommendations: array<struct<movieId:int,rating:float>>]
scala> userRecs.show(false)
+------+---------------------------+
|userId|recommendations            |
+------+---------------------------+
|1     |[[4,4.8], [2,4.0], [3,3.0]]|
|2     |[[3,4.2], [1,4.0], [2,5.0]]|
|3     |[[1,4.5], [2,3.0], [3,5.0]]|
+------+---------------------------+
# Recommend users for each movie (top 3)
scala> val movieRecs = model.recommendForAllItems(3)
movieRecs: org.apache.spark.sql.DataFrame = [movieId: int, recommendations: array<struct<userId:int,rating:float>>]
scala> movieRecs.show(false)
+-------+---------------------------+
|movieId|recommendations            |
+-------+---------------------------+
|1      |[[2,4.0], [1,5.0], [3,4.5]]|
|2      |[[2,5.0], [1,4.0], [3,3.0]]|
|3      |[[3,5.0], [1,3.0], [2,4.2]]|
|4      |[[1,4.8], [2,4.0], [3,4.0]]|
+-------+---------------------------+
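With setRank(10), ALS learns a 10-dimensional factor vector for every user and every movie. A predicted rating is the dot product of a user's factors and a movie's factors. recommendForAllUsers(3) returns, for each user, the 3 items with the highest predicted rating, ordered from highest to lowest. It does not filter out items the user has already rated. The plain-Scala sketch below shows that scoring and top-N step, not the alternating least-squares training itself. AlsSketch is a hypothetical name, and the test uses hand-picked factors.

```scala
// Hypothetical sketch of ALS scoring: rating ≈ userFactors · itemFactors, then top-N.
object AlsSketch {
  def predict(userFactors: Array[Double], itemFactors: Array[Double]): Double =
    userFactors.zip(itemFactors).map { case (u, v) => u * v }.sum

  // Score every item for one user and keep the n highest, best first
  def recommend(user: Array[Double], items: Map[Int, Array[Double]], n: Int): Seq[(Int, Double)] =
    items.toSeq
      .map { case (id, factors) => id -> predict(user, factors) }
      .sortBy { case (_, rating) => -rating }
      .take(n)
}
```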
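4.3 Troubleshooting Common Problems
4.3.1 Slow Model Training
# Troubleshooting steps
# 1. Check the data volume
scala> data.count()
# 2. Check the number of partitions
scala> data.rdd.getNumPartitions
# Solutions
# 1. Increase parallelism (for example, when submitting the job)
--conf spark.default.parallelism=200
# 2. Add executor resources (--num-executors applies on YARN/Kubernetes; on a standalone cluster use --total-executor-cores)
--num-executors 20
--executor-cores 4
# 3. Reduce the number of iterations
scala> lr.setMaxIter(5)
# 4. Cache the training data (iterative algorithms read it many times)
scala> data.cache()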
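4.3.2 Poor Model Performance
# Troubleshooting steps
# 1. Check data quality
scala> data.describe().show()
# 2. Check feature distributions
scala> data.select("feature1").describe().show()
# Solutions
# 1. Feature engineering
# – Handle missing values
# – Standardize features
# – Select features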
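# 2. Parameter tuning
scala> import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
scala> val paramGrid = new ParamGridBuilder()
| .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
| .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
| .build()
# 3. Cross-validation
scala> val cv = new CrossValidator()
| .setEstimator(pipeline)
| .setEvaluator(new BinaryClassificationEvaluator())
| .setEstimatorParamMaps(paramGrid)
| .setNumFolds(5)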
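Part05-Feng Ge's Experience Summary
5.1 MLlib Best Practices
MLlib best-practice recommendations:
1. Use Pipelines to manage the workflow
2. Save the complete feature-processing pipeline
3. Tune parameters with cross-validation
4. Monitor model performance
5. Update models regularly
# Feature engineering best practices
1. Keep training and prediction features consistent
2. Handle missing values and outliers
3. Standardize features
4. Select features
5.2 Model Tuning Recommendations
Model tuning recommendations:
- Use cross-validation to choose the best parameters
- Tune the regularization parameter to prevent overfitting
- Choose an appropriate evaluation metric
- Monitor the training process
5.3 Recommended Tools
MLlib tool recommendations:
- Spark UI: monitor training progress
- CrossValidator: parameter tuning with cross-validation
- TrainValidationSplit: parameter tuning with a single train/validation split
- Pipeline: workflow management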
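This article was compiled and published by Feng Ge Tutorials for learning and testing only; when reposting, please cite the source: http://www.fgedu.net.cn/10327.html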
