网页资讯视频图片知道文库贴吧地图采购
进入贴吧全吧搜索

 
 
 
日一二三四五六
       
       
       
       
       
       

签到排名:今日本吧第个签到,

本吧因你更精彩,明天继续来努力!

本吧签到人数:0

一键签到
成为超级会员,使用一键签到
一键签到
本月漏签0次!
0
成为超级会员,赠送8张补签卡
如何使用?
点击日历上漏签日期,即可进行补签。
连续签到:天  累计签到:天
0
超级会员单次开通12个月以上,赠送连续签到卡3张
使用连续签到卡
05月22日漏签0天
spark吧 关注:5,585贴子:14,119
  • 看贴

  • 吧主推荐

  • 游戏

  • 3回复贴,共1页
<<返回spark吧
>0< 加载中...

大数据之spark离线学习篇

  • 取消只看楼主
  • 收藏

  • 回复
  • 爽朗的wanghy
  • 核心吧友
    7
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼
RDD及其特点
1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。
4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)
创建RDD
进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。
Spark Core提供了三种创建RDD的方式:
1.使用程序中的集合创建RDD(主要用于测试)
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
2.使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)
SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();
3.使用HDFS文件创建RDD(生产环境的常用方式)
SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
使用HDFS文件创建RDD对比使用本地文件创建RDD,需要修改的,只有两个地方:
第一,将SparkSession对象的master("local")方法去掉
第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件
操作RDD
Spark支持两种RDD操作:transformation和action。
transformation操作
transformation操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。
常用的transformation介绍:
map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。
filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。
groupByKey:根据key进行分组,每个key对应一个Iterable<value>。
reduceByKey:对每个key对应的value进行reduce操作。
sortByKey:对每个key对应的value进行排序操作。
join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。
cogroup:同join,但是每个key对应的Iterable<value>都会传入自定义函数进行处理。
领取视频资料加 群 593- - 188--- 212



  • 爽朗的wanghy
  • 核心吧友
    7
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼
小案例实战1案例需求:
1、对文本文件内的每个单词都统计出其出现的次数。
2、按照每个单词出现次数的数量,降序排序。
步骤:
1.创建RDD
2.将文本进行拆分 (flatMap)
3.将拆分后的单词进行统计 (mapToPair,reduceByKey)
4.反转键值对 (mapToPair)
5.按键升序排序 (sortedByKey)
6.再次反转键值对 (mapToPair)
7.打印输出(foreach)
Java版本jdk1.8以下
public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 创建lines RDD JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt"); // 将文本分割成单词RDD JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //将单词RDD转换为(单词,1)键值对RDD JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() { @Override public Tuple2<String,Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }); //对wordPair 进行按键计数 JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer +integer2; } }); // 到这里为止,就得到了每个单词出现的次数 // 我们的新需求,是要按照每个单词出现次数的顺序,降序排序 // wordCounts RDD内的元素是这种格式:(spark, 3) (hadoop, 2) // 因此我们需要将RDD转换成(3, spark) (2, hadoop)的这种格式,才能根据单词出现次数进行排序 // 进行key-value的反转映射 JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception { return new Tuple2<Integer, String>(s._2,s._1); } }); // 按照key进行排序 JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false); // 再次将value-key进行反转映射 JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception { return new Tuple2<String, Integer>(s._2,s._1); } }); // 到此为止,我们获得了按照单词出现次数排序后的单词计数 // 打印出来 sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println("word \""+s._1+"\" appears "+ s._2+" times."); } }); sc.close(); }}
Java版本jdk1.8
可以使用lambda表达式,简化代码:
public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 创建lines RDD JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt"); JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1)); JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b)); JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1)); JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false); JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1)); sortedWordCount.foreach(s->System.out.println("word \""+s._1+"\" appears "+ s._2+" times.")); sc.close(); }}
scala版本
由于spark2 有了统一切入口SparkSession,在这里就使用了SparkSession。
package cn.spark.study.coreimport org.apache.spark.sql.SparkSessionobject SortWordCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt") val words = lines.flatMap{line => line.split(" ")} val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _) val countWord = wordCounts.map{word =>(word._2,word._1)} val sortedCountWord = countWord.sortByKey(false) val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)} sortedWordCount.foreach(s=> { println("word \""+s._1+ "\" appears "+s._2+" times.") }) spark.stop() }}


2025-05-22 12:17:19
广告
  • 爽朗的wanghy
  • 核心吧友
    7
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼


  • 爽朗的wanghy
  • 核心吧友
    7
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼


登录百度账号

扫二维码下载贴吧客户端

下载贴吧APP
看高清直播、视频!
  • 贴吧页面意见反馈
  • 违规贴吧举报反馈通道
  • 贴吧违规信息处理公示
  • 3回复贴,共1页
<<返回spark吧
分享到:
©2025 Baidu贴吧协议|隐私政策|吧主制度|意见反馈|网络谣言警示