scala - Can only zip RDDs with same number of elements in each partition despite repartition
I load a dataset:

```scala
val data = sc.textFile("/home/kybe/documents/datasets/img.csv", defp)
```

I want to put an index on the data, so:

```scala
val nb = data.count.toInt
val toZip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)
val res = toZip.zip(data)
```

Unfortunately I get the following error:

Can only zip RDDs with same number of elements in each partition

How can I modify the number of elements per partition, if that is possible?
Why doesn't it work?
The documentation of zip() states:

"Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other)."
So we need to make sure we meet two conditions:

- both RDDs have the same number of partitions
- the respective partitions in those RDDs have exactly the same size
You are making sure that you will have the same number of partitions with repartition(), but Spark doesn't guarantee that you will have the same distribution of elements in each partition for each RDD.
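To see the mismatch concretely, you can inspect the per-partition element counts of both RDDs (a diagnostic sketch using `toZip` and `data` from the question; assumes a running SparkContext):

```scala
// glom() turns each partition into a single Array of its elements,
// so mapping to _.length yields one element count per partition.
val sizesIndex = toZip.glom().map(_.length).collect()
val sizesData  = data.glom().map(_.length).collect()

// zip() succeeds only if the two size sequences match element-wise:
val zippable = sizesIndex.sameElements(sizesData)
```

After repartition() the two arrays will typically differ, which is exactly what the error message is complaining about.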
Why is that?
Because there are different types of RDDs, and most of them have different partitioning strategies! For example:
- ParallelCollectionRDD is created when you parallelise a collection with sc.parallelize(collection). To decide how many elements go into each partition, it checks the size of the collection and calculates the step size: e.g. if you have 15 elements in a list and want 4 partitions, the first 3 partitions will get 4 consecutive elements each and the last one will get the remaining 3.
- HadoopRDD, if I remember correctly, has one partition per file block. Even though you are using a local file, Spark internally first creates this kind of RDD when you read a local file, and then maps that RDD, since that RDD is a pair RDD of <Long, Text> and you want a String :-)
- etc. etc.
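As an illustration of the first bullet, here is a minimal pure-Scala sketch of that "step size" split (my own approximation of the behaviour described above, not Spark's actual ParallelCollectionRDD code, which may distribute the remainder differently):

```scala
// Split n elements into numSlices partitions, spreading the remainder
// over the first partitions, as in the 15-into-4 example above.
def sliceSizes(n: Int, numSlices: Int): Seq[Int] = {
  val base = n / numSlices        // minimum elements per partition
  val rem  = n % numSlices        // leftover elements to distribute
  (0 until numSlices).map(i => if (i < rem) base + 1 else base)
}

// sliceSizes(15, 4) gives partition sizes 4, 4, 4, 3
```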
In your example, Spark internally creates different types of RDDs (CoalescedRDD and ShuffledRDD) while doing the repartitioning, so I think you got the global idea that different RDDs have different partitioning strategies :-)
Notice that the last part of the zip() doc mentions the map() operation. This operation does not repartition, as it's a narrow transformation of the data, so it would guarantee both conditions.
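For instance, an RDD derived from data via map() keeps exactly the same partition layout, so zipping the two back together is safe (a small sketch using a made-up line-length computation):

```scala
// map() is a narrow transformation: each output partition is built
// 1-to-1 from the corresponding input partition, so sizes match.
val lengths = data.map(_.length)   // RDD[Int], same partitioning as data
val res     = data.zip(lengths)    // OK: both zip() conditions hold
```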
solution
In the simple example you mentioned, you can use data.zipWithIndex. If you need something more complicated, then the new RDD to zip() with should be created with map() as mentioned above.
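A sketch of the zipWithIndex approach (assuming data is an RDD[String] as in the question):

```scala
// zipWithIndex pairs each element with its global index: RDD[(String, Long)]
val indexed = data.zipWithIndex()

// if you want (index, element) pairs instead, swap the fields with map():
val byIndex = indexed.map { case (value, index) => (index, value) }
```

Note that zipWithIndex triggers a Spark job to compute per-partition offsets when the RDD has more than one partition; if you only need unique (not consecutive) ids, zipWithUniqueId avoids that extra job.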