在工作中,发现一个线上问题,某一天任务少数据,对原始数据分析,对 DataFrame 调用 show() 有数据,但 count() 竟然为 0 ??
val df = spark.read.parquet("/tmp/test_data") df.show(10) pintln(df.count())
代码如上,show() 的时候有数据,但 count() 的结果竟然为 0 !
单从代码上进行分析,很难分析出来数据的问题,也执行了 printSchema ,也可以正常输出数据的 schema 。
所以考虑 yarn 上观察该任务,寻找原因,从 yarn ui 上该任务 Tracking URL 进入,从 Completed Jobs 中找到 count() 函数执行的那个 task ,找到其 Logs stderr ,很快发现了报错。
22/06/27 17:27:33 WARN FileScanRDD: Skipped the rest of the content in the corrupted file: path: hdfs://HDFS001/tmp/test_data/part-00019-d38910e3-f5d5-4f87-a820-a2eb680f9501-c000.snappy.parquet, range: 0-3466308, partition values: [empty row] java.lang.RuntimeException: Found duplicate field(s) "etaG": [etaG, etag] in case-insensitive mode
“Found duplicate field(s) "etaG": [etaG, etag] in case-insensitive mode
” 这个错误是在于,默认 spark 是不区分大小写的,比如在合并 schema 的时候,在从内置数据源 Parquet、ORC、Avro 和 JSON 读取时,检查同一级别(顶级或嵌套级别)上没有重复的列名。如果存在这样的重复列,则抛出这样的异常。
而该数据中存在两个忽略大小写情况下相同的字段,“eatg” 和 “etaG” ,所以合并 schema 的时候报错了。那么如何控制 spark 在计算的时候区分大小写呢?是通过 spark.sql.caseSensitive 这个配置项控制的,默认是 false ,即不区分, true 则区分大小写。这里因为我的 spark 任务配置读取的是机器上 SPARK_HOME 下的 conf ,所以这里我在 conf 里加上 spark.sql.caseSensitive: true 。如果,你是代码控制配置的话 ,也可以通过加上下面这行代码进行控制:
// sql方式 spark.sql(“set spark.sql.caseSensitive=true”) // 配置项方式 spark.sqlContext.setConf("spark.sql.caseSensitive", "true")
重新执行任务,发现报错消失,数据正常。
这里,你可能会纳闷,这个错误为什么会造成 show 正常,但 count 为 0 呢,为什么任务执行不失败呢?其实,最相关的信息,是这一行 “22/06/27 17:27:33 WARN FileScanRDD: Skipped the rest of the content in the corrupted file: path: hdfs://HDFS001/tmp/test_data/part-00019-d38910e3-f5d5-4f87-a820-a2eb680f9501-c000.snappy.parquet, range: 0-3466308, partition values: [empty row]
” 。
这个信息的原因是,spark 默认会跳过“损坏”的文件,然后会将损坏的文件作为 WARN 消息记录在程序执行程序日志中。这里出现大小写字段无法合并的文件,也就被当成了“损坏”的文件,所以 count() 函数执行的时候,忽略了这些数据文件。但是至于 show() 为什么不跳过,暂时未知,需要再仔细研究。
spark.read.parquet("/tmp/test_data").agg(count("type")).show()
同样测试发现,上面这种方式也可以正常输出。
遇到“损坏”文件的策略,是由这两项配置控制的。
RDD:spark.files.ignoreCorruptFiles
DataFrame:spark.sql.files.ignoreCorruptFiles
如果需要遇到“损坏”文件不跳过的话,则将两个属性值设置为 false 即可。
本文参考
- The .schema() API behaves incorrectly for nested schemas that have column duplicates in case-insensitive mode:https://issues.apache.org/jira/browse/SPARK-32431
- Spark SQL Upgrading Guide:https://spark.apache.org/docs/2.4.2/sql-migration-guide-upgrade.html
- Spark Configuration Properties:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-properties.html
- Spark – ignoring corrupted files:https://stackoverflow.com/questions/53541593/spark-ignoring-corrupted-files
发表评论