How does Apache Spark read a parquet file

In this post I will try to explain what happens when Apache Spark tries to read a parquet file. Apache Parquet is a popular columnar storage format which stores its data as a bunch of files. Typically these files are stored on HDFS. In a seprate post I will explain more details about the internals of Parquet, but for here we focus on what happens when you call

 val parquetFileDF = spark.read.parquet("intWithPayload.parquet")

as documented in the Spark SQL programming guide.

Note: This blog post is work in progress with its content, accuracy, and of course, formatting.

This commentary is made on the 2.1 version of the source code, with the Whole Stage Code Generation (WSCG) on.

In this example, I am trying to read a file which was generated by the Parquet Generator Tool. The schema for intWithPayload.parquet file is <int, Array[Byte]>. This detail is important because it dictates how WSCG is done. See the end of this page.

Key Objects

In Spark SQL, various operations are implemented in their respective classes. You can find them having Exec as a suffix in their name.

Step 1: So for reading a data source, we look into DataSourceScanExec class. From here, the code somehow ends up in the ParquetFileFormat class. I am not entirely clear how does this happen, but it makes sense. Various input file formats are implemented this way. We need to trace whole-stage-code-generation path with the batch reading support. Anyways moving on.

Step 2: ParquetFileFormat has a buildReader function that returns a (PartitionedFile => Iterator[InternalRow]) function. The iterator in this function is generated as:

  val iter = new RecordReaderIterator(parquetReader)

Here parquetReader is of type VectorizedParquetRecordReader. The RecordReaderIterator class wraps a Scala iterator (we discuss later how this iterator is consumed, step 4) around Hadoop style RecordReader<K, V>, which is implemented by VectorizedParquetRecordReader (and its base class SpecificParquetRecordReaderBase<Object>).

Step 3: What does VectorizedParquetRecordReader do? According to the comment in the file,” A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the Parquet column APIs. This is somewhat based on parquet-mr’s ColumnReader “. After object allocation VectorizedParquetRecordReader, initialize(split, hadoopAttemptContext) and initBatch(partitionSchema, file.partitionValues) functions are called.

Step 3(i): initialize calls the initialize function of the super class SpecificParquetRecordReaderBase. Here, the file schema is read, requested schema is inferred, and a reader of type ParquetFileReader is allocated in the base class. By the end of the SpecificParquetRecordReaderBase.initialize(...) we know how many rows are there in the InputFileSplit. This is stored in the totalRowCount variable.

Step 3(ii): initBatch mainly allocates columnarBatch object. We discuss this class in more detail later.

Step 4: Implementation of the RecordReader interface in VectorizedParquetRecordReader demands more attention, because it what what is called when the iterator wrapper from the Step 2 is consumed (where?, we tell later). Upon calling nextKeyValue(), the function first calls resultBatch (which becomes no-op, if columnarBatch is already initialized, which it is, see step 3.ii), and then to nextBatch(). Remember, we always operate in the batch mode (returnColumnarBatch is set to true). The nextBatch function fills (how? we tell later) columnarBatch with data and this variable is returned in getCurrentValue function. getCurrentKey always returns null and is implemented in the base class of SpecificParquetRecordReaderBase.

So, now we know what variable is returned in the iterator. Now from here there are two directions. We first describe how ColumnarBatch is filled with the parquet data. And then we describe who consumes the mysterious iterator that is generated in the beginning (Step 2, val iter), where ColumnarBatch is returned and what happens with it?

How and where ColumnarBatch is filled?

In the VectorizedParquetRecordReader.nextBatch() function, if it has not read all the rows, it calls checkEndOfRowGroup() function. the checkEndOfRowGroup function then reads a rowGroup (you can think of a row group as a collection of certain number of rows stored in columnar format), and then allocates a VectorizedColumnReader object for each requested column in the requestedSchema. The VectorizedColumnReader constructor takes a ColumnDescriptor (can be found in the schema) and a PageReader (can be found from the rowGroup, a Parquet API call). Also missingColumns is a bitmap of missing columns (probably the ones which are missing or ones which Spark does not intend to read). Then in the nextBatch function readBatch(num, columnarBatch.column(i)); is called on all the VectorizedColumnReader objects allocated before in the checkEndOfRowGroup function (essentially per column). (So, ColumnarBatch and ColumnVector are just raw piece of memory which are used by the VectorizedColumnReader). So here in the readBatch, number of rows is passed and ColumnVector (stored in ColumnarBatch). So what is ColumnVector? We can think of this as an array of types, index by the rowId.

/** 
 * This class represents a column of values and provides the main APIs 
 * to access the data values. It supports all the types and contains 
 * get/put APIs as well as their batched versions. The batched versions 
 * are preferable whenever possible.
 * ...
*/
public abstract class ColumnVector implements AutoCloseable

Anyways, coming back to the point. The raw data is stored in the ColumnVector, which themselves are stored in a ColumnBatch object. ColumnVector is what is passed as the storage space in the readBatch function. Now inside the readBatch function, it first calls readPage() function which see which version of the parquet file we are reading (v1 or v2, I don’t know the difference), and then initializes a bunch of objects, namely, defColumn:VectorizedRleValuesReader, repetitionLevelColumn:ValuesReaderIntIterator, definitionLevelColumn:ValuesReaderIntIterator, and dataColumn:VectorizedRleValuesReader. Out of these variables ValuesReaderIntIterator is from parquet-mr, and VectorizedRleValuesReader from Spark. From here on, I do not understand completely what is happening. There are a bunch of read[Type]Batch() functions which are called, which in turn call defColumn.read[Type]s() functions. ([Type] here is some type like Int, Short, Binary, etc.). In these functions on VectorizedRleValuesReader, data is read, de-coded (perhaps from RLE) and then inserted into ColumnVector which is passed here. So much so for this.

