饭可以一日不吃,觉可以一日不睡,书不可以一日不读——毛泽东

通常,当程序计算出一个resultRDD时,我们想知道这个RDD中包含多少个分区, 以及每个分区中包含了那些record。我们可以使用mapPartionsWithIndex()来输出这些数据。

/**
     * mapPartitiosWithIndex(func)
     用法: rdd2=rdd1.mapPartitiosWithIndex(func)
     语义: 对rdd1中每个分区进行func操作,输出新的一组数据
          分区中的数据带有索引
     * */
  public static  void  mapPartitiosWithIndex()
  {
      //创建SparkConf
      SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
      //创建javaSparkContext
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6,7,8,9), 3);
      JavaRDD<String> mapPartitionsWithIndex = rdd
              .mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {

                  private static final long serialVersionUID = 1L;

                  @Override
                  public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
                      //return null;

                      List<String> list = new ArrayList<>();

                      Integer odd=0;
                      Integer even=0;

                      while (iterator.hasNext()) {
                          Integer value=iterator.next();
                          if(value%2==0)
                              even+=value;
                          else
                              odd+=value;

                          //list.add("partition" + index + ":" + iterator.next());
                      }

                      //将(pid,odd)存放到list中
                      list.add("partition:" + index + "," +"value:"+ odd);

                      //将(pid,odd)存放到list中
                      list.add("partition:" + index + "," +"value:"+ even);


                      return list.iterator();
                  }
              }, true);

      mapPartitionsWithIndex.foreach(x->System.out.println(x));
      sc.stop();
  }
答案: B
;
版权声明: 本文为智客工坊「楠木大叔」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

results matching ""

    No results matching ""