In this post I will try to explain what happens when Apache Spark reads a Parquet file. Apache Parquet is a popular columnar storage format which stores its data as a bunch of files, typically on HDFS. In a separate post I will explain more about the internals of Parquet, but here we focus on what happens when you call
 val parquetFileDF = spark.read.parquet("intWithPayload.parquet")
as documented in the Spark SQL programming guide.
Note: This blog post is a work in progress with respect to its content, accuracy, and, of course, formatting.
This commentary is based on version 2.1 of the Spark source code, with
Whole-Stage Code Generation (WSCG) enabled.
In this example, I am reading a file which was generated by the
Parquet Generator Tool.
The schema of the intWithPayload.parquet file is <int, Array[Byte]>.
This detail is important because it dictates how WSCG is done;
see the end of this page.
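As a quick sanity check, a minimal snippet along these lines confirms the two-column layout (a sketch, assuming a SparkSession named spark and the intWithPayload.parquet file in the working directory):
 val parquetFileDF = spark.read.parquet("intWithPayload.parquet")
 parquetFileDF.printSchema() // one int column and one binary (Array[Byte]) column
 parquetFileDF.show(5)       // pulls a few rows through the read path traced below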
In Spark SQL, the various operators are implemented in their respective
classes; you can recognize them by the Exec suffix in their names.
Step 1: So, for reading a data source, we look into the DataSourceScanExec class.
  From here, the code somehow ends up in the ParquetFileFormat class.
  I am not entirely clear how this happens, but it makes sense, since the various
  input file formats are implemented this way. (The FileSourceScanExec snippet near
  the end of this post gives a hint: the scan operator builds its input RDD from
  relation.fileFormat.buildReaderWithPartitionValues, which for Parquet lands in
  ParquetFileFormat.) We need to trace the
  whole-stage-code-generation path with batch reading support. Anyway,
  moving on.
Step 2: ParquetFileFormat has a buildReader function that returns a 
  (PartitionedFile => Iterator[InternalRow]) function. The iterator 
  in this function is generated as:
  val iter = new RecordReaderIterator(parquetReader)
Here parquetReader is of type VectorizedParquetRecordReader.
  The RecordReaderIterator class wraps a Scala iterator
  (we discuss how this iterator is consumed later, in Step 4)
  around a Hadoop-style RecordReader<K, V>, which is implemented
  by VectorizedParquetRecordReader
  (and its base class SpecificParquetRecordReaderBase<Object>).
Step 3: What does VectorizedParquetRecordReader do? According to the
  comment in the file, it is "A specialized RecordReader that reads into
  InternalRows or ColumnarBatches directly using the Parquet column APIs.
  This is somewhat based on parquet-mr's ColumnReader." After a
  VectorizedParquetRecordReader object is allocated, its
  initialize(split, hadoopAttemptContext) and
  initBatch(partitionSchema, file.partitionValues) functions are called.
Step 3(i): initialize calls the initialize function of the super class
  SpecificParquetRecordReaderBase. Here, the file schema is read, the
  requested schema is inferred, and a reader of
  type ParquetFileReader is allocated in the base class.
  By the end of SpecificParquetRecordReaderBase.initialize(...)
  we know how many rows there are in the input file split.
  This count is stored in the totalRowCount variable.
Step 3(ii): initBatch mainly allocates the columnarBatch object.
  We discuss this class in more detail later.
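Putting Steps 2, 3(i), and 3(ii) together, the reader function that buildReader returns looks roughly like this (a condensed sketch of the 2.1 code; the split and hadoopAttemptContext locals are assumed to be derived from the PartitionedFile and the Hadoop configuration):
  (file: PartitionedFile) => {
    val parquetReader = new VectorizedParquetRecordReader()
    parquetReader.initialize(split, hadoopAttemptContext)          // Step 3(i)
    parquetReader.initBatch(partitionSchema, file.partitionValues) // Step 3(ii)
    parquetReader.enableReturningBatches()                         // sets returnColumnarBatch = true
    val iter = new RecordReaderIterator(parquetReader)
    iter.asInstanceOf[Iterator[InternalRow]]
  }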
Step 4: The implementation of the RecordReader interface in
  VectorizedParquetRecordReader demands more attention, because
  it is what is called when the iterator wrapper from Step 2 is
  consumed (where? we will tell later). Upon calling nextKeyValue(),
  the function first calls resultBatch (which becomes a no-op
  if columnarBatch is already initialized, which it is, see Step
  3(ii)), and then calls nextBatch(). Remember, we always operate in
  batch mode (returnColumnarBatch is set to true).
  The nextBatch function fills (how? we tell later) columnarBatch
  with data, and this variable is returned by the getCurrentValue function.
  getCurrentKey always returns null and is implemented in the
  base class, SpecificParquetRecordReaderBase.
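Conceptually, consuming that iterator wrapper boils down to driving the Hadoop-style RecordReader like this (a simplified sketch of what RecordReaderIterator does on our behalf, not the actual Spark source):
  // in batch mode, getCurrentValue() hands back the (refilled) columnarBatch
  while (parquetReader.nextKeyValue()) {       // resultBatch() no-op, then nextBatch()
    val batch = parquetReader.getCurrentValue  // a ColumnarBatch, since returnColumnarBatch == true
    // the consumer (the generated whole-stage code, as we will see) walks batch row by row
  }
  parquetReader.close()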
So now we know which variable is returned by the iterator. From here
there are two directions. We first describe how the ColumnarBatch is filled
with the Parquet data, and then we describe who consumes the mysterious
iterator that is generated at the beginning (Step 2, val iter), where the
ColumnarBatch is returned, and what happens with it.
How is the ColumnarBatch filled?
In the VectorizedParquetRecordReader.nextBatch() function, if it
has not yet read all the rows, it calls the checkEndOfRowGroup() function.
The checkEndOfRowGroup function then reads a row group
(you can think of a row group as a collection of a certain number of rows
stored in columnar format), and then allocates a VectorizedColumnReader
object for each requested column in the requestedSchema.
The VectorizedColumnReader constructor takes a ColumnDescriptor
(which can be found in the schema) and a PageReader
(which can be obtained from the row group via a Parquet API call).
missingColumns is a bitmap of missing columns (presumably the ones
which are absent from the file or which Spark does not intend to read). Then,
in the nextBatch function, readBatch(num, columnarBatch.column(i))
is called on each of the VectorizedColumnReader objects allocated
earlier in the checkEndOfRowGroup function (essentially one per column).
(So ColumnarBatch and ColumnVector are just raw pieces of
memory used by the VectorizedColumnReader.) In readBatch,
the number of rows and the ColumnVector
(stored in the ColumnarBatch) are passed. So what is a ColumnVector? We can
think of it as an array of typed values, indexed by the rowId.
/** 
 * This class represents a column of values and provides the main APIs 
 * to access the data values. It supports all the types and contains 
 * get/put APIs as well as their batched versions. The batched versions 
 * are preferable whenever possible.
 * ...
*/
public abstract class ColumnVector implements AutoCloseable
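To make the "array of typed values" intuition concrete, here is a small illustrative snippet (assuming the ColumnVector.allocate(...) factory present in this version of the code):
  import org.apache.spark.memory.MemoryMode
  import org.apache.spark.sql.execution.vectorized.ColumnVector
  import org.apache.spark.sql.types.DataTypes

  // illustration only: a ColumnVector behaves like a typed array indexed by rowId
  val col = ColumnVector.allocate(1024, DataTypes.IntegerType, MemoryMode.ON_HEAP)
  col.putInt(0, 42)            // put APIs write a value at a given rowId
  col.putNull(1)               // nulls are tracked per rowId as well
  assert(col.getInt(0) == 42)  // get APIs read the value back by rowId
  assert(col.isNullAt(1))
  col.close()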
Anyway, coming back to the point. The raw data is stored in
ColumnVectors, which themselves are stored in a ColumnarBatch object.
The ColumnVector is what is passed as the storage space to the
readBatch function. Inside the readBatch function, it
first calls the readPage() function, which checks which version of the
Parquet file we are reading (v1 or v2; I don't know the difference),
and then initializes a bunch of objects, namely
defColumn: VectorizedRleValuesReader,
repetitionLevelColumn: ValuesReaderIntIterator,
definitionLevelColumn: ValuesReaderIntIterator, and
dataColumn: VectorizedRleValuesReader.
Of these, ValuesReaderIntIterator comes from
parquet-mr, and VectorizedRleValuesReader from Spark.
From here on, I do not completely understand what is happening.
There are a bunch of read[Type]Batch() functions which are called,
which in turn call defColumn.read[Type]s() functions ([Type] here
is a type like Int, Short, Binary, etc.). In these functions on
VectorizedRleValuesReader, data is read, decoded (perhaps from RLE)
and then inserted into the ColumnVector which is passed in.
So much for this.
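Putting these pieces together, the nextBatch() flow can be summarized in Scala-flavoured pseudocode (a condensed transliteration using the reader's internal fields named above, plus totalCountLoadedSoFar from the base class; not runnable on its own):
  def nextBatch(): Boolean = {
    columnarBatch.reset()
    if (rowsReturned >= totalRowCount) return false    // totalRowCount from Step 3(i)
    checkEndOfRowGroup()                               // loads the next row group if needed and
                                                       // allocates one VectorizedColumnReader per column
    // read at most a batch worth of rows from the rows loaded so far
    val num = math.min(columnarBatch.capacity().toLong, totalCountLoadedSoFar - rowsReturned).toInt
    for (i <- columnReaders.indices if !missingColumns(i)) {
      columnReaders(i).readBatch(num, columnarBatch.column(i))  // decode num values into the ColumnVector
    }
    rowsReturned += num
    columnarBatch.setNumRows(num)
    batchIdx = 0
    true
  }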
Now the second part. The iterator returns two different types depending on whether the reader is in batch mode or not. The code looks something like this:
@Override
  public Object getCurrentValue() throws IOException, InterruptedException {
    if (returnColumnarBatch) return columnarBatch;
    return columnarBatch.getRow(batchIdx - 1);
  }
columnarBatch is of type ColumnarBatch, and columnarBatch.getRow
returns a nested class of type ColumnarBatch.Row. Anyway, coming back
to the point, we are tracking the batch mode. Hence, the value the
iterator returns is of type ColumnarBatch. This iterator is
somehow passed to the whole-stage code generation. The code
that consumes the iterator and materializes
UnsafeRows is documented at the end of this post
(also here).
So what is happening here? In the scan_nextBatch function we read
the next ColumnarBatch by calling next(). Then
we acquire the ColumnVector objects (see the scan_colInstance0 and
scan_colInstance1 variables). The ColumnarBatch tells us how many rows
it has via numRows(), and the generated code then calls the ColumnVector
objects with get[Type](rowId: Int) to get the final values.
These materialized values are then represented as
an UnsafeRow with the help of BufferHolder and UnsafeRowWriter objects,
allocated as
/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
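Outside of codegen, the same row-building sequence can be written by hand using only the calls that appear in the generated code (a sketch for our <int, Array[Byte]> schema):
  import org.apache.spark.sql.catalyst.expressions.UnsafeRow
  import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}

  val result = new UnsafeRow(2)                  // two fields: <int, Array[Byte]>
  val holder = new BufferHolder(result, 32)
  val rowWriter = new UnsafeRowWriter(holder, 2)

  holder.reset()
  rowWriter.zeroOutNullBytes()
  rowWriter.write(0, 7)                          // the int column
  rowWriter.write(1, Array[Byte](1, 2, 3))       // the payload column
  result.setTotalSize(holder.totalSize())
  // result is now a fully materialized UnsafeRow, ready to be appended downstream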
From here on, I am not sure who consumes this iterator, whose parent
class is org.apache.spark.sql.execution.BufferedRowIterator.
There is a hint in the WholeStageCodegenExec class:
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
  rdds.head.mapPartitionsWithIndex { (index, iter) =>
    val clazz = CodeGenerator.compile(cleanedSource)
    val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
    buffer.init(index, Array(iter))
    new Iterator[InternalRow] {
      override def hasNext: Boolean = {
        val v = buffer.hasNext
        if (!v) durationMs += buffer.durationMs()
        v
      }
      override def next: InternalRow = buffer.next()
    }
  }
}
If I had to speculate, I would say that the input RDD is already laid out by
the operators that implement it. For example, in FileSourceScanExec,
which implements DataSourceScanExec (which is a trait):
private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = outputSchema,
        filters = dataFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }
For reference, here are the Parquet-related configuration options defined in SQLConf; note spark.sql.parquet.enableVectorizedReader, which controls the vectorized read path traced above.
 val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
    .doc("When true, the Parquet data source merges schemas collected from all data files, " +
         "otherwise the schema is picked from the summary file or a random data file " +
         "if no summary file is available.")
    .booleanConf
    .createWithDefault(false)
  val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
    .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
         "summary files and we will ignore them when merging schema. Otherwise, if this is " +
         "false, which is the default, we will merge all part-files. This should be considered " +
         "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
    .booleanConf
    .createWithDefault(false)
  val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
    .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
      "compatibility with these systems.")
    .booleanConf
    .createWithDefault(false)
  val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
      "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
      "provide compatibility with these systems.")
    .booleanConf
    .createWithDefault(true)
  val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
    .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
    .booleanConf
    .createWithDefault(true)
  val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
      "uncompressed, snappy, gzip, lzo.")
    .stringConf
    .transform(_.toLowerCase())
    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
    .createWithDefault("snappy")
  val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
    .doc("Enables Parquet filter push-down optimization when set to true.")
    .booleanConf
    .createWithDefault(true)
  val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
      "Spark SQL schema and vice versa.")
    .booleanConf
    .createWithDefault(false)
  val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
    .doc("The output committer class used by Parquet. The specified class needs to be a " +
      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
    .internal()
    .stringConf
    .createWithDefault(classOf[ParquetOutputCommitter].getName)
  val PARQUET_VECTORIZED_READER_ENABLED =
    SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
      .doc("Enables vectorized parquet decoding.")
      .booleanConf
      .createWithDefault(true)
Finally, here is the full whole-stage-generated code (the GeneratedIterator) that consumes the ColumnarBatch iterator and materializes UnsafeRows, as referenced above:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator scan_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 011 */   private long scan_scanTime1;
/* 012 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 013 */   private int scan_batchIdx;
/* 014 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/* 016 */   private UnsafeRow scan_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     scan_input = inputs[0];
/* 028 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */     scan_scanTime1 = 0;
/* 031 */     scan_batch = null;
/* 032 */     scan_batchIdx = 0;
/* 033 */     scan_colInstance0 = null;
/* 034 */     scan_colInstance1 = null;
/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   private void scan_nextBatch() throws java.io.IOException {
/* 042 */     long getBatchStart = System.nanoTime();
/* 043 */     if (scan_input.hasNext()) {
/* 044 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */       scan_numOutputRows.add(scan_batch.numRows());
/* 046 */       scan_batchIdx = 0;
/* 047 */       scan_colInstance0 = scan_batch.column(0);
/* 048 */       scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */     }
/* 051 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */   }
/* 053 */
/* 054 */   protected void processNext() throws java.io.IOException {
/* 055 */     if (scan_batch == null) {
/* 056 */       scan_nextBatch();
/* 057 */     }
/* 058 */     while (scan_batch != null) {
/* 059 */       int numRows = scan_batch.numRows();
/* 060 */       while (scan_batchIdx < numRows) {
/* 061 */         int scan_rowIdx = scan_batchIdx++;
/* 062 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */         int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */         boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */         byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */         scan_holder.reset();
/* 067 */
/* 068 */         scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */         if (scan_isNull) {
/* 071 */           scan_rowWriter.setNullAt(0);
/* 072 */         } else {
/* 073 */           scan_rowWriter.write(0, scan_value);
/* 074 */         }
/* 075 */
/* 076 */         if (scan_isNull1) {
/* 077 */           scan_rowWriter.setNullAt(1);
/* 078 */         } else {
/* 079 */           scan_rowWriter.write(1, scan_value1);
/* 080 */         }
/* 081 */         scan_result.setTotalSize(scan_holder.totalSize());
/* 082 */         append(scan_result);
/* 083 */         if (shouldStop()) return;
/* 084 */       }
/* 085 */       scan_batch = null;
/* 086 */       scan_nextBatch();
/* 087 */     }
/* 088 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */     scan_scanTime1 = 0;
/* 090 */   }
/* 091 */ }