Spark Streaming

简介

Spark Streaming 将流数据按照时间离散化,每单位时间一个batch!这是和其他流处理系统不同的地方。
好处是效率更高,缺点是牺牲了一定的实时性。

streaming-flow

StreamingContext

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

每一个 Batch 是一个DStream对象,一个 DStream 对象实际上是一些列的RDD。

dstream

Input DStreams and Receivers

基本源

DStreams 的变换操作

UpdateStateByKey 操作

用于维护一个持续的状态,状态可以是任意数据类型。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

Transform 操作

直接对rdd进行操作

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

Window 操作

window

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

join 操作