大家好,欢迎来到IT知识分享网。
sparkStreaming实践:
案例一:
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
if(args == null || args.length < 3){
println(
s"""
|请查看参数是否传入完整......
| batchInterval : Streaming作业拉取数据的批次
| hostname : 主机名
| port :端口
""".stripMargin)
System.exit(-1)
}
val Array(batchInterval,hostname,port) = args
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc,Seconds(batchInterval.toLong))
/** * def socketTextStream( * hostname: String, * port: Int, * storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 * ): ReceiverInputDStream[String] */
val input: ReceiverInputDStream[String]
= ssc.socketTextStream(hostname,port.toInt,StorageLevel.MEMORY_ONLY)
val flatMapDS: DStream[String] = input.flatMap(line => {
line.split(",")
})
val reduceByKeyDS: DStream[(String, Int)]
= flatMapDS.map((_,1)).reduceByKey(_+_)
reduceByKeyDS.print()
ssc.start() //启动程序
//start 之后不允许做高阶算子的操作
// flatMapDS.map()
ssc.awaitTermination()//等待程序结束
//如果想执行后面的打印,必须让程序结束 ssc.stop()
println("--------------------------")
}
}
本地调试的args参数不能被打包(如果想动态给程序设置参数,只能通过部署线上的脚本进行配置)
为什么要这样写程序?
1.分析哪些配置是需要改变的(简单的参数:参数比较少)
2.特质-类(常量)
代码段A:
常量1=10
常量2=30
代码段B【线上的代码】:
引用了常量
sparkStreaming本地模式 local【参数】这个参数和分区没有任何关系,表示当前程序执行分配几个线程。
配置文件、设计程序的入口
1.local —–local【1】分配一个线程
2.local【1】分配一个线程
3.local【2】分配两个线程
4.local【*】电脑有多少空闲线程就分配几个线程
sparkStreaming程序分为两个部分:
1.接收数据【receiver】【必须要分配一个线程】 2.处理数据
所以sparkStreaming至少要有两个线程
案例二:sparkStreaming从文件系统中读数据(两种:本地磁盘、HDFS)
只能监控目录(目录里只要有新增的文件就能监控到,.号开头的除外)
不能监控文件的增加(flume–>kafka—>sparkStreaming)
object T2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(4))
val input: DStream[String] = ssc.textFileStream("C:\\Users\\Administrator\\Desktop\\phaha")
val resDS: DStream[(String, Int)] = input.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
resDS.print()
ssc.start()
ssc.awaitTermination()
}
}
object TT {
def main(args: Array[String]): Unit = {
val bw: BufferedWriter = new BufferedWriter(new FileWriter("C:\\Users\\Administrator\\Desktop\\phaha\\1.txt"))
bw.write("hh,aa")
bw.write("aa,hh")
bw.close()
}
}
数据如何进行处理
1.监听某个目录下的文件夹(只能监听里面新增的文件,不能监听到文件中新增的数据)
2.如果sparkStreaming程序重启的话,会忽略文件夹中所有的文件
3.如果文件以 . 开头会被忽略
案例:从hdfs中读数据
resources中添加hdfs.xml和core.xml
object T2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(4))
//数据如何处理
//只能监听某个目录下的文件夹(只能监听里面新增的文件,不能监听到文件中新增的数据)
val input: DStream[String] = ssc.textFileStream("hdfs://myha01/a3")//myha01代表hadoop02和hadoop03,在hdfsxml文件中配置
val resDS: DStream[(String, Int)] = input.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
resDS.print()
ssc.start()
ssc.awaitTermination()
}
}
将linux目录下的文件put到hadoop/home/a3下能够监控到变化。
sparkContext和DStream
编程范式:
1.创建初始DStream
2.数据分析(高阶算子—转换成DF/DS)
3.启动程序(start)
4.等待程序结束(awaitTermination)
注意:
1.实时流程序如果提交了再关闭不能再提交,因为把SparkContext停了,再start没有开启SparkContext。
ssc.start()
ssc.stop()
//停StreamingContext之前先停SparkContext,SparkContext维护了driver类和stage划分(DAGScheduler),task分发这三个类的实例对象
ssc.start()
ssc.awaitTermination()
2.一个实例程序只能存在一个SparkContext对象(conf),一个SparkContext可以创建多个StreamingContext
val sc = new SparkContext(conf)
val sc1 = new SparkContext(conf)//不可以
3.ssc.stop(false)可以不关闭SparkContext,只关闭了StreamingContext。(默认是true,即手动关闭StreamingContext那么SparkContext也会被关闭,被关闭的话意味着stage不会划分,task不会分发。)
sparkstreaming整合kafka
sparkstreaming整合kafka的两种方式:Receiver、Direct
添加依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
kafka偏移量存储在哪里:
生产者主动推送数据到kafka,数据、索引文件都落本地。偏移量会存到zookeeper里。
消费者找kafka消费数据,消费完会告诉zk自己是哪个组的并且消费了哪个主题的哪个分区消费到了哪个偏移量。(10,9,8,7,6,5,4,3,2,1 第一次次消费到了6第二次就知道从7开始消费)
本地只存储了当前的log日志和index偏移量,真正消费到了哪里是存储到zk的。
消费者当第二次启动时需要去查看从哪里开始消费,找本地的index,然后找log再开始消费。注意需要编码才可以从指定位置消费。
kafka是什么?
分布式的、持久化(不依赖hdfs只依赖磁盘)、容错(副本)、高并发、高吞吐量(同一时刻往kafka里写大量数据)的消息队列(可以有多个生产者同时写数据,多个消费者同时消费数据)
应用场景:消除峰值、异步
生产者:生产数据
消费者:消费数据,同组的消费者只能消费一次数据
broker:存在kafka实例(进程),当台节点就是broker
1.一个topic可以有多分区,一个分区的副本要存到不同的broker里
2.一个topic对应多个分区,partiton数量可以动态增加,增加数量可以提升读写并发
3.partiton物理文件:文件的命名:主题-分区号 里面存储了segment(index、log)
4.任何一条消息都有偏移量和数据,消息只能追加不能修改和不能手动删除(比如对某个消息删除),但是消息生命周期(log.retention.hours)(7天)到了可以删除。
一个消费者组可以有多个实例
每个消费者消费相互不影响,
一个消费者则内并行的消费partition,消费数据没重复
假如感觉消费者消费慢,不可以增加消费者,消费者上限取决于分区的个数(比如有4个分区,10个消费者,那有6个消费者没东西消费,多个消费者不能消费同一个分区),所以要先增加分区数再增加消费者。
消费者数量大于分区的数量会比较浪费,小于分区数量数据处理会有延迟(消费者消费能本地尽量本地不能本地就要跨服务)
消费者消费数据有三种语义:
至少一次
至多一次
仅一次语义
整合kafka
一、receiver方式整合kafka
receiver消费kafka不需要手动提交偏移量
至多一次:偏移量保存成功,数据消费失败(部分失败)
至少一次:因为网络延迟偏移量没有提交成功,数据消费了,
仅一次:数据仅消费一次
案例一: sparkStreamnig整合kafka
验证偏移量:
ls /consumers/1234/offsets/phaha/2
ls要查看到叶子节点才能get
get /consumers/1234/offsets/phaha/2
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc,Seconds(6))
val zkQuorum = "hadoop02:2181,hadoop03:2181,hadoop04:2181"
val groupId = "1234"
val topics: Map[String, Int] = Map(
"phaha" -> 3
)
/** * def createStream( * ssc: StreamingContext, * zkQuorum: String, * groupId: String, * topics: Map[String, Int], * storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 * ): ReceiverInputDStream[(String, String)] * * ReceiverInputDStream[(String, String)] * tuple2[(String, String)] = * 第一个泛型只的是key的泛型 * 第二个泛型只的是value的泛型、 */
val input: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc,zkQuorum,groupId,topics)
val flatMapDS: DStream[String] = input.flatMap {
case (key, value) => {
value.split("\\s+")
}
}
val rbDS: DStream[(String, Int)] = flatMapDS.map((_,1)).reduceByKey(_+_)
rbDS.print()
ssc.start()
ssc.awaitTermination()
}
}
补充说明:
1.receiver只能保证读取kafka的数据一条不丢,但是保证不了重复消费的问题。
receiver会开启wal(预写日志),拉取数据要再写到日志中,一旦程序数据丢失会从预写日志进行恢复。
预写日志保存永久。(kafka有一个很大的不同,默认保存7天)
kafka先写内存再写磁盘,wal会异步的写磁盘数据
2.kafka中的主题分区与sparkstreaming中生成的RDD分区无关
注:假如分区数只有3个(并行度不管给多大,最多只有三个消费者)
3.KafkaUtils.createStream()
仅增加特定于主题的分区的数量就可以增加在单个接收器中使用哪些主题消耗线程的数量
val topics: Map[String, Int] = Map(“phaha” -> 3 )
4.如果启用了预写日志
设置为StorageLevel.MEMORY_AND_DISK_SER
二、Direct方式整合kafka
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc,Seconds(6))
/**
* 当前消费在消费,挂掉了,故障转移之前,立即重启 【丢失】【基本见不到】
*/
val kafkaParams :Map[String, String] = Map(
"bootstrap.servers" -> "hadoop02:9092,hadoop03:9092,hadoop04:9092",
"auto.offset.reset" -> "largest",//largest | smallest
"group.id" -> "1234"
)
val topics : Set[String] = "phaha".split(",").toSet
/**
* def createDirectStream[
* K: ClassTag,
* V: ClassTag,
* KD <: Decoder[K]: ClassTag,
* VD <: Decoder[V]: ClassTag] (
* ssc: StreamingContext,
* kafkaParams: Map[String, String],
* topics: Set[String]
* ): InputDStream[(K, V)]
*/
val input: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
val flatMapDS: DStream[String] = input.flatMap {
case (key, value) => {
value.split("\\s+")
}
}
val rbDS: DStream[(String, Int)] = flatMapDS.map((_,1)).reduceByKey(_+_)
rbDS.print()
ssc.start()
ssc.awaitTermination()
}
}
Direct方式整合kafka有自动提交,给我们提供了一个手动提交的方式
Direct方式获取数据会带有当前消息的偏移量
简化的并行处理:Spark Streaming将创建与要使用的Kafka整合、Kafka和RDD分区之间存在一对一的映射
kafka数据拉取,假如kafka有三个分区,那么创建的DStream(RDD)默认就有三个分区,可以通过线上的资源调优改变并行度,但是没人做。
如果分区内的数据多了,kafka有个内置脚本,对kafka的分区可以进行重分区,间接改变RDD并行度。
效率:不开启预写日志(数据恢复从kafka本身进行恢复,所以配置写数据级别:内存+磁盘+序列化)
仅一次语义:
第一种方式靠检查点checkpoint,第二种方式手动维护偏移量(使用外部存储系统:mysql、zk、hbase)
kafka数据写内存,没有持久化磁盘数据就丢了所以要先生产者给kafka中生产的数据一条不丢(利用acks),才能保障消费者消费仅一次语义。
案例三:Direct方式保存
checkpoint方式:(老版本写法)
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
//两次启动的进程使用的不是一个StreamingContext对象
val ssc: StreamingContext = new StreamingContext(sc,Seconds(6))
ssc.checkpoint("")
val kafkaParams :Map[String, String] = Map(
"bootstrap.servers" -> "hadoop02:9092,hadoop03:9092,hadoop04:9092",
"auto.offset.reset" -> "largest",//largest | smallest
"group.id" -> "1234"
)
val topics : Set[String] = "phaha".split(",").toSet
val input: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
val flatMapDS: DStream[String] = input.flatMap {
case (key, value) => {
value.split("\\s+")
}
}
val rbDS: DStream[(String, Int)] = flatMapDS.map((_,1)).reduceByKey(_+_)
rbDS.print()
ssc.start()
ssc.awaitTermination()
}
}
checkpoint方式:(新版本写法)
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val sc = new SparkContext(conf)
val checkpoint = "C:\\Users\\LSG\\Desktop\\checkpoint"
def createFunc(): StreamingContext ={
val ssc: StreamingContext = new StreamingContext(sc,Seconds(6))
ssc.checkpoint(checkpoint) //设置检查点
val kafkaParams :Map[String, String] = Map(
"bootstrap.servers" -> "hadoop02:9092,hadoop03:9092,hadoop04:9092",
"auto.offset.reset" -> "smallest",//largest | smallest
"group.id" -> "1234"
)
val topics : Set[String] = "phaha".split(",").toSet
val input: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
val flatMapDS: DStream[String] = input.flatMap {
case (key, value) => {
value.split("\\s+")
}
}
val rbDS: DStream[(String, Int)] = flatMapDS.map((_,1)).reduceByKey(_+_)
rbDS.print()
ssc
}
val ssc: StreamingContext=StreamingContext.getOrCreate(checkpoint,createFunc)
ssc.start()
ssc.awaitTermination()
}
}
老版本如果Run两次会构建2次StreamingContext实例数据,产生的StreamingContext对象不是同一个,所以都会从头读数据。
新的方式:想让检查点生效,必须让多个进程之间使用的是同一个对象,对象流可以使不同的进程之间共享一个对象。使用StreamingContext.getOrCreate(checkpoint,createFunc)会先检查checkpoint是否存在 StreamingContext实例数据 ,如果存在则恢复StreamingContext。如果不存在使用后面的函数进行创建createFunc对象流由此可以让不同的进程之间共享一个对象。
补充说明:
checkpoints不足(真实开发中不用):实时批次很小的不用(微批)(0.1s,0.2s不用十几分钟的可以用),实时如果部署到线上要写hdfs路径,要在resource中传入两个资源文件,线上的微批会在hdfs上产生大量小文件。
案例四:采用外部存储系统(zookeeper)
Curator是对zookeeper的封装,提供高级api,让zk使用更加便捷。
zk提供了存储系统(znode)和监听机制(事件:NodeCreated、NodeDeleted、NodeChildrenChanged、NodeDataChanged api触发:getData(),getChilden()、exist() shell触发:create、delete、setdata 看表)
object Scala_List {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
val curator: CuratorFramework = {
val curator = CuratorFrameworkFactory.builder()
.connectString("hadoop02:2181,hadoop03:2181,hadoop04:2181")
.namespace("cc")//在什么目录下进行操作,默认是根目录
//一共重试三次,每间隔1s重试一次
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build()
//使用之前必须start
curator.start()
curator
}
//zk创建一个节点
// curator.create().forPath("aa","2020-09-11".getBytes())
//zk级联创建一个节点
// curator.create().creatingParentsIfNeeded().forPath("bb/haha/mmd","2020-09-11".getBytes())
//curator获取数据
println(new String(curator.getData.forPath("aa")))
//curator设置数据
curator.setData().forPath("aa","heihei".getBytes())
println(new String(curator.getData.forPath("aa")))
curator.delete()
val list: util.List[String] = curator.getChildren().forPath("")
val newList: mutable.Buffer[String] = JavaConversions.asScalaBuffer(curator.getChildren().forPath(""))
//java集合不能使用 /如果想遍历就行集合转换
for(path <- list){
// path 是否包含 cc | dd
}
curator.close()
}
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14779.html