Spark

Installation

Download the package from the Spark website and extract it. That is all there is to it.

Starting the master and workers

Go into the Spark directory and run the script

./sbin/start-master.sh

That's it. The process runs in the background and can be monitored at http://localhost:8080.

The command to start a worker is

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

where IP and PORT can be read off the monitoring page.

Shutting down a worker is easy: just close the shell it is running in, or interrupt it with Ctrl + C.
Shutting down the master requires the script

./sbin/stop-master.sh

Spark shell

The Scala shell is started with ./bin/spark-shell, the Python one with ./bin/pyspark.

SparkContext

sc is the entry point into Spark; it is created from a SparkConf.

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("FromPostgreSql")
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(sparkConf)

Note: at the time of writing Spark only supports Scala 2.10.x, so building against 2.11.x may fail.

Stop the SparkContext with sc.stop(). Apparently, if stop is not called, running locally with sbt run produces an error message, although submitting a jar works fine.
See https://stackoverflow.com/questions/28362341/error-utils-uncaught-exception-in-thread-sparklistenerbus.
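
A minimal sketch of a standalone app that always stops the context, wrapped in try/finally (the object and app name are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object StopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StopExample").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sum = sc.parallelize(1 to 100).reduce(_ + _)
      println(s"sum = $sum")
    } finally {
      sc.stop() // avoids the SparkListenerBus error seen with sbt run
    }
  }
}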

RDD

val data = Array(1,2,4,5,6,7)
val distData = sc.parallelize(data)

RDD operations

When a function passed to Spark references a field (or method) of an enclosing object, the whole object has to be serialized and shipped to the executors. Copying the field into a local variable avoids that:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } // captures all of MyClass
}

// Revised doStuff: only the local copy of the field is captured
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

How functions can be passed in the other language bindings:
- Java: classes implementing the interfaces in org.apache.spark.api.java.function, or Java 8 lambda expressions
- Python: lambda expressions, local functions, top-level functions of a module, and methods of objects

RDD persistence

The two ways to persist are .cache() and .persist(StorageLevel.SOME_LEVEL). The storage levels are MEMORY_ONLY (what cache() uses), MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, and the replicated _2 variant of each.

In Python persisted objects are always serialized with pickle; that is the only option.
A cached RDD can be removed manually with RDD.unpersist(); if you don't, Spark will eventually evict cached data on its own.
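
A short sketch of persist/unpersist with an explicit storage level (assumes an existing sc; the file path is illustrative):

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("data.txt")
val errors = logs.filter(_.contains("ERROR"))
errors.persist(StorageLevel.MEMORY_AND_DISK)           // keep it around for reuse
println(errors.count())                                // first action materializes the cache
println(errors.filter(_.contains("timeout")).count())  // reuses the cached data
errors.unpersist()                                     // release it manually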

Understanding closures
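
The classic pitfall is mutating a driver-side variable inside a closure; each executor works on its own copy, so the driver never sees the updates. A minimal sketch:

var counter = 0
val rdd = sc.parallelize(1 to 10)

// Wrong on a cluster: each task increments its own copy of counter,
// and the driver-side value is left unchanged (or undefined).
rdd.foreach(x => counter += x)
println(counter)

// Right: use an action (or an accumulator, see below).
println(rdd.reduce(_ + _))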

Key-value operations

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

Note: when using a custom object as the key, make sure its .equals() method is consistent with hashCode().
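
A case class keeps equals() and hashCode() consistent automatically, so it is a safe choice for a custom key. A small sketch:

case class UserKey(id: Long, region: String)

val events = sc.parallelize(Seq(
  (UserKey(1L, "eu"), 10),
  (UserKey(1L, "eu"), 5),
  (UserKey(2L, "us"), 7)
))
events.reduceByKey(_ + _).collect()
// Array((UserKey(1,eu),15), (UserKey(2,us),7))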

Common transformations
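
A quick tour of the usual transformations (all lazy; nothing runs until an action is called):

val nums  = sc.parallelize(1 to 10)
val words = sc.parallelize(Seq("a b", "b c"))

nums.map(_ * 2)                                 // 2, 4, ..., 20
nums.filter(_ % 2 == 0)                         // even numbers only
words.flatMap(_.split(" "))                     // "a", "b", "b", "c"
nums.map(n => (n % 3, n)).reduceByKey(_ + _)    // sum per remainder
nums.map(n => (n % 3, n)).groupByKey()          // values grouped per key
nums.distinct()
nums.union(sc.parallelize(11 to 15))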

Action
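
The most common actions, which trigger execution and return results to the driver (or write them out):

val nums = sc.parallelize(1 to 10)

nums.count()                       // 10
nums.collect()                     // Array(1, 2, ..., 10); careful with large RDDs
nums.first()                       // 1
nums.take(3)                       // Array(1, 2, 3)
nums.reduce(_ + _)                 // 55
nums.countByValue()                // Map(value -> number of occurrences)
nums.saveAsTextFile("/tmp/nums")   // path is illustrative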

Shared variables

val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

The advantage is that, unlike naively copying the value to every task, Spark can distribute a broadcast variable between nodes with an efficient P2P protocol, which is a noticeable win for large shared objects.
https://stackoverflow.com/questions/26884871/advantage-of-broadcast-variables
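
A typical use is a map-side join against a small lookup table (a sketch; the table contents are made up):

val countryNames = sc.broadcast(Map("cn" -> "China", "de" -> "Germany"))

val visits = sc.parallelize(Seq(("cn", 3), ("de", 1), ("cn", 2)))
val named = visits.map { case (code, n) =>
  (countryNames.value.getOrElse(code, "unknown"), n)
}
named.reduceByKey(_ + _).collect()
// Array((China,5), (Germany,1))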

val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

For custom types you generally implement your own AccumulatorParam subclass:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

Submitting Spark applications

Applications are submitted with the bin/spark-submit script; the syntax is

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

For Python applications, simply pass a .py file in place of a JAR, and add Python .zip, .egg, or .py files to the search path with --py-files.
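
A concrete example against a standalone cluster (the class, jar name, master URL, and arguments are placeholders):

./bin/spark-submit \
  --class com.example.MyApp \
  --master spark://192.168.1.10:7077 \
  --deploy-mode client \
  --executor-memory 2G \
  --total-executor-cores 4 \
  target/scala-2.10/my-app_2.10-1.0.jar \
  input.txt output/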

Spark Streaming

Simply put, Spark Streaming pulls data from external sources and processes it in small batches:
input data stream => Spark Streaming => batches of input data => Spark engine => batches of processed data
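
A minimal word-count sketch over a socket stream in a standalone app, using 1-second batches (host and port are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Each batch processes whatever arrived on the socket during the last second.
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()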

Spark SQLContext

org.apache.spark.sql.SQLContext

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

The supported SELECT syntax is:

SELECT [DISTINCT] [column names]|[wildcard]
FROM [keyspace name.]table name
[JOIN clause table name ON join condition]
[WHERE condition]
[GROUP BY column name]
[HAVING conditions]
[ORDER BY column names [ASC | DESC]]

If the query uses a join, the supported syntax is:

SELECT statement
FROM statement
[JOIN | INNER JOIN | LEFT JOIN | LEFT SEMI JOIN | LEFT OUTER JOIN | RIGHT JOIN | RIGHT OUTER JOIN | FULL JOIN | FULL OUTER JOIN]
ON join condition
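
For example, registering two DataFrames as temporary tables and joining them in SQL (the tables users(id, name) and orders(user_id, amount) are made up):

users.registerTempTable("users")
orders.registerTempTable("orders")

val joined = sqlContext.sql("""
  SELECT u.name, SUM(o.amount) AS total
  FROM users u
  JOIN orders o ON u.id = o.user_id
  GROUP BY u.name
  ORDER BY total DESC
""")
joined.show()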


DataFrame

The design of the Spark DataFrame was directly inspired by R and Pandas.
Create a DataFrame from an external JSON file:

val dataFrame = sqlContext.load("/example/data.json", "json")
dataFrame.show()

With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

// SQL can also be run directly on files
val usersDF = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

// Register a UDF for use in SQL statements
sqlContext.udf.register("strLen", (s: String) => s.length())
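
A registered UDF can then be used in SQL once a DataFrame is exposed as a temporary table, e.g. with the people DataFrame from above:

df.registerTempTable("people")
sqlContext.sql("SELECT name, strLen(name) AS name_len FROM people").show()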

MLlib

The spark.ml package

Basic classes

// Scala
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30) // Specify 1 Param.  This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

# Python
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
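
These ParamMaps are passed to fit(), where they override whatever was set on the estimator. A sketch with a LogisticRegression (training is assumed to be a DataFrame with "label" and "features" columns):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val paramMap = ParamMap(lr.maxIter -> 20).put(lr.regParam -> 0.1)

// Parameters given at fit() time win over those set on the estimator.
val model = lr.fit(training, paramMap)
println(model.parent.extractParamMap())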

Feature extraction

Feature transformation

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

Feature selection

val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(training)
val rf = new RandomForestClassifier()
    .setPredictionCol("indexedPrediction")
    .setLabelCol("indexedLabel")
setRFParam(rf, param) // user-defined helper that applies tuning parameters to the classifier
val labelConverter = new IndexToString()
    .setInputCol("indexedPrediction")
    .setOutputCol("prediction")
    .setLabels(labelIndexer.labels)
val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, rf, labelConverter))
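
Fitting and applying the pipeline then looks like this (training and test are assumed to be DataFrames with "label" and "features" columns):

val model = pipeline.fit(training)       // fits the indexer, the forest, and the label converter
val predictions = model.transform(test)  // adds indexedPrediction and prediction columns
predictions.select("prediction", "label", "features").show(5)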

Classification: org.apache.spark.ml.classification

Regression: org.apache.spark.ml.regression

Clustering: org.apache.spark.ml.clustering

Collaborative filtering

DataFrame

A DataFrame is essentially an RDD[Row], where a Row is like a Seq that can hold values of different types.
Calling collect on a DataFrame therefore returns an Array[Row].
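
So individual rows can be taken apart with the Row accessors; for example, with the people DataFrame from the SQL section:

import org.apache.spark.sql.Row

val rows: Array[Row] = df.collect()
rows.foreach { case Row(age, name) => println(s"$name -> $age") }  // positional match
rows.map(_.getAs[String]("name"))                                  // access by column name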

DataFrames are created with the factory method SQLContext.createDataFrame, which can build them from several kinds of sources, for example a local Seq of tuples:

val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

Each column of a Spark DataFrame can store vectors, even images, or any other value.

df.rdd.map { /* transformation */ }.saveAsTextFile(filepath)

spark.mllib

Basic data types

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)


val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
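
Filling in the elided RDDs, a complete small example might look like this:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))
val mat = new RowMatrix(rows)
println(s"${mat.numRows()} x ${mat.numCols()}")   // 2 x 3

val indexedRows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 2.0)),
  IndexedRow(3L, Vectors.dense(4.0, 5.0))))
val indexedMat = new IndexedRowMatrix(indexedRows)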

Model evaluation

The package is org.apache.spark.mllib.evaluation.
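
For a binary classifier, for example, the area under the ROC curve can be computed from an RDD of (score, label) pairs:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// (predicted score, true label); the values here are made up
val scoreAndLabels = sc.parallelize(Seq((0.9, 1.0), (0.2, 0.0), (0.7, 1.0), (0.4, 0.0)))
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
println("AUC = " + metrics.areaUnderROC())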

TIPS

Using log4j

package org.apache.log4j;

public class Logger {

  // Creation & retrieval methods:
  public static Logger getRootLogger();
  public static Logger getLogger(String name);

  // printing methods:
  public void trace(Object message);
  public void debug(Object message);
  public void info(Object message);
  public void warn(Object message);
  public void error(Object message);
  public void fatal(Object message);

  // generic printing method:
  public void log(Level l, Object message);
}

// Example
import org.apache.log4j.Logger
val log = Logger.getLogger(getClass.getName)
log.info("info")