安装
从Spark官网下载安装包,然后解压即可。非常简单
启动主机和worker
进入spark目录,然后运行脚本
./sbin/start-master.sh
即可。进程会在后台运行,你可以通过 http://localhost:8080 进行监控。
启动worker的脚本是
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IsP:PORT
其中IP和PORT可以在监控页面看到。
关闭worker很简单,直接关闭worker运行的shell或者ctr + c中断即可。
关闭主机需要运行脚本
./sbin/stop-master.sh
Spark shell
启动scala版的shell命令为./bin/spark-shell
,python版的命令为./bin/pyspark
SparkContext
sc是spark的入口,通过SparkConf
来创建它。
val sparkConf = new SparkConf().setAppName("FromPostgreSql") .setMaster("local[4]") .set("spark.executor.memory", "2g") val sc = new SparkCsontext(sparkConf)
对了,目前spark只支持的scala版本是2.10.x,所以用2.11.x版本可能会出错。
使用sc.stop()
方法停止SparkContext。貌似不执行stop,本地用sbt run
运行时会出现错误信息,
但是提交jar方式运行没问题。
参考https://stackoverflow.com/questions/28362341/error-utils-uncaught-exception-in-thread-sparklistenerbus.
- issue
- 使用
sbt run
方式运行任务,如果涉及到saveAsTextFile
操作时,会出错,原因未知。
- 使用
RDD
- RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
- in-memory cache.
cache()
- RDD 常用操作
count()
foreach
,map
,flatMap
,filter
,
- 并行化容器,可以通过
SparkContext.parallelize
方法创建分布式便于并行计算的数据结构。也可以用来将scala的容器转换为RDD结构的tips
val data = Array(1,2,4,5,6,7) val distData = sc.parallelize(data)
- 从外部数据库创建,支持本地文件系统,HDFS,Cassandra, HBase, Amazon S3, 等。
支持的文件格式包括文本文件, SequenceFiles,其他Hadoop输入格式。
其中文本格式可以通过SparkContext.textFile(URI [, partition_number])
方法创建RDD。- 支持本地文件和网络文件的URI,"/home/user/path-to-file", "hdfs://path-to-file"
- 支持文件夹,压缩文件,通配符等方式。例如"/path-to-file/*.gz", "/path-to-file/directory"
- 指定分区数目,每一个分区是64MB,默认创建一个分区。
- 也可以通过
SparkContext.wholeTextFiles
读取一个目录下的所有文本文件,返回的是 (filename, content),
而textFile
则返回所有的行 - 其他Hadoop输入格式可以使用
SparkContext.hadoopRDD
方法。 - 其他基于
org.apache.hadoop.mapreduce
API 的输入格式可以通过SparkContext.newAPIHadoopRDD
方法创建 RDD.saveAsObjectFile
和SparkContext.objectFile
支持保存RDD为简单的序列化java对象。
RDD 操作
- 支持两种操作 map, reduce
- 变换:从一个已经存在的数据创建新的数据,如
map
,reduce
,reduceByKey
。所有的变换操作都是惰性求值,而且不保存
中间结果。如果重新计算,中间结果也会重新计算。如果需要保存中间结果可以通过RDD.persist()
方法指明保存该RDD。 - 传递函数给spark,不同的语言不同
- scala中可以通过以下几种方式
- 匿名函数
- 单例模式对象的一个静态方法
- 一个类的实例对象的一个成员方法,这种情况需要传递整个对象过去。同样,如果函数应用了外部的对象的一个域,那么也需要传递整个对象。
为了避免这个问题,可以创建该域的一个本地拷贝。
- scala中可以通过以下几种方式
class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } } // 修改后的doStuff 函数 def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) }
- java, `org.apache.spark.api.java.function` 对象,或者java 8 的lambda表达式 - python, lambda表达式,本地函数,模块的顶级函数,对象的方法
- 重新分区,
repartition
会重新分配所有数据,如果是降低分区数目,可以用coalesce
,它会避免移动所有数据,
而只是移动丢弃的分区的数据,参考stackoverflow的讨论。
RDD持久化
持久化的两个方法 .cache()
和.persist(StorageLevel.SOME_LEVEL)
,存储级别有:
- MEMORY_ONLY : 默认级别,以 deserialized Java objects 保存在内存(JVM),内存放不下的部分每次也是重新计算
- MEMORY_AND_DISK : 保存在内存,放不下的放在磁盘
- MEMORY_ONLY_SER : 序列化后再保存在内存,放不下重新计算
- MEMORY_AND_DISK_SER :与上一个术语差异在于放不下的放磁盘
- DISK_ONLY : 只放磁盘
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. : 多保存一个备份
- OFF_HEAP (experimental) : Store RDD in serialized format in Tachyon
在python中都是用pickle序列化,只有这一种。
手动移除cache的方法是 RDD.unpersist()
,如果不手动移除,Spark 也会自动处理cache的。
理解闭包
- 在RDD的foreach中,对外部变量的引用实际上是复制了该对象到executor中,然后引用executor中的那个对像,所以不会改变本想引用的那个对象。
可以使用Accumulator
来实现改变主对象。 - 输出RDD到stdout,同样存在一个问题,在foreach和map中的prinln是输出到executor的stdout。可以通过
RDD.collect().foreach(println)
方法实现,
如果该只是打印一部分,可以通过RDD.take(100).foreach(println)
来实现。
KV值操作
- 由于KV类型可以是很多不同类型,通用的操作不多,最常用的是
shuffle
操作,例如 grouping 和 aggregating by key。 - 在spark中通过创建Tuple2对象实现K-V,例如在下述代码中
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
注意,在使用自定义的对象作为key的时候,需要确保.equals()
方法与hashCode()
方法兼容。
通用的变换
- map(func)
- filter(func)
- flatMap(func), 相当于先做map,然后做flat操作
- mapPartitions(func),map到每一个分区
- mapPartitionsWithIndex(func), 带有index的版本
- sample, 采样
- union,并集
- intersection,交集
- distinc, 去重
- groupByKey,输入(K,V),输出(K, Iter
) - reduceByKey(func),输入(K,V)
- aggregateByKey
- sortByKey
join(otherDataset [, numTasks])
, (K,V), (K,W) -> (K, (V,W))- cogroup
- cartesian 笛卡尔积?
- pipe
- coalesce
- repartition
略
Action
- reduce
- collect
- count
- first
- take(n)
- takeSample
- takeOrdered
- saveAsTextFile(path)
- saveAsSequenceFile(path), java and scala
- countByKey,对每一个key单独计数
- foreach(func)
共享变量
- broadcast变量,不同的executor共享
val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
优点在于,不同于简单复制,可以采用P2P协议来提升在多个节点之间复制的性能!对于很大的共享对象,性能提升很明显!
https://stackoverflow.com/questions/26884871/advantage-of-broadcast-variables
- Accumulator,
val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) accum.value
一般需要实现自己的AccumulatorParam子类,
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
提交spark任务
使用 bin/spark-submit 脚本提交,语法
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
For Python applications, simply pass a .py file in the place of
instead of a JAR, and add Python .zip, .egg or .py files to the search path with --py-files.
Spark Streaming
简单地说,就是用来从其他地方拉数据的。
输入数据流 => Spark streaming => batches of input data => Spark engine => batches of processed data
Spark SQLContext,
- 从SparkContext创建
org.apache.spark.sql.SQLContext val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- 使用
.sql
函数进行SQL查询,Spark SQL支持的语法
SELECT [DISTINCT] [column names]|[wildcard] FROM [kesypace name.]table name [JOIN clause table name ON join condition] [WHERE condition] [GROUP BY column name] [HAVING conditions] [ORDER BY column names [ASC | DSC]]
如果使用join进行查询,则支持的语法为:
SELECT statement FROM statement [JOIN | INNER JOIN | LEFT JOIN | LEFT SEMI JOIN | LEFT OUTER JOIN | RIGHT JOIN | RIGHT OUTER JOIN | FULL JOIN | FULL OUTER JOIN] ON join condition
-
DataFrame
Spark DataFrame的设计灵感正是基于R与Pandas。
我们通过外部Json文件创建一个DataFrame:
val dataFrame = sqlContext.load("/example/data.json", "json") dataFrame.show()
With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.
// Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
- 直接在文件上运行SQL!
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
- 注册UDF
sqlContext.udf.register("strLen", (s: String) => s.length())
MLlib
- 不同的包的特点,推荐
spark.ml
spark.mllib
contains the original API built on top of RDDs. 在2.0版本不在支持新特性了,不再维护。spark.ml
provides higher-level API built on top ofDataFrames
for constructing ML pipelines.
spark.ml 包
基础类
- 基于DataFrame,借助于抽象,将模型抽象为三个基本类,estimators(实现fit方法), transformers(实现transform方法), pipelines
- 一个正常的模型应该同时实现
fit
和transform
两个方法 transform
将生成一个新的DataFrame,包含了预测的结果fit
的DataFrame需要包含两列 featuresCol 和 labelCol 默认名字为 label-
transform
之前的DataFrame需要包含一列 featuresCol,默认名字为features,输出三列(依赖于参数),三列有默认名字,都可以通过setter函数进行设置。- predictedCol 预测的标签,默认名字为
prediction
- rawPredictedCol 预测的裸数据?向量?逻辑回归是
wx
貌似,默认名字为rawPrediction
- probabilityCol 预测的概率,默认名字为
probability
- predictedCol 预测的标签,默认名字为
-
模型参数封装类
Param
,他的一个常用子类是ParamMap
,实现了Map接口,可以通过get, put
进行操作。
在2.0版本开始,Spark对Estimators和Transformers提供统一的参数API。
val paramMap = ParamMap(lr.maxIter -> 20) .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
paramMap = {lr.maxIter: 20} paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter. paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
pipeline
将不同模型(transform)堆叠起来,类似于sklearn里面的pipeline。
pipeline保存了一个Array[PipelineStage],可以通过.setStage(Array[_ <: PipelineStage])
函数进行设置。
pipeline实现了estimator的fit接口和transformer的transform接口。- 模型持久化
save, load
PipelineStage
抽象类,啥也没干????????!!!!!transformer
还是它的子类!!-
UnaryTransformer
单列转换对象,是transformer的子抽象类,也实现了pipelinestage接口。
有两个变量inputCol
和outputCol
代表输入输出列的名字。
有几个常用的实例,例如Tokenizer,HashingTF等。 -
模型的保存和加载,利用类的静态方法
.load
加载(MLReader的实现),而用实例的.save
方法(MLWriter的实现)保存模型到文件。 -
模型评估
Evaluator
(实现evaluate(dataFrame)
方法),RegressionEvaluator
回归,BinaryClassificationEvaluator
二元分类,
MulticlassClassificationEvaluator
多元分类。BinaryClassificationEvaluator
除了evaluate
方法之外,还有几个重要的属性和属性setter。标签列名labelCol
,度量名称metricName
默认为areaUnderROC,即AUC。rawPredictionCol
预测结果列名。以及相应的setter和getter。MulticlassClassificationEvaluator
,三个属性labelCol
,metricName
(supports "f1" (default), "precision", "recall", "weightedPrecision", "weightedRecall"),predictionCol
RegressionEvaluator
,三个属性labelCol
,metricName
("rmse" (default): root mean squared error, "mse": mean squared error, "r2": R2 metric, "mae": mean absolute error),predictionCol
-
交叉验证选择模型超参数。交叉验证
CrossValidator
类,有4个基本方法.setEstimator
.setEvaluator
.setEstimatorParamMaps(paramGrid)
参数网络.setNumFolds(k)
k-fold交叉验证的参数k
同是他也是一个estimator,调用它的fit
方法训练模型,返回训练好的模型CrossValidatorModel或模型序列。
他也是一个transformer,调用transform
方法直接执行多个transform。
-
训练集和测试集的分割
TrainValidationSplit
与交叉验证类类似,取代.setNumFolds
的是函数.setTrainRatio(ratio)
。 -
参数网格可以通过
ParamGridBuilder
对象创建,他有三个方法,addGrid(param, values:Array)
添加一个参数网格,
baseOn(paramPair)
设置指定参数为固定值,build()
方法返回一个Array[ParamMap]
数组
特征提取
- TF-IDF(HashingTF and IDF),传统的词统计是通过维护一个查找的词典(hash表或者查找树实现),
HashTF则是直接通过对特征计算hash函数映射到低维索引。还可以通过第二个hash函数确定是否存在冲突。
有什么优势???使用的类HashingTF, IDF, Tokenizer
- Word2Vec,低维词向量学习,对应的类:
Word2Vec
- CountVectorizer,直接统计:
CountVectorizer
特征变换
- Tokenizer:将文本转换为一个一个的词。例如中文分词就算一个,对于英文可以简单的用空白字符分割就行。可用的类有:
Tokenizer
常规TokenizerRegexTokenizer
正则式Tokenizer
- StopWordsRemover:停止词的移除。
StopWordsRemover
,可以通过setCaseSensitive
设置大小写敏感,
和setStopWords(value: Array[String])
设置停止词词典。 - n-gram:将输入的一串词转换为n-gram。
NGram
- Binarizer:通过阈值将数值特征变成二值特征。类
Binarizer
,主要方法:setThreshold
- PCA:PCA 降维。
PCA
,方法setK
- PolynomialExpansion:将特征展开为多项式特征,实现特征交叉。
x1,x2->x1^2,x2^2,x1x2
。PolynomialExpansion
方法:setDegree
- DCT:离散余弦变换。
DCT
StringIndexer
将字符串类型的变量(或者label)转换为索引序号,序号会按照频率排序,不是字典序。对于不在词典的string,默认抛出异常,也可以通过setHandleInvalid("skip")
直接丢弃。IndexToString
和StringIndexer
配合使用可以让字符串类型的变量的处理变得透明,这个是将index变成原来的字符串OneHotEncoder
:将单个数字转换为0-1编码的向量。1->(0,1,0,0)
。常用在类别特征的变换。VectorIndexer
:将输入向量中的类别特征自动编码为index。比较高端,需要学习一下!setMaxCategories(4)
表示特征的值数目超过4个就认为是连续特征,否则认为需要编码。Normalizer
:归一化。需要指定p-norm的值setP
。按照p范数归一化,默认为2。可以用在输出概率或score时归一化?StandardScaler
:标准化特征到方差为1,也可以将均值设为0. 方法:setWithStd(bool), setWithMean(bool)
MinMaxScaler
:归一化到0-1之间。也可以指定min和maxMaxAbsScaler
:@since(2.0.0),除以最大值的绝对值,从而将特征归一化到[-1,1]Bucketizer
:分桶。方法setSplits(splits)
来设置分割点,分割点需要严格递增。ElementwiseProduct
:对输入向量乘以一个权值。方法setScalingVec
设置权值。SQLTransformer
:让你用SQL语句进行变换特征。例子:
val sqlTrans = new SQLTransformer().setStatement( "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
VectorAssembler
:将多个特征合并到一个向量,也可以合并向量。通过setInputCols(Array[String]
设置要合并的列。QuantileDiscretizer
:首先对特征采样,然后根据采样值将特征按照等量分桶(等频离散化)。基于采样,所以每次不同。方法setNumBuckets(i:Int)
设置桶的个数。
特征选择
VectorSlicer
:通过slice选择特征,人工指定indices。方法setIndices
设置选择的indices。字符串indices通过setNames
方法设置索引。RFormula
:通过R模型公式选择特征,例如clicked ~ country + hour
,输出列是默认是公式的响应变量名字。它会对字符串one-hot编码,对数值列转换为double类型。需要人工指定哪些特征。ChiSqSelector
:通过卡方独立性检验来选择特征。方法setNumTopFeatures
指定要选择的卡方值前多少个。
val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("indexedLabel") .fit(training) val rf = new RandomForestClassifier() .setPredictionCol("indexedPrediction") .setLabelCol("indexedLabel") setRFParam(rf, param) val labelConverter = new IndexToString() .setInputCol("indexedPrediction") .setOutputCol("prediction") .setLabels(labelIndexer.labels) val pipeline = new Pipeline() .setStages(Array(labelIndexer, rf, labelConverter))
分类 org.apache.spark.ml.classification
- 逻辑回归:
LogisticRegression
,参数maxIter, regParam, elasticNetParam
分别是最大迭代次数、正则项系数、elastic网的参数。 - 决策树:
DecisionTreeClassifier
- 随机森林:
RandomForestClassifier
- GBDT:
GBTClassifier
- 多层感知器(全连接神经网络):
MultilayerPerceptronClassifier
,setLayers
指定每层的节点数目。有没有预训练?需要研究一下!!! - One-vs-All:
OneVsRest
,将二分类变成多分类模型,采用One-vs-all策略。方法setClassifier
设置二分类器。 - 朴素贝叶斯:
NaiveBayes
,
回归 org.apache.spark.ml.regression
- 线性回归:
LinearRegression
- 广义线性回归:
GeneralizedLinearRegression
@since(2.0.0) - 决策树回归:
DecisionTreeRegressor
- 随机森林回归:
RandomForestRegressor
- GBDT回归:
GBTRegressor
- Survival regression:
AFTSurvivalRegression
,什么东西? - Isotonic 回归:
IsotonicRegression
,什么东西?
聚类 org.apache.spark.ml.clustering
KMeans
,K-means聚类,通过setK
设置类的数目。K-means++的分布式实现。LDA
:Latent Dirichlet allocationBisectingKMeans
:Bisecting k-means 聚类,@since(2.0.0) 不懂?GaussianMixture
:GMM 模型。@since(2.0.0)
协同过滤
ALS
:ALS算法,2.0才加到ml包里面,之前在mllib包。
DataFrame
DataFrame相当于 RDD[Row],而Row相当于一个可以包含各种不同数据的Seq。
DataFrame通过collect函数之后就是Array[Row]
通过工厂方法SQLContext.createDataFrame
创建DataFrame,可以从一下几个数据源创建
- 从
List(label, FeatureVector)
序列创建 - 从
JavaRDD
创建 - 从
RDD
创建 - 从
List[Row]
创建 - 从
RDD[Row]
创建
val training = sqlContext.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features")
spark的DataFrame每一列可以存储向量!甚至图像!任意值都行!!
-
SQL操作
- select(col1, col2, ...) 选取部分列
- sample 采样
- sort 排序
- unionAll 融合其他表
- orderBy
- limit
- join 内连接
- groupyBy
- filter(sql表达式)
-
lazy val rdd 对象,可以通过RDD接口操作
-
df.sqlContext 可以访问创建该DataFrame 的SQLContext对象,rdd.sparkContext 可以访问创建RDD的SparkContext对象。
-
保存到磁盘
df.rdd.map { 转换操作 } .saveAsTextFile(filepath)
spark.mLlib
- LogisticRegressionWithLBFGS
- LogisticRegressionModel, 要
model.clearThreshold
predict才会输出概率,否则输出的是判决后的值
基本数据结构
- Vector, 可以通过工厂对象
Vectors
创建,普通向量Vectors.dense
,稀疏向量Vectors.sparse
,通过.toArray
方法转换为Array[Double]
- LabeledPoint, 二元组
(label:Double, features: Vector)
- Matrix, 可以通过工厂对象
Matrices
创建,普通矩阵Matrices.dense
,稀疏矩阵Matrices.sparse
- RowMatrix,前面的向量和矩阵都是存在单机中,这种和下面的矩阵是分布式存储的。
- IndexedRowMatrix,indexedrow是(long, vector)的包装使得index是有意义的
- CoordinateMatrix,
- BlockMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
模型评估
包名org.apache.spark.mllib.evaluation
- 两分类 BinaryClassificationMetrics
- 多分类 MulticlassMetrics
TIPS
使用log4j
package org.apache.log4j; public class Logger { // Creation & retrieval methods: public static Logger getRootLogger(); public static Logger getLogger(String name); // printing methods: public void trace(Object message); public void debug(Object message); public void info(Object message); public void warn(Object message); public void error(Object message); public void fatal(Object message); // generic printing method: public void log(Level l, Object message); } // 例子 import org.apache.log4j.Logger val log = Logger.getLogger(getClass.getName) log.info("info")
-
如果对RDD操作里面有随机的因素在里面,那么每次操作会不一样!!
-
Spark in Action [BOOK] https://zhangyi.gitbooks.io/spark-in-action
- Spark Programming Guide https://spark.apache.org/docs/latest/programming-guide.html