kafka+streaming实例.txt


立即下载 谦逊的毛巾
2024-03-23
val String -> Set
909 B

object BashSpark { case class alert(raw: String) def main(args: Array[String]) { val confspark = new SparkConf() .setAppName("do Application") val sc = new SparkContext(confspark) val ssc = new StreamingContext(sc, Milliseconds(500)) // smallest largest val kafkaMapParams = Map( "zookeeper.connect" -> "", "zookeeper.session.timeout.ms" -> "40000", "auto.offset.reset" -> "largest", "group.id" -> "cb", "metadata.broker.list" -> "" ) val topicsSet = Set("boy") val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet) val ok= lines.foreachRDD(rdd => { rdd.map(_._2).foreachPartition( ub => { ub.foreach(b => { //业务逻辑 }) }) }) ssc.start() ssc.awaitTermination() }}

val/String/->/Set/ val/String/->/Set/
-1 条回复
登录 后才能参与评论
-->