饭可以一日不吃,觉可以一日不睡,书不可以一日不读——毛泽东
通常,当程序计算出一个resultRDD时,我们想知道这个RDD中包含多少个分区, 以及每个分区中包含了那些record。我们可以使用mapPartionsWithIndex()来输出这些数据。
/** * mapPartitiosWithIndex(func) 用法: rdd2=rdd1.mapPartitiosWithIndex(func) 语义: 对rdd1中每个分区进行func操作,输出新的一组数据 分区中的数据带有索引 * */ public static void mapPartitiosWithIndex() { //创建SparkConf SparkConf conf = new SparkConf().setAppName("map").setMaster("local"); //创建javaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6,7,8,9), 3); JavaRDD<String> mapPartitionsWithIndex = rdd .mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception { //return null; List<String> list = new ArrayList<>(); Integer odd=0; Integer even=0; while (iterator.hasNext()) { Integer value=iterator.next(); if(value%2==0) even+=value; else odd+=value; //list.add("partition" + index + ":" + iterator.next()); } //将(pid,odd)存放到list中 list.add("partition:" + index + "," +"value:"+ odd); //将(pid,odd)存放到list中 list.add("partition:" + index + "," +"value:"+ even); return list.iterator(); } }, true); mapPartitionsWithIndex.foreach(x->System.out.println(x)); sc.stop(); }
答案: B
;
版权声明:
本文为智客工坊「楠木大叔」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。