How and where the Scala[ColumnBatch] iterator is consumed?

Now the second part. The iterator returns two different types based on if reader is in the batch mode or not. The code looks something like this:

@Override
  public Object getCurrentValue() throws IOException, InterruptedException {
    if (returnColumnarBatch) return columnarBatch;
    return columnarBatch.getRow(batchIdx - 1);
  }

columnarBatch is of type ColumnarBatch and columnarBatch.getRow returns a nested class of type ColumnarBatch.Row. Anyways coming back to the point, we are tracking the batch mode. Hence, the value the iterator returns is of type ColumnarBatch. This iterator is somehow passed to the wholestage code generation. The code that consumes the iterator and materializes UnsafeRow is documented at the end of this post (also here). So what is happening here? In the scan_nextBatch function we read the new value of the the ColumnarBatch by calling next(). Then we acquire ColumnVectors objects (see scan_colInstance0 and scan_colInstance1 variables). ColumnarBatch tells how many rows it has numRows(), and it just calls ColumnVector objects with get[Type](rowId:Int) to get the final value.

These materialized values are then represented as UnsafeRow with the help of BufferHolder and UnsafeRowWriter objects, allocated as

/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);

From here on, I am not sure who consumes this org.apache.spark.sql.execution.BufferedRowIterator (the parent class) iterator. There is some hint here, in the WholeStageCodegenExec class

val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
      rdds.head.mapPartitionsWithIndex { (index, iter) =>
        val clazz = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(iter))
        new Iterator[InternalRow] {
          override def hasNext: Boolean = {
            val v = buffer.hasNext
            if (!v) durationMs += buffer.durationMs()
            v
          }
          override def next: InternalRow = buffer.next()
        }
      }
    }

If I have to speculate this I would say that rdd is already laid out by the operators that implement them. For example, in FileSourceScanExec which implements DataSourceScanExec (which is a trait),

private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = outputSchema,
        filters = dataFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }

What are Spark’s config for parquet files

 val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
    .doc("When true, the Parquet data source merges schemas collected from all data files, " +
         "otherwise the schema is picked from the summary file or a random data file " +
         "if no summary file is available.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
    .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
         "summary files and we will ignore them when merging schema. Otherwise, if this is " +
         "false, which is the default, we will merge all part-files. This should be considered " +
         "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
    .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
      "compatibility with these systems.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
      "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
      "provide compatibility with these systems.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
    .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
      "uncompressed, snappy, gzip, lzo.")
    .stringConf
    .transform(_.toLowerCase())
    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
    .createWithDefault("snappy")

  val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
    .doc("Enables Parquet filter push-down optimization when set to true.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
      "Spark SQL schema and vice versa.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
    .doc("The output committer class used by Parquet. The specified class needs to be a " +
      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
    .internal()
    .stringConf
    .createWithDefault(classOf[ParquetOutputCommitter].getName)

  val PARQUET_VECTORIZED_READER_ENABLED =
    SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
      .doc("Enables vectorized parquet decoding.")
      .booleanConf
      .createWithDefault(true)

Appendix: Spark code generated for parquet reading

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator scan_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 011 */   private long scan_scanTime1;
/* 012 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 013 */   private int scan_batchIdx;
/* 014 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/* 016 */   private UnsafeRow scan_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     scan_input = inputs[0];
/* 028 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */     scan_scanTime1 = 0;
/* 031 */     scan_batch = null;
/* 032 */     scan_batchIdx = 0;
/* 033 */     scan_colInstance0 = null;
/* 034 */     scan_colInstance1 = null;
/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   private void scan_nextBatch() throws java.io.IOException {
/* 042 */     long getBatchStart = System.nanoTime();
/* 043 */     if (scan_input.hasNext()) {
/* 044 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */       scan_numOutputRows.add(scan_batch.numRows());
/* 046 */       scan_batchIdx = 0;
/* 047 */       scan_colInstance0 = scan_batch.column(0);
/* 048 */       scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */     }
/* 051 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */   }
/* 053 */
/* 054 */   protected void processNext() throws java.io.IOException {
/* 055 */     if (scan_batch == null) {
/* 056 */       scan_nextBatch();
/* 057 */     }
/* 058 */     while (scan_batch != null) {
/* 059 */       int numRows = scan_batch.numRows();
/* 060 */       while (scan_batchIdx < numRows) {
/* 061 */         int scan_rowIdx = scan_batchIdx++;
/* 062 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */         int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */         boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */         byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */         scan_holder.reset();
/* 067 */
/* 068 */         scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */         if (scan_isNull) {
/* 071 */           scan_rowWriter.setNullAt(0);
/* 072 */         } else {
/* 073 */           scan_rowWriter.write(0, scan_value);
/* 074 */         }
/* 075 */
/* 076 */         if (scan_isNull1) {
/* 077 */           scan_rowWriter.setNullAt(1);
/* 078 */         } else {
/* 079 */           scan_rowWriter.write(1, scan_value1);
/* 080 */         }
/* 081 */         scan_result.setTotalSize(scan_holder.totalSize());
/* 082 */         append(scan_result);
/* 083 */         if (shouldStop()) return;
/* 084 */       }
/* 085 */       scan_batch = null;
/* 086 */       scan_nextBatch();
/* 087 */     }
/* 088 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */     scan_scanTime1 = 0;
/* 090 */   }
/* 091 */ }