安装
从Spark官网下载安装包,然后解压即可。非常简单
启动主机和worker
进入spark目录,然后运行脚本
./sbin/start-master.sh
即可。进程会在后台运行,你可以通过 http://localhost:8080 进行监控。
启动worker的脚本是
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IsP:PORT
其中IP和PORT可以在监控页面看到。
关闭worker很简单,直接关闭worker运行的shell或者ctr + c中断即可。
关闭主机需要运行脚本
./sbin/stop-master.sh
Spark shell
启动scala版的shell命令为./bin/spark-shell,python版的命令为./bin/pyspark
SparkContext
sc是spark的入口,通过SparkConf来创建它。
val sparkConf = new SparkConf().setAppName("FromPostgreSql") .setMaster("local[4]") .set("spark.executor.memory", "2g") val sc = new SparkCsontext(sparkConf)
对了,目前spark只支持的scala版本是2.10.x,所以用2.11.x版本可能会出错。
使用sc.stop()方法停止SparkContext。貌似不执行stop,本地用sbt run运行时会出现错误信息,
但是提交jar方式运行没问题。
参考https://stackoverflow.com/questions/28362341/error-utils-uncaught-exception-in-thread-sparklistenerbus.
- issue
- 使用
sbt run方式运行任务,如果涉及到saveAsTextFile操作时,会出错,原因未知。
- 使用
RDD
- RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
- in-memory cache.
cache() - RDD 常用操作
count()foreach,map,flatMap,filter,
- 并行化容器,可以通过
SparkContext.parallelize方法创建分布式便于并行计算的数据结构。也可以用来将scala的容器转换为RDD结构的tips
val data = Array(1,2,4,5,6,7) val distData = sc.parallelize(data)
- 从外部数据库创建,支持本地文件系统,HDFS,Cassandra, HBase, Amazon S3, 等。
支持的文件格式包括文本文件, SequenceFiles,其他Hadoop输入格式。
其中文本格式可以通过SparkContext.textFile(URI [, partition_number])方法创建RDD。- 支持本地文件和网络文件的URI,"/home/user/path-to-file", "hdfs://path-to-file"
- 支持文件夹,压缩文件,通配符等方式。例如"/path-to-file/*.gz", "/path-to-file/directory"
- 指定分区数目,每一个分区是64MB,默认创建一个分区。
- 也可以通过
SparkContext.wholeTextFiles读取一个目录下的所有文本文件,返回的是 (filename, content),
而textFile则返回所有的行 - 其他Hadoop输入格式可以使用
SparkContext.hadoopRDD方法。 - 其他基于
org.apache.hadoop.mapreduceAPI 的输入格式可以通过SparkContext.newAPIHadoopRDD方法创建 RDD.saveAsObjectFile和SparkContext.objectFile支持保存RDD为简单的序列化java对象。
RDD 操作
- 支持两种操作 map, reduce
- 变换:从一个已经存在的数据创建新的数据,如
map,reduce,reduceByKey。所有的变换操作都是惰性求值,而且不保存
中间结果。如果重新计算,中间结果也会重新计算。如果需要保存中间结果可以通过RDD.persist()方法指明保存该RDD。 - 传递函数给spark,不同的语言不同
- scala中可以通过以下几种方式
- 匿名函数
- 单例模式对象的一个静态方法
- 一个类的实例对象的一个成员方法,这种情况需要传递整个对象过去。同样,如果函数应用了外部的对象的一个域,那么也需要传递整个对象。
为了避免这个问题,可以创建该域的一个本地拷贝。
- scala中可以通过以下几种方式
class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } } // 修改后的doStuff 函数 def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) }
- java, `org.apache.spark.api.java.function` 对象,或者java 8 的lambda表达式 - python, lambda表达式,本地函数,模块的顶级函数,对象的方法
- 重新分区,
repartition会重新分配所有数据,如果是降低分区数目,可以用coalesce,它会避免移动所有数据,
而只是移动丢弃的分区的数据,参考stackoverflow的讨论。
RDD持久化
持久化的两个方法 .cache()和.persist(StorageLevel.SOME_LEVEL),存储级别有:
- MEMORY_ONLY : 默认级别,以 deserialized Java objects 保存在内存(JVM),内存放不下的部分每次也是重新计算
- MEMORY_AND_DISK : 保存在内存,放不下的放在磁盘
- MEMORY_ONLY_SER : 序列化后再保存在内存,放不下重新计算
- MEMORY_AND_DISK_SER :与上一个术语差异在于放不下的放磁盘
- DISK_ONLY : 只放磁盘
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. : 多保存一个备份
- OFF_HEAP (experimental) : Store RDD in serialized format in Tachyon
在python中都是用pickle序列化,只有这一种。
手动移除cache的方法是 RDD.unpersist(),如果不手动移除,Spark 也会自动处理cache的。
理解闭包
- 在RDD的foreach中,对外部变量的引用实际上是复制了该对象到executor中,然后引用executor中的那个对像,所以不会改变本想引用的那个对象。
可以使用Accumulator来实现改变主对象。 - 输出RDD到stdout,同样存在一个问题,在foreach和map中的prinln是输出到executor的stdout。可以通过
RDD.collect().foreach(println)方法实现,
如果该只是打印一部分,可以通过RDD.take(100).foreach(println)来实现。
KV值操作
- 由于KV类型可以是很多不同类型,通用的操作不多,最常用的是
shuffle操作,例如 grouping 和 aggregating by key。 - 在spark中通过创建Tuple2对象实现K-V,例如在下述代码中
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
注意,在使用自定义的对象作为key的时候,需要确保.equals()方法与hashCode()方法兼容。
通用的变换
- map(func)
- filter(func)
- flatMap(func), 相当于先做map,然后做flat操作
- mapPartitions(func),map到每一个分区
- mapPartitionsWithIndex(func), 带有index的版本
- sample, 采样
- union,并集
- intersection,交集
- distinc, 去重
- groupByKey,输入(K,V),输出(K, Iter
) - reduceByKey(func),输入(K,V)
- aggregateByKey
- sortByKey
join(otherDataset [, numTasks]), (K,V), (K,W) -> (K, (V,W))- cogroup
- cartesian 笛卡尔积?
- pipe
- coalesce
- repartition
略
Action
- reduce
- collect
- count
- first
- take(n)
- takeSample
- takeOrdered
- saveAsTextFile(path)
- saveAsSequenceFile(path), java and scala
- countByKey,对每一个key单独计数
- foreach(func)
共享变量
- broadcast变量,不同的executor共享
val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
优点在于,不同于简单复制,可以采用P2P协议来提升在多个节点之间复制的性能!对于很大的共享对象,性能提升很明显!
https://stackoverflow.com/questions/26884871/advantage-of-broadcast-variables
- Accumulator,
val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) accum.value
一般需要实现自己的AccumulatorParam子类,
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
提交spark任务
使用 bin/spark-submit 脚本提交,语法
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
For Python applications, simply pass a .py file in the place of
instead of a JAR, and add Python .zip, .egg or .py files to the search path with --py-files.
Spark Streaming
简单地说,就是用来从其他地方拉数据的。
输入数据流 => Spark streaming => batches of input data => Spark engine => batches of processed data
Spark SQLContext,
- 从SparkContext创建
org.apache.spark.sql.SQLContext val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- 使用
.sql函数进行SQL查询,Spark SQL支持的语法
SELECT [DISTINCT] [column names]|[wildcard] FROM [kesypace name.]table name [JOIN clause table name ON join condition] [WHERE condition] [GROUP BY column name] [HAVING conditions] [ORDER BY column names [ASC | DSC]]
如果使用join进行查询,则支持的语法为:
SELECT statement FROM statement [JOIN | INNER JOIN | LEFT JOIN | LEFT SEMI JOIN | LEFT OUTER JOIN | RIGHT JOIN | RIGHT OUTER JOIN | FULL JOIN | FULL OUTER JOIN] ON join condition
-
DataFrame
Spark DataFrame的设计灵感正是基于R与Pandas。
我们通过外部Json文件创建一个DataFrame:
val dataFrame = sqlContext.load("/example/data.json", "json") dataFrame.show()
With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.
// Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
- 直接在文件上运行SQL!
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
- 注册UDF
sqlContext.udf.register("strLen", (s: String) => s.length())
MLlib
- 不同的包的特点,推荐
spark.mlspark.mllibcontains the original API built on top of RDDs. 在2.0版本不在支持新特性了,不再维护。spark.mlprovides higher-level API built on top ofDataFramesfor constructing ML pipelines.
spark.ml 包
基础类
- 基于DataFrame,借助于抽象,将模型抽象为三个基本类,estimators(实现fit方法), transformers(实现transform方法), pipelines
- 一个正常的模型应该同时实现
fit和transform两个方法 transform将生成一个新的DataFrame,包含了预测的结果fit的DataFrame需要包含两列 featuresCol 和 labelCol 默认名字为 label-
transform之前的DataFrame需要包含一列 featuresCol,默认名字为features,输出三列(依赖于参数),三列有默认名字,都可以通过setter函数进行设置。- predictedCol 预测的标签,默认名字为
prediction - rawPredictedCol 预测的裸数据?向量?逻辑回归是
wx貌似,默认名字为rawPrediction - probabilityCol 预测的概率,默认名字为
probability
- predictedCol 预测的标签,默认名字为
-
模型参数封装类
Param,他的一个常用子类是ParamMap,实现了Map接口,可以通过get, put进行操作。
在2.0版本开始,Spark对Estimators和Transformers提供统一的参数API。
val paramMap = ParamMap(lr.maxIter -> 20) .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
paramMap = {lr.maxIter: 20} paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter. paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
pipeline将不同模型(transform)堆叠起来,类似于sklearn里面的pipeline。
pipeline保存了一个Array[PipelineStage],可以通过.setStage(Array[_ <: PipelineStage])函数进行设置。
pipeline实现了estimator的fit接口和transformer的transform接口。- 模型持久化
save, load PipelineStage抽象类,啥也没干????????!!!!!transformer还是它的子类!!-
UnaryTransformer单列转换对象,是transformer的子抽象类,也实现了pipelinestage接口。
有两个变量inputCol和outputCol代表输入输出列的名字。
有几个常用的实例,例如Tokenizer,HashingTF等。 -
模型的保存和加载,利用类的静态方法
.load加载(MLReader的实现),而用实例的.save方法(MLWriter的实现)保存模型到文件。 -
模型评估
Evaluator(实现evaluate(dataFrame)方法),RegressionEvaluator回归,BinaryClassificationEvaluator二元分类,
MulticlassClassificationEvaluator多元分类。BinaryClassificationEvaluator除了evaluate方法之外,还有几个重要的属性和属性setter。标签列名labelCol,度量名称metricName默认为areaUnderROC,即AUC。rawPredictionCol预测结果列名。以及相应的setter和getter。MulticlassClassificationEvaluator,三个属性labelCol,metricName(supports "f1" (default), "precision", "recall", "weightedPrecision", "weightedRecall"),predictionColRegressionEvaluator,三个属性labelCol,metricName("rmse" (default): root mean squared error, "mse": mean squared error, "r2": R2 metric, "mae": mean absolute error),predictionCol
-
交叉验证选择模型超参数。交叉验证
CrossValidator类,有4个基本方法.setEstimator.setEvaluator.setEstimatorParamMaps(paramGrid)参数网络.setNumFolds(k)k-fold交叉验证的参数k
同是他也是一个estimator,调用它的fit方法训练模型,返回训练好的模型CrossValidatorModel或模型序列。
他也是一个transformer,调用transform方法直接执行多个transform。
-
训练集和测试集的分割
TrainValidationSplit与交叉验证类类似,取代.setNumFolds的是函数.setTrainRatio(ratio)。 -
参数网格可以通过
ParamGridBuilder对象创建,他有三个方法,addGrid(param, values:Array)添加一个参数网格,
baseOn(paramPair)设置指定参数为固定值,build()方法返回一个Array[ParamMap]数组
特征提取
- TF-IDF(HashingTF and IDF),传统的词统计是通过维护一个查找的词典(hash表或者查找树实现),
HashTF则是直接通过对特征计算hash函数映射到低维索引。还可以通过第二个hash函数确定是否存在冲突。
有什么优势???使用的类HashingTF, IDF, Tokenizer - Word2Vec,低维词向量学习,对应的类:
Word2Vec - CountVectorizer,直接统计:
CountVectorizer
特征变换
- Tokenizer:将文本转换为一个一个的词。例如中文分词就算一个,对于英文可以简单的用空白字符分割就行。可用的类有:
Tokenizer常规TokenizerRegexTokenizer正则式Tokenizer
- StopWordsRemover:停止词的移除。
StopWordsRemover,可以通过setCaseSensitive设置大小写敏感,
和setStopWords(value: Array[String])设置停止词词典。 - n-gram:将输入的一串词转换为n-gram。
NGram - Binarizer:通过阈值将数值特征变成二值特征。类
Binarizer,主要方法:setThreshold - PCA:PCA 降维。
PCA,方法setK - PolynomialExpansion:将特征展开为多项式特征,实现特征交叉。
x1,x2->x1^2,x2^2,x1x2。PolynomialExpansion方法:setDegree - DCT:离散余弦变换。
DCT StringIndexer将字符串类型的变量(或者label)转换为索引序号,序号会按照频率排序,不是字典序。对于不在词典的string,默认抛出异常,也可以通过setHandleInvalid("skip")直接丢弃。IndexToString和StringIndexer配合使用可以让字符串类型的变量的处理变得透明,这个是将index变成原来的字符串OneHotEncoder:将单个数字转换为0-1编码的向量。1->(0,1,0,0)。常用在类别特征的变换。VectorIndexer:将输入向量中的类别特征自动编码为index。比较高端,需要学习一下!setMaxCategories(4)表示特征的值数目超过4个就认为是连续特征,否则认为需要编码。Normalizer:归一化。需要指定p-norm的值setP。按照p范数归一化,默认为2。可以用在输出概率或score时归一化?StandardScaler:标准化特征到方差为1,也可以将均值设为0. 方法:setWithStd(bool), setWithMean(bool)MinMaxScaler:归一化到0-1之间。也可以指定min和maxMaxAbsScaler:@since(2.0.0),除以最大值的绝对值,从而将特征归一化到[-1,1]Bucketizer:分桶。方法setSplits(splits)来设置分割点,分割点需要严格递增。ElementwiseProduct:对输入向量乘以一个权值。方法setScalingVec设置权值。SQLTransformer:让你用SQL语句进行变换特征。例子:
val sqlTrans = new SQLTransformer().setStatement( "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
VectorAssembler:将多个特征合并到一个向量,也可以合并向量。通过setInputCols(Array[String]设置要合并的列。QuantileDiscretizer:首先对特征采样,然后根据采样值将特征按照等量分桶(等频离散化)。基于采样,所以每次不同。方法setNumBuckets(i:Int)设置桶的个数。
特征选择
VectorSlicer:通过slice选择特征,人工指定indices。方法setIndices设置选择的indices。字符串indices通过setNames方法设置索引。RFormula:通过R模型公式选择特征,例如clicked ~ country + hour,输出列是默认是公式的响应变量名字。它会对字符串one-hot编码,对数值列转换为double类型。需要人工指定哪些特征。ChiSqSelector:通过卡方独立性检验来选择特征。方法setNumTopFeatures指定要选择的卡方值前多少个。
val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("indexedLabel") .fit(training) val rf = new RandomForestClassifier() .setPredictionCol("indexedPrediction") .setLabelCol("indexedLabel") setRFParam(rf, param) val labelConverter = new IndexToString() .setInputCol("indexedPrediction") .setOutputCol("prediction") .setLabels(labelIndexer.labels) val pipeline = new Pipeline() .setStages(Array(labelIndexer, rf, labelConverter))
分类 org.apache.spark.ml.classification
- 逻辑回归:
LogisticRegression,参数maxIter, regParam, elasticNetParam分别是最大迭代次数、正则项系数、elastic网的参数。 - 决策树:
DecisionTreeClassifier - 随机森林:
RandomForestClassifier - GBDT:
GBTClassifier - 多层感知器(全连接神经网络):
MultilayerPerceptronClassifier,setLayers指定每层的节点数目。有没有预训练?需要研究一下!!! - One-vs-All:
OneVsRest,将二分类变成多分类模型,采用One-vs-all策略。方法setClassifier设置二分类器。 - 朴素贝叶斯:
NaiveBayes,
回归 org.apache.spark.ml.regression
- 线性回归:
LinearRegression - 广义线性回归:
GeneralizedLinearRegression@since(2.0.0) - 决策树回归:
DecisionTreeRegressor - 随机森林回归:
RandomForestRegressor - GBDT回归:
GBTRegressor - Survival regression:
AFTSurvivalRegression,什么东西? - Isotonic 回归:
IsotonicRegression,什么东西?
聚类 org.apache.spark.ml.clustering
KMeans,K-means聚类,通过setK设置类的数目。K-means++的分布式实现。LDA:Latent Dirichlet allocationBisectingKMeans:Bisecting k-means 聚类,@since(2.0.0) 不懂?GaussianMixture:GMM 模型。@since(2.0.0)
协同过滤
ALS:ALS算法,2.0才加到ml包里面,之前在mllib包。
DataFrame
DataFrame相当于 RDD[Row],而Row相当于一个可以包含各种不同数据的Seq。
DataFrame通过collect函数之后就是Array[Row]
通过工厂方法SQLContext.createDataFrame创建DataFrame,可以从一下几个数据源创建
- 从
List(label, FeatureVector)序列创建 - 从
JavaRDD创建 - 从
RDD创建 - 从
List[Row]创建 - 从
RDD[Row]创建
val training = sqlContext.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features")
spark的DataFrame每一列可以存储向量!甚至图像!任意值都行!!
-
SQL操作
- select(col1, col2, ...) 选取部分列
- sample 采样
- sort 排序
- unionAll 融合其他表
- orderBy
- limit
- join 内连接
- groupyBy
- filter(sql表达式)
-
lazy val rdd 对象,可以通过RDD接口操作
-
df.sqlContext 可以访问创建该DataFrame 的SQLContext对象,rdd.sparkContext 可以访问创建RDD的SparkContext对象。
-
保存到磁盘
df.rdd.map { 转换操作 } .saveAsTextFile(filepath)
spark.mLlib
- LogisticRegressionWithLBFGS
- LogisticRegressionModel, 要
model.clearThresholdpredict才会输出概率,否则输出的是判决后的值
基本数据结构
- Vector, 可以通过工厂对象
Vectors创建,普通向量Vectors.dense,稀疏向量Vectors.sparse,通过.toArray方法转换为Array[Double] - LabeledPoint, 二元组
(label:Double, features: Vector) - Matrix, 可以通过工厂对象
Matrices创建,普通矩阵Matrices.dense,稀疏矩阵Matrices.sparse - RowMatrix,前面的向量和矩阵都是存在单机中,这种和下面的矩阵是分布式存储的。
- IndexedRowMatrix,indexedrow是(long, vector)的包装使得index是有意义的
- CoordinateMatrix,
- BlockMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
模型评估
包名org.apache.spark.mllib.evaluation
- 两分类 BinaryClassificationMetrics
- 多分类 MulticlassMetrics
TIPS
使用log4j
package org.apache.log4j; public class Logger { // Creation & retrieval methods: public static Logger getRootLogger(); public static Logger getLogger(String name); // printing methods: public void trace(Object message); public void debug(Object message); public void info(Object message); public void warn(Object message); public void error(Object message); public void fatal(Object message); // generic printing method: public void log(Level l, Object message); } // 例子 import org.apache.log4j.Logger val log = Logger.getLogger(getClass.getName) log.info("info")
-
如果对RDD操作里面有随机的因素在里面,那么每次操作会不一样!!
-
Spark in Action [BOOK] https://zhangyi.gitbooks.io/spark-in-action
- Spark Programming Guide https://spark.apache.org/docs/latest/programming-guide.html