java - Out of memory error when writing out Spark DataFrames to Parquet format -


I'm trying to query data from a database, run some transformations on it, and save the new data in Parquet format on HDFS.

Since the database query returns a large number of rows, I'm getting the data in batches and running the above process on every incoming batch.

Update 2: The batch processing logic is:

import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class Batch(rows: List[String],
            sqlContext: SQLContext) {

    // Actual schema has around 60 fields; all columns are strings
    val schema = StructType(Array("name", "age", "address").map(field =>
                     StructField(field, StringType, nullable = true)
                 ))

    // Convert each raw document string into a Row matching the schema
    val transformedRows = rows.map(row => {
            // transformation logic (returns Array[Array[String]] type)
        }).map(row => Row.fromSeq(row.toSeq))

    val dataFrame = sqlContext.createDataFrame(transformedRows.asJava, schema)
}

val sparkConf = new SparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)

// Code that queries the database.
// The queryResponse iterator fetches the next batch on calling queryResponse.next

var batch_num = 0

while (queryResponse.hasNext) {
    val batch = queryResponse.next

    val batchToSave = new Batch(
                          batch.toList.map(_.getDocument.toString),
                          sqlContext)

    batchToSave.dataFrame.write.parquet(batch_num + "_parquet")

    batch_num += 1
}

My Spark version is 1.6.1 and the spark-submit command is:

spark-submit target/scala-2.10/spark\ application-assembly-1.0.jar 

The problem is that after a number of batches I get a java.lang.OutOfMemoryError.

The entire stacktrace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at scala.StringContext.standardInterpolator(StringContext.scala:125)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.Application$.main(app.scala:156)
    at app.Application.main(app.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I tried coalescing the data into a single partition, but it didn't make a difference.

dataFrame.coalesce(1).write.parquet(batch_num + "_parquet")

Any help is appreciated.

Update 1

Not doing the coalesce transform on the RDD still gives an error; the stacktrace follows. It seems the issue is with Parquet.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
    at app.Application$.main(app.scala:156)
    at app.Application.main(app.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I had the same problem today as well. It turned out my execution plan was rather complex, and toString on it generated 150 MB of information, which, combined with Scala's string interpolation, led to the driver running out of memory.
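If you want to check whether the plan string is the culprit in your case, a minimal sketch (assuming the DataFrame from the Batch class above; queryExecution is the developer-API field exposed on DataFrame in Spark 1.6) is to build the same string yourself before the write, which is what the first stack trace shows happening inside SQLExecution.withNewExecutionId:

// Hypothetical check, using the names from the question's code
val planString = batchToSave.dataFrame.queryExecution.toString
println("Plan string size: " + planString.length + " characters")

If that string is hundreds of megabytes, the driver heap will blow up before any data is written.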

You can try to increase the driver memory (I had to double it from 8 GB to 16 GB).
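For example, using the spark-submit command from the question, the driver memory can be passed with the --driver-memory flag (16g is just the value that worked for me; adjust it to your setup):

spark-submit --driver-memory 16g target/scala-2.10/spark\ application-assembly-1.0.jar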

