scala - How to avoid Spark NumberFormatException: null
I have a general question, derived from a specific exception I have encountered.

I'm querying data on Dataproc using Spark 1.6. I need one day of data (~10,000 files) from two logs, and then some transformations. However, the data may (or may not) contain bad records. After failing on the full-day query, I tried hours 00-09 and got no error. I tried hours 10-19 and got the exception. Going hour by hour, I found the bad data is in hour 10; hours 11 and 12 are fine.

Basically the code is:
val imps = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*")
  .select("C0", "C18", "C7", "C9", "C33", "C29", "C63")
  .registerTempTable("imps")

val conv = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("gs://logs.xxxx.com/2016/03/14/xxxxx/conv/2016-03-14-10*")
  .select("C0", "C18", "C7", "C9", "C33", "C29", "C65")
  .registerTempTable("conversions")

val ff = sqlContext.sql(
  "select * from (select * from imps) a inner join (select * from conversions) b " +
  "on a.C0 = b.C0 and a.C7 = b.C7 and a.C18 = b.C18")
  .coalesce(16)
  .write.format("com.databricks.spark.csv")
  .save("gs://xxxx-spark-results/newsparkresults/plara2.6mar14_10_1/")

(over-simplified)
The error is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 130.0 failed 4 times, most recent failure: Lost task 38.3 in stage 130.0 (TID 88495, plara26-0317-0001-sw-v8oc.c.xxxxx-analytics.internal): java.lang.NumberFormatException: null
    at java.lang.Integer.parseInt(Integer.java:542)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:53)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:181)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

So my question is: how do I implement exception handling using spark-csv? I can convert the DataFrame to an RDD and work on it there, but it seems there must be a better way...
Has anyone solved a similar problem?
This is because automatically inferring the schema is not safe against invalid data in the input files. It can cause the DataFrame schema to come out different when reading different input files.
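For the Scala/Spark 1.6 code in the question, one way to take inference out of the picture is to declare the schema up front: spark-csv then applies the same (string) types to every file, and the numeric columns can be cast by hand afterwards, as the example further down demonstrates. A minimal sketch; the 66 headerless fields named C0 through C65 are my assumption, for illustration only:

    import org.apache.spark.sql.types.{StructField, StructType, StringType}

    // Read every column as a string so spark-csv never has to guess a type.
    // Assumed layout (illustrative only): 66 headerless fields named C0..C65.
    val rawSchema = StructType((0 to 65).map(i => StructField(s"C$i", StringType, nullable = true)))

    val imps = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .schema(rawSchema)   // explicit schema instead of .option("inferSchema", "true")
      .load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*")
      .select("C0", "C18", "C7", "C9", "C33", "C29", "C63")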
Suppose you have a CSV file of floats, contaminated with a string:
0.018
0.095
0.000
'hoi'
0.000
0.093
0.012

When you read this into a DataFrame using inferSchema, like so:
>>> df = spark.read.format('csv').option('inferSchema', True).load('./test_csv.dat')
>>> df.show()
+-----+
|  _c0|
+-----+
|0.018|
|0.095|
|0.000|
|'hoi'|
|0.000|
|0.093|
|0.012|
+-----+

then the type is not inferred:
>>> df.schema
StructType(List(StructField(_c0,StringType,true)))

You can solve this by manually casting the column, like so:
>>> df = df.withColumn('val_float', df._c0.cast(FloatType())).select('val_float')
>>> df.show()
+---------+
|val_float|
+---------+
|    0.018|
|    0.095|
|      0.0|
|     null|
|      0.0|
|    0.093|
|    0.012|
+---------+
>>> df.schema
StructType(List(StructField(val_float,FloatType,true)))
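The same fix carries over to the Scala/Spark 1.6 code in the question: once the columns arrive as strings (for example via the explicit schema sketched earlier), casting yields null for unparsable values instead of throwing, and those rows can be filtered out. A rough sketch; treating C63 as the offending numeric column is my assumption, purely for illustration:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.IntegerType

    // `imps` is the string-typed DataFrame from the earlier sketch.
    // cast() yields null for values it cannot parse, so bad rows show up
    // as nulls rather than a NumberFormatException during the scan.
    val impsTyped = imps
      .withColumn("C63_int", col("C63").cast(IntegerType))   // hypothetical numeric column
      .filter(col("C63_int").isNotNull)                       // drop the rows carrying bad data

    impsTyped.registerTempTable("imps")   // then run the join as before

Reading both logs this way keeps the join running over consistently typed columns regardless of which hour's files contain the garbage.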