饭可以一日不吃,觉可以一日不睡,书不可以一日不读——毛泽东
通常,当程序计算出一个resultRDD时,我们想知道这个RDD中包含多少个分区, 以及每个分区中包含了那些record。我们可以使用mapPartionsWithIndex()来输出这些数据。
/**
* mapPartitiosWithIndex(func)
用法: rdd2=rdd1.mapPartitiosWithIndex(func)
语义: 对rdd1中每个分区进行func操作,输出新的一组数据
分区中的数据带有索引
* */
public static void mapPartitiosWithIndex()
{
//创建SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
//创建javaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6,7,8,9), 3);
JavaRDD<String> mapPartitionsWithIndex = rdd
.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
//return null;
List<String> list = new ArrayList<>();
Integer odd=0;
Integer even=0;
while (iterator.hasNext()) {
Integer value=iterator.next();
if(value%2==0)
even+=value;
else
odd+=value;
//list.add("partition" + index + ":" + iterator.next());
}
//将(pid,odd)存放到list中
list.add("partition:" + index + "," +"value:"+ odd);
//将(pid,odd)存放到list中
list.add("partition:" + index + "," +"value:"+ even);
return list.iterator();
}
}, true);
mapPartitionsWithIndex.foreach(x->System.out.println(x));
sc.stop();
}
答案: B
;
版权声明:
本文为智客工坊「楠木大叔」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
