java - Out of memory error when writing out Spark dataframes to Parquet format
I'm trying to query data from a database, run some transformations on it, and save the new data in Parquet format on HDFS.
Since the database query returns a large number of rows, I'm getting the data in batches and running the above process on every incoming batch.
Update 2: The batch processing logic is:
import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class Batch(rows: List[String], sqlContext: SQLContext) {
  // actual schema has around 60 fields
  val schema = StructType(
    Array("name", "age", "address").map(field => StructField(field, StringType, true))
  )

  val transformedRows = rows.map(row => {
    // transformation logic (returns Array[Array[String]] type)
  }).map(row => Row.fromSeq(row.toSeq))

  val dataFrame = sqlContext.createDataFrame(transformedRows.asJava, schema)
}

val sparkConf = new SparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)

// code to query the database
// the queryResponse iterator fetches the next batch on calling queryResponse.next
var batchNum = 0
while (queryResponse.hasNext) {
  val batch = queryResponse.next
  val batchToSave = new Batch(batch.toList.map(_.getDocument.toString), sqlContext)
  batchToSave.dataFrame.write.parquet(batchNum + "_parquet")
  batchNum += 1
}
My Spark version is 1.6.1 and the spark-submit command is:
spark-submit target/scala-2.10/spark\ application-assembly-1.0.jar
The problem is that after a certain number of batches, I get a java.lang.OutOfMemoryError.
The entire stack trace is:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at scala.StringContext.standardInterpolator(StringContext.scala:125)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.application$.main(app.scala:156)
    at app.application.main(app.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I tried coalescing the data into a single partition, but it didn't make a difference.
dataFrame.coalesce(1).write.parquet(batchNum + "_parquet")
Any help would be appreciated.
Update 1
Not doing the coalesce transform on the RDD still gives the error, with the stack trace that follows. It seems to be an issue with Parquet.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.application$.main(app.scala:156)
    at app.application.main(app.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
I had the same problem today as well. It turned out that my execution plan was rather complex, and toString on it generated 150 MB of information; combined with Scala's string interpolation, this led to the driver running out of memory.
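If you want to check whether you are in the same situation, a rough diagnostic is to look at how large the textual plan is before writing. This is just a sketch, where df stands for whatever DataFrame you are about to write (in your case batchToSave.dataFrame):

// the same string the failing code in QueryExecution.toString builds
val planText = df.queryExecution.toString
println(s"Plan string length: ${planText.length} characters")

If that string runs into hundreds of megabytes, the driver heap is what's blowing up, not the executors.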
You can try to increase the driver memory (I had to double mine from 8 GB to 16 GB).
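With the spark-submit invocation from your question, that would look roughly like this (16g is just the value that happened to work for me; adjust it to what your machine can spare):

spark-submit --driver-memory 16g target/scala-2.10/spark\ application-assembly-1.0.jar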