package com.spat

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by ThinkPad on 2017/5/20.
 */
object SparkStreaming {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.0")

    // Create the SparkContext
    val conf = new SparkConf().setAppName("spark streaming").setMaster("local[2]")
    val context = new SparkContext(conf)

    // Create the StreamingContext with a 10-second batch interval
    val streaming = new StreamingContext(context, Seconds(10))

    // Watch the HDFS directory and read newly arriving files as a text stream
    val dataSource = streaming.textFileStream("hdfs://ip/user/aimcpro/flume_logs/word/")

    // Split each line on commas and count word occurrences within each batch
    val counts = dataSource.flatMap(_.split(",")).map(x => (x, 1)).reduceByKey(_ + _)
    //counts.saveAsTextFiles("hdfs://172.16.11.12:8020/user/aimcpro/flume_logs/out")
    counts.print()

    streaming.start()
    // Wait for the streaming computation to terminate
    streaming.awaitTermination()
  }
}