scala - Can only zip RDDs with same number of elements in each partition despite repartition


I load a dataset:

val data = sc.textFile("/home/kybe/documents/datasets/img.csv", defp)

I want to put an index on the data, thus:

val nb = data.count.toInt
val toZip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)
val res = toZip.zip(data)

Unfortunately I get the following error:

Can only zip RDDs with same number of elements in each partition

How can I modify the number of elements per partition, if that is possible?

Why doesn't it work?

The documentation for zip() states:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

So we need to make sure we meet two conditions:

  • both RDDs have the same number of partitions
  • the respective partitions in those RDDs have exactly the same size

You are making sure that you have the same number of partitions with repartition(), but Spark doesn't guarantee that you will have the same distribution of elements in each partition for each RDD.
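You can check this yourself by counting the elements in each partition of both RDDs. The helper below is a sketch, assuming a live SparkContext and the two RDDs from the question (here called `data` and `toZip`):

```scala
// Sketch: report how many elements land in each partition of an RDD.
// Assumes a running SparkContext; `data` and `toZip` come from the question.
def partitionSizes[T](rdd: org.apache.spark.rdd.RDD[T]): Array[Int] =
  rdd.mapPartitionsWithIndex((_, iter) => Iterator(iter.size)).collect()

// If these two arrays differ in any position, zip() will throw:
// partitionSizes(data)
// partitionSizes(toZip)
```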

Why is that?

Because there are different types of RDDs, and most of them have different partitioning strategies! For example:

  • ParallelCollectionRDD is created when you parallelise a collection with sc.parallelize(collection). To decide how many elements go into each partition, it checks the size of the collection and calculates a step size. I.e. if you have 15 elements in the list and want 4 partitions, the first 3 will have 4 consecutive elements each and the last one will have the remaining 3.
  • HadoopRDD: if I remember correctly, one partition per file block. Even though you are reading a local file, Spark internally first creates this kind of RDD when it reads the file and then maps over that RDD, since it is a pair RDD of <Long, Text> and you just want String :-)
  • etc. etc.
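The step-size idea described for ParallelCollectionRDD can be sketched in plain Scala. This is a simplified illustration of the splitting, not Spark's actual slicing code (Spark's ParallelCollectionRDD uses a slightly different formula):

```scala
// Simplified sketch: split a collection into numSlices partitions using a
// ceiling step size, as described above. Not Spark's real implementation.
def sliceSketch[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  val step = math.ceil(seq.length.toDouble / numSlices).toInt
  seq.grouped(step).toSeq
}

// 15 elements into 4 partitions: sizes 4, 4, 4 and the remaining 3
val sizes = sliceSketch(1 to 15, 4).map(_.size)
// sizes == Seq(4, 4, 4, 3)
```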

In your example Spark internally creates different types of RDDs (a CoalescedRDD and a ShuffledRDD) while doing the repartitioning, but I think you get the global idea: different RDDs have different partitioning strategies :-)

Notice that the last part of the zip() doc mentions the map() operation. map() does not repartition; it is a narrow transformation, so it guarantees both conditions.
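For example, if the second RDD is derived from `data` itself with a map(), the two share the same number of partitions and the same per-partition sizes by construction. A sketch using the question's `data` RDD:

```scala
// map() is narrow: the result keeps data's partition count and the number
// of elements per partition, so the zip below is guaranteed to be safe.
val lengths = data.map(line => line.length)
val safe = lengths.zip(data) // RDD[(Int, String)]
```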

solution

In the simple example you mentioned you can just use data.zipWithIndex. If you need something more complicated, then the new RDD for zip() should be created with map(), as mentioned above.
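Sketched with the question's `data` RDD, the zipWithIndex approach looks like this:

```scala
// zipWithIndex pairs every element with its global position (a Long),
// with no per-partition-size condition to worry about.
val indexed = data.zipWithIndex() // RDD[(String, Long)]
// If you want the index first, swap the pair with a map:
val byIndex = indexed.map { case (line, i) => (i, line) }
```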

