Spark(6):RDD的创建和并行度

目录

0. 相关文章链接

1. 从集合(内存)中创建 RDD

2. 从外部存储(文件)创建 RDD

3. 从其他 RDD 创建

4. 直接创建 RDD(new)

5. RDD 并行度与分区


0. 相关文章链接

 Spark文章汇总 

1. 从集合(内存)中创建 RDD

从集合中创建 RDD, Spark 主要提供了两个方法: parallelize 和 makeRDD

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
    List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
    List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()

从底层代码实现来讲, makeRDD 方法其实就是 parallelize 方法

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
}

2. 从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、 HBase 等

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()

3. 从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。 

4. 直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

5. RDD 并行度与分区

        默认情况下, Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。 记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4),4)
val fileRDD: RDD[String] = sparkContext.textFile("input",2)
fileRDD.collect().foreach(println)
sparkContext.stop()

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark 核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { 
        i =>
            val start = ((i * length) / numSlices).toInt
            val end = (((i + 1) * length) / numSlices).toInt
            (start, end)
    }
}

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下:

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

    long totalSize = 0; // compute total size
    for (FileStatus file: files) { // check we have valid files
        if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
        }
        totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    ...

    for (FileStatus file: files) {
        ...
        if (isSplitable(fs, path)) {
            long blockSize = file.getBlockSize();
            long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        ...
    }

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总