1 概述
spark Streaming是Spark core API的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如kafka,flume,Kinesis或TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如map,reduce,join和window等高级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将Spark的机器学习和图形处理算法应用于数据流。
总的来说我们可以从三点进行考虑:输入—–计算—–输出。正如下图所示:
1. 输入:可以从Kafka,Flume,HDFS等获取数据
2. 计算:我们可以通过map,reduce,join等一系列算子通过spark计算引擎进行计算(基本和RDD一样,使用起来更方便。)
3. 输出:可以输出到HDFS,数据库,hbase等。
2 处理数据的特点
在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流并将数据分成批,然后由Spark引擎处理,以批量生成最终结果流。
从图中也能看出它将输入的数据分成多个batch进行处理,严格来说spark streaming 并不是一个真正的实时框架,因为他是分批次进行处理的。
Spark Streaming提供了一个高层抽象,称为discretized stream或DStream,它表示连续的数据流。 DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列RDD。
3 wordcount代码演示进行进一步认识
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SocketWordCountApp {
def main(args: Array[String]): Unit = {
//创建SparkConf
val conf=new SparkConf().setAppName("SocketWordCountApp").setMaster("local[2]")
//通过conf 得到StreamingContext,底层就是创建了一个SparkContext
val ssc=new StreamingContext(conf,Seconds(5))
//通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],后面从源码进行分析
val DStream=ssc.socketTextStream("192.168.137.130",9998)
//wc (看看是不是和RDD中的wc一样呢)
DStream.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey(_+_).print()
// 开始计算
ssc.start()
// 等待计算结束
ssc.awaitTermination()
}
}
我们在通过nc命令像端口9998输入数据;
[hadoop@hadoop ~]$ nc -lp 9998
a,a,a,a,b,b,b
查看结果
(b,3)
(a,4)
4 初始化StreamingContext
要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。StreamingContext对象也可以从现有的SparkContext对象创建。
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
当一个Context被定义,你必须做以下的事情:
1. 通过定义输入DStream来创建输入源。
2. 通过在DStream上应用转换操作和输出操作来定义流计算。
3. 使用StreamContext.start()开始来接收数据和处理数据。
4. 使用StreamContext.awaitTermination()来等待计算完成(手动或者因错误终止)。
5. 可以StreamContext.stop()来手动停止计算(一般不会停止)。
注意
a.一旦一个StreamingContext被启动,就不能再设置或添加新的流计算。
b.一旦一个StreamingContext被停止,就不能重新启动。
c.同一时间内,在JVM内部只有一个StreamingContext处于活跃状态。
d.默认情况下使用stop()方法停止StreamingContext的同时也会停止SparkContext,如果执行停止
StreamingContext,可以将stop()的可选参数设置为false。
e.SparkContext可以复用,即用来创建多个StreamingContext,只要在创建新的StreamingContext时,
之前创建的StreamingContext是处于stop状态即可(SparkContext没有被停止)。
5 Discretized Streams (DStreams)
Discretized Streams或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,即从源接收的输入数据流或通过转换输入流生成的已处理数据流。 在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象。 DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。
在DStream上应用的任何操作都会转化为对每个RDD的操作。例如,wordcount案例中(下面会进行代码演示),flatMap操作应用于DStream行中的每个RDD,以生成单词DStream的RDD。 这在下图中显示。
这些基础RDD转换由Spark引擎计算。 DStream操作隐藏了大部分这些细节,并为开发人员提供了更高级别的API以方便使用。 这些操作将在后面的章节中详细讨论。
6 spark streaming架构
我们应该知道spark有很多种运行模式,下面通过spark on yarn (cluster模式)的模式图进行介绍,所以想要对spark streaming的运行架构进行理解,你要知道在yarn上提交作业的流程(可以参考该篇博客),以及spark的运行流程(参考该篇博客),下面是我在网上找的一幅图,我们根据这幅图 进行一个学习:
下面对于这幅图进行详细的剖析:
符号表示:1,2,3….代表Spar on Yarn启动流程 ;(1)(2)(3)….代表Spark Streaming执行过程。
1. 通过spark client提交作业到RM
2. ResouceManager为该作业分配第一个Container,并与对应的NodeManager通信,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationMaster)
3. NodeManager启动Spark AppMaster。
4. Spark AppMaster并向ResourceManager AsM注册,用户就可以通过UI查看作业情况。
5. ResourceManager通知NodeManager分配Container。(每个container的对应一个executor)
6. NodeManager准备资源,并分配给executor。
Spark Streaming执行过程
spark on Yarn模式Driver运行在NM的container之中,运行application的main()函数并自动创建SparkContext,在SparkContext之上会创建一个 StreamingContext(因为途中并没有标出,这里说明下)。
(1)SparkContext向资源管理器注册并申请运行Executor资源;
(2)Executor会启动Receive接收数据(Data Received),分批处理。
(3)Receive接收到数据后汇报给streamingcontext(底层调用的sparkcontext),他会以多个副本存储,默认两个(后面进行源码解读就知道了。)
(4)Spark ApplicationMaster和executor(container)进行交互,分配task。
(5)每个executor上会运行多个task执行任务。
最后把结果保存在HDFS上。
Saprk Streaming数据处理过程
首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。这里也和上面所说的一致。
7 源码解析
前面我们说了StreamingContext,底层就是创建了一个SparkContext,我们从源码中进行证明:
new StreamingContext(conf,Seconds(5))
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
我们new StreamingContext(conf,Seconds(5)) 其实调用的上面的方法,
1.传递一个SparkConf应该不陌生把,指定Spark参数的org.apache.spark.SparkConf对象;
2.Duration流式数据分成批次的时间间隔
我们 接着看看这句话StreamingContext.createNewSparkContext(conf)
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
new SparkContext(conf)
}
可以看到底层就是给我们创建了一个SparkContext
通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],从源码进行分析:
ssc.socketTextStream("192.168.137.130",9998)
/**
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
1.通过tcp socket监听hostname:port,接受字节(UTF8编码,`\ n`分隔)
2.这里我们看到了默认的存储级别StorageLevel.MEMORY_AND_DISK_SER_2,因为他是一个默认参数,所以我们直接使用了默认的就木有传递。(和前面对应了吧)
(还有一个问题,还记得spark中缓存级别吗???)
3.返回ReceiverInputDStream
4.调用了socketStream方法
继续查看socketStream方法
/**
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
这些参数大家应该明白了吧,我们继续看看SocketInputDStream[T]到底是什么吧。
class SocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
SocketInputDStream这个类继承了ReceiverInputDStream,感觉快看到希望了啊。
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
* that has to start a receiver on worker nodes to receive external data.
* Specific implementations of ReceiverInputDStream must
* define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
* @param _ssc Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
extends InputDStream[T](_ssc) {
1.ReceiverInputDStream是一个抽象列继承了InputDStream,必须在工作节点上启动接收器才能接收外部数据,
2.ReceiverInputDStream 通过getReceiver函数获取类型的接收器对象,即org.apache.spark.streaming.receiver.Receiver,
3.Receiver的作用是接受数据。
InputDStream[T](_ssc)
是什么呢?
/**
* This is the abstract base class for all input streams. This class provides methods
* start() and stop() which are called by Spark Streaming system to start and stop
* receiving data, respectively.
* Input streams that can generate RDDs from new data by running a service/thread only on
* the driver node (that is, without running a receiver on worker nodes), can be
* implemented by directly inheriting this InputDStream. For example,
* FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
* new files and generates RDDs with the new files. For implementing input streams
* that requires running a receiver on the worker nodes, use
* [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
* @param _ssc Streaming context that will execute this input stream
*/
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {
1.这是所有输入流的抽象基类。这个类提供了方法由Spark Streaming系统调用的start()和stop()来启动和停止
接收数据。
2.输入流会被差分成多个rdd,运行在每一个线程中。
3.驱动程序节点(即,不在工作节点上运行接收器)可以 通过直接继承此InputDStream实现。例如,
FileInputDStream,InputDStream的一个子类。产生一个新文件或者生成多个rdd都会产生新文件,
这是通过driver监视HDFS目录
4.用于实现输入流需要在工作节点上运行接收器,请使用
[[org.apache.spark.streaming.dstream.ReceiverInputDStream]]作为父类。
这里我们可以看到InputDStream继承DStream,终于看到了DStream,全村人的希望啊.
**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
* DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
* etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
* transforming existing DStreams using operations such as `map`,
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
* parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
* (e.g., DStream[(Int, Int)] through implicit conversions.
*
* A DStream internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
这一大段注释,,头大。。。
其实就是我们前面说的输入-----计算-----输出:
可以通过TCP socket,Kafka,Flume 输入数据,转化为DStream,
通过一些算子 `map`, `filter` and`window`进行计算
DStream内部具有几个基本属性:
* - DStream依赖的其他DStream列表(个人感觉就是RDD之间的依赖关系)
* - DStream生成RDD的时间间隔(可自行设置)
* - 每个时间间隔后用于生成RDD的函数(生成多个RDD)
终于从头到尾给看完了哈,进行了一个简单的介绍,不知道小伙伴有没有理解呢。
8 输入数据流
从源端接受的数据代表输入数据流,通过接受输入数据会产生一个DStream,例如我们上面进行的wc中val DStream=ssc.socketTextStream("192.168.137.130",9998)这句代码接收数据后返回一个DStream, 每个输入DStream都与Receiver对象相关联,Receiver对象从源接收数据并将其存储在Spark的内存中进行处理。
Spark Streaming提供了两类内置streaming sources。
基本来源:StreamingContext API中直接可用的来源。 示例:文件系统和socket connections。
高级来源:可通过额外的实用程序课程获得Kafka,Flume,Kinesis等来源。 这些要求链接部分中讨论的额外依赖关系。(后面我们会进行讲解)
注意,如果您想在流式传输应用程序中并行接收多个数据流,则可以创建多个输入DStream(在性能调整部分中进一步讨
论)。 这将创建多个接收器,它将同时接收多个数据流。 但请注意,Spark worker / executor是一项长期运行的任
务,因此它占用了分配给Spark Streaming应用程序的核心之一。 因此,重要的是要记住,Spark Streaming应用程序
需要分配足够的内核(或线程,如果在本地运行)来处理接收到的数据以及运行接收器。
对local进行进一步理解
1. local :用一个工作线程在本地运行Spark
2. local[K]:使用K工作线程在本地运行Spark(理想情况下,将其设置为您计算机上的核心数)。
3. local[K,F]:使用K工作线程,最多为F在本地运行Spark(可以通过参数spark.task.maxFailures设置)
4. local[*]:使用与您的计算机上的逻辑内核一样多的工作线程在本地运行Spark。
在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL。 这两者中的任何一个都意味着只有一个线程将用于本地运行任务。 如果您使用的是基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么receive接收器对象将占用一个线程,那就意味着没有足够的线程来处理数据。因此,在本地运行时,请始终用“local [n]”作为主URL,其中n>要运行的接收器的数量。在群集上运行,分配给Spark Streaming应用程序的内核数量必须多于接收器的数量。 否则系统将接收数据,但无法处理它。
原文:https://blog.csdn.net/yu0_zhang0/article/details/80569946
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。