In this post I will try to explain what happens when Apache Spark tries to read a parquet file. Apache Parquet is a popular columnar storage format which stores its data as a bunch of files. Typically these files are stored on HDFS. In a separate post I will explain more details about the internals of Parquet, but here we focus on what happens when you call
val parquetFileDF = spark.read.parquet("intWithPayload.parquet")
as documented in the Spark SQL programming guide.
Note: This blog post is a work in progress with respect to its content, accuracy, and, of course, formatting. This commentary is based on the 2.1 version of the source code, with Whole Stage Code Generation (WSCG) on.

In this example, I am reading a file which was generated by the Parquet Generator Tool. The schema of the intWithPayload.parquet file is <int, Array[Byte]>. This detail is important because it dictates how WSCG is done; see the end of this page.
In Spark SQL, various operations are implemented in their respective classes; you can find them by the Exec suffix in their names.

Step 1: For reading a data source, we look into the DataSourceScanExec class. From here, the code somehow ends up in the ParquetFileFormat class. I am not entirely clear how this happens, but it makes sense: various input file formats are implemented this way. We need to trace the whole-stage-code-generation path with batch reading support. Anyway, moving on.
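If you want to see these Exec operators for a concrete query, you can inspect the physical plan directly. A small sketch (the exact operator names and plan text depend on the Spark version and file layout, so treat the printed output as illustrative):

val df = spark.read.parquet("intWithPayload.parquet")
df.explain()                               // the scan appears as "*FileScan parquet ...", the '*' marking WSCG
println(df.queryExecution.executedPlan)    // the same plan as a tree of SparkPlan (...Exec) nodes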
Step 2: ParquetFileFormat has a buildReader function that returns a (PartitionedFile => Iterator[InternalRow]) function. The iterator in this function is generated as:

val iter = new RecordReaderIterator(parquetReader)

Here parquetReader is of type VectorizedParquetRecordReader. The RecordReaderIterator class wraps a Scala iterator (we discuss later, in Step 4, how this iterator is consumed) around a Hadoop-style RecordReader<K, V>, which is implemented by VectorizedParquetRecordReader (and its base class SpecificParquetRecordReaderBase<Object>).
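To make the wrapping concrete, here is a minimal sketch of adapting a Hadoop-style RecordReader to a Scala iterator, in the spirit of RecordReaderIterator (simplified; the real class also tracks and closes the underlying reader):

import java.util.NoSuchElementException
import org.apache.hadoop.mapreduce.RecordReader

// Simplified RecordReaderIterator-style adapter (a sketch, not the real class).
class SimpleRecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !rowReader.nextKeyValue()   // advance the underlying reader
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    rowReader.getCurrentValue                // hand back whatever the reader produced
  }
}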
Step 3: What does VectorizedParquetRecordReader do? According to the comment in the file, it is "A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader." After the VectorizedParquetRecordReader object is allocated, its initialize(split, hadoopAttemptContext) and initBatch(partitionSchema, file.partitionValues) functions are called.
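Putting Steps 2 and 3 together, the setup looks roughly like the sketch below (paraphrased from ParquetFileFormat in Spark 2.1; the arguments stand in for values that the real buildReader closure gets from its surrounding context, and some of these classes are internal to Spark, so treat this as an illustration of the call sequence rather than standalone code):

import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.types.StructType

// Sketch of the vectorized-reader setup; error handling and the
// non-vectorized fallback path are omitted.
def openVectorizedReader(
    split: InputSplit,
    hadoopAttemptContext: TaskAttemptContext,
    partitionSchema: StructType,
    partitionValues: InternalRow): Iterator[InternalRow] = {
  val vectorizedReader = new VectorizedParquetRecordReader()
  vectorizedReader.initialize(split, hadoopAttemptContext)        // Step 3(i): footer, schemas, ParquetFileReader
  vectorizedReader.initBatch(partitionSchema, partitionValues)    // Step 3(ii): allocates the ColumnarBatch
  vectorizedReader.enableReturningBatches()                       // stay in batch mode (returnColumnarBatch = true)
  val iter = new RecordReaderIterator(vectorizedReader)
  iter.asInstanceOf[Iterator[InternalRow]]                        // the real code performs the same cast
}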
Step 3(i): initialize calls the initialize function of the super class SpecificParquetRecordReaderBase. Here, the file schema is read, the requested schema is inferred, and a reader of type ParquetFileReader is allocated in the base class. By the end of SpecificParquetRecordReaderBase.initialize(...) we know how many rows there are in the input split (InputSplit). This is stored in the totalRowCount variable.
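To make totalRowCount concrete, here is a small sketch that computes the same quantity with the parquet-mr API directly (class names are from parquet-mr 1.8.x, the version Spark 2.1 depends on; readFooter has been deprecated in newer parquet-mr releases). The base class sums the row counts of the row groups covered by its split; for simplicity, this sketch sums over the whole file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Sum of row counts over all row groups (blocks) in the file footer.
def rowCount(path: String): Long = {
  val footer = ParquetFileReader.readFooter(
    new Configuration(), new Path(path), ParquetMetadataConverter.NO_FILTER)
  footer.getBlocks.asScala.map(_.getRowCount).sum
}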
Step 3(ii): initBatch mainly allocates the columnarBatch object. We discuss this class in more detail later.
Step 4: The implementation of the RecordReader interface in VectorizedParquetRecordReader demands more attention, because it is what is called when the iterator wrapper from Step 2 is consumed (where? we tell later). Upon calling nextKeyValue(), the function first calls resultBatch (which becomes a no-op if columnarBatch is already initialized, which it is, see Step 3(ii)), and then nextBatch(). Remember, we always operate in batch mode (returnColumnarBatch is set to true). The nextBatch function fills (how? we tell later) columnarBatch with data, and this variable is returned by the getCurrentValue function. getCurrentKey always returns null and is implemented in the base class, SpecificParquetRecordReaderBase.
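From the consumer's side, this contract can be sketched as follows (a hedged illustration only): hasNext on the wrapper boils down to nextKeyValue(), which fills the batch, and next() boils down to getCurrentValue(), which in batch mode hands back the whole ColumnarBatch:

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Drain a reader running in batch mode and count the rows it produced.
// getCurrentKey() is ignored because, as noted above, it always returns null.
def countRows(reader: RecordReader[_, _]): Long = {
  var rows = 0L
  while (reader.nextKeyValue()) {                                   // triggers nextBatch() internally
    val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]  // returnColumnarBatch == true
    rows += batch.numRows()
  }
  rows
}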
So, now we know what variable is returned by the iterator. From here there are two directions. We first describe how ColumnarBatch is filled with the parquet data, and then we describe who consumes the mysterious iterator generated in the beginning (Step 2, val iter), where ColumnarBatch is returned, and what happens with it.
How is ColumnarBatch filled?

In the VectorizedParquetRecordReader.nextBatch() function, if it has not read all the rows, it calls the checkEndOfRowGroup() function. The checkEndOfRowGroup function then reads a rowGroup (you can think of a row group as a collection of a certain number of rows stored in columnar format), and then allocates a VectorizedColumnReader object for each requested column in the requestedSchema. The VectorizedColumnReader constructor takes a ColumnDescriptor (found in the schema) and a PageReader (obtained from the rowGroup, a Parquet API call). Also, missingColumns is a bitmap of missing columns (probably ones which are missing from the file or ones which Spark does not intend to read). Then, in the nextBatch function, readBatch(num, columnarBatch.column(i)) is called on all the VectorizedColumnReader objects allocated earlier in the checkEndOfRowGroup function (essentially one per column). (So, ColumnarBatch and ColumnVector are just raw pieces of memory which are used by the VectorizedColumnReader.) In readBatch, the number of rows to read and the ColumnVector to fill (stored in the ColumnarBatch) are passed as arguments. So what is ColumnVector? We can think of it as an array of typed values, indexed by rowId.
/**
* This class represents a column of values and provides the main APIs
* to access the data values. It supports all the types and contains
* get/put APIs as well as their batched versions. The batched versions
* are preferable whenever possible.
* ...
*/
public abstract class ColumnVector implements AutoCloseable
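In other words (a sketch only, using the <int, Array[Byte]> schema of this example), a cell is read by asking the ColumnVector at a given column index for the value at a given rowId:

import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Treat column 0 (the int column in our example schema) as an array indexed by rowId.
def intAt(batch: ColumnarBatch, rowId: Int): Option[Int] = {
  val vector = batch.column(0)   // a ColumnVector
  if (vector.isNullAt(rowId)) None else Some(vector.getInt(rowId))
}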
Anyway, coming back to the point. The raw data is stored in ColumnVectors, which themselves are stored in a ColumnarBatch object. The ColumnVector is what is passed as the storage space to the readBatch function. Inside the readBatch function, it first calls the readPage() function, which checks which version of the parquet file we are reading (v1 or v2, I don't know the difference), and then initializes a bunch of objects, namely defColumn:VectorizedRleValuesReader, repetitionLevelColumn:ValuesReaderIntIterator, definitionLevelColumn:ValuesReaderIntIterator, and dataColumn:VectorizedRleValuesReader. Out of these, ValuesReaderIntIterator comes from parquet-mr, and VectorizedRleValuesReader from Spark. From here on, I do not completely understand what is happening. There are a bunch of read[Type]Batch() functions which are called, which in turn call defColumn.read[Type]s() functions ([Type] here is some type like Int, Short, Binary, etc.). In these functions on VectorizedRleValuesReader, data is read, decoded (perhaps from RLE), and then inserted into the ColumnVector which is passed in. So much for that.
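For intuition only, here is a toy run-length decoder. Parquet actually uses a hybrid RLE/bit-packed encoding (handled by VectorizedRleValuesReader and the parquet-mr readers), so this only illustrates the general idea of expanding (value, runLength) pairs back into values:

// Toy RLE decoder: expand (value, runLength) pairs into the decoded sequence.
def decodeRle(runs: Seq[(Int, Int)]): Seq[Int] =
  runs.flatMap { case (value, runLength) => Seq.fill(runLength)(value) }

// decodeRle(Seq((7, 3), (0, 2))) == Seq(7, 7, 7, 0, 0)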
Now the second part. The iterator returns two different types depending on whether the reader is in batch mode or not. The code looks something like this:
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
  if (returnColumnarBatch) return columnarBatch;
  return columnarBatch.getRow(batchIdx - 1);
}
columnarBatch is of type ColumnarBatch, and columnarBatch.getRow returns a nested class of type ColumnarBatch.Row. Anyway, coming back to the point: we are tracking the batch mode, hence the value the iterator returns is of type ColumnarBatch. This iterator is somehow passed to the whole-stage code generation. The code that consumes the iterator and materializes UnsafeRow is documented at the end of this post (also here).
So what is happening here? In the scan_nextBatch function we read the next ColumnarBatch by calling next(). Then we acquire the ColumnVector objects (see the scan_colInstance0 and scan_colInstance1 variables). ColumnarBatch tells us how many rows it has via numRows(), and the generated code just calls the ColumnVector objects with get[Type](rowId:Int) to get the final values. These materialized values are then represented as an UnsafeRow with the help of BufferHolder and UnsafeRowWriter objects, allocated as
/* 035 */ scan_result = new UnsafeRow(2);
/* 036 */ this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */ this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
From here on, I am not sure who consumes this org.apache.spark.sql.execution.BufferedRowIterator (the parent class) iterator. There is some hint here, in the WholeStageCodegenExec class:
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
  rdds.head.mapPartitionsWithIndex { (index, iter) =>
    val clazz = CodeGenerator.compile(cleanedSource)
    val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
    buffer.init(index, Array(iter))
    new Iterator[InternalRow] {
      override def hasNext: Boolean = {
        val v = buffer.hasNext
        if (!v) durationMs += buffer.durationMs()
        v
      }
      override def next: InternalRow = buffer.next()
    }
  }
}
If I had to speculate, I would say that the RDD is already laid out by the operators that implement it. For example, in FileSourceScanExec, which implements DataSourceScanExec (a trait):
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = outputSchema,
      filters = dataFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}
For reference, here are the Parquet-related configuration options defined in SQLConf in Spark 2.1:

val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
.booleanConf
.createWithDefault(false)
val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
.doc("When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
.booleanConf
.createWithDefault(false)
val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
.doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
.booleanConf
.createWithDefault(false)
val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
.doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
.booleanConf
.createWithDefault(true)
val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
.doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
.booleanConf
.createWithDefault(true)
val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase())
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")
.booleanConf
.createWithDefault(true)
val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.")
.booleanConf
.createWithDefault(false)
val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
.doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
.internal()
.stringConf
.createWithDefault(classOf[ParquetOutputCommitter].getName)
val PARQUET_VECTORIZED_READER_ENABLED =
SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
.doc("Enables vectorized parquet decoding.")
.booleanConf
.createWithDefault(true)
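For completeness, these options can be toggled at runtime on a SparkSession (a small usage sketch; the values shown are the 2.1 defaults listed above):

// Setting a few of the Parquet options listed above at runtime.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")  // the vectorized path traced in this post
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")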
Finally, here is the generated whole-stage code that consumes the iterator and materializes UnsafeRow, as referenced earlier in this post:

/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator scan_input;
/* 009 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 011 */ private long scan_scanTime1;
/* 012 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 013 */ private int scan_batchIdx;
/* 014 */ private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/* 016 */ private UnsafeRow scan_result;
/* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */ public GeneratedIterator(Object[] references) {
/* 021 */ this.references = references;
/* 022 */ }
/* 023 */
/* 024 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */ partitionIndex = index;
/* 026 */ this.inputs = inputs;
/* 027 */ scan_input = inputs[0];
/* 028 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */ scan_scanTime1 = 0;
/* 031 */ scan_batch = null;
/* 032 */ scan_batchIdx = 0;
/* 033 */ scan_colInstance0 = null;
/* 034 */ scan_colInstance1 = null;
/* 035 */ scan_result = new UnsafeRow(2);
/* 036 */ this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */ this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */ }
/* 040 */
/* 041 */ private void scan_nextBatch() throws java.io.IOException {
/* 042 */ long getBatchStart = System.nanoTime();
/* 043 */ if (scan_input.hasNext()) {
/* 044 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */ scan_numOutputRows.add(scan_batch.numRows());
/* 046 */ scan_batchIdx = 0;
/* 047 */ scan_colInstance0 = scan_batch.column(0);
/* 048 */ scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */ }
/* 051 */ scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */ }
/* 053 */
/* 054 */ protected void processNext() throws java.io.IOException {
/* 055 */ if (scan_batch == null) {
/* 056 */ scan_nextBatch();
/* 057 */ }
/* 058 */ while (scan_batch != null) {
/* 059 */ int numRows = scan_batch.numRows();
/* 060 */ while (scan_batchIdx < numRows) {
/* 061 */ int scan_rowIdx = scan_batchIdx++;
/* 062 */ boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */ int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */ boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */ byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */ scan_holder.reset();
/* 067 */
/* 068 */ scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */ if (scan_isNull) {
/* 071 */ scan_rowWriter.setNullAt(0);
/* 072 */ } else {
/* 073 */ scan_rowWriter.write(0, scan_value);
/* 074 */ }
/* 075 */
/* 076 */ if (scan_isNull1) {
/* 077 */ scan_rowWriter.setNullAt(1);
/* 078 */ } else {
/* 079 */ scan_rowWriter.write(1, scan_value1);
/* 080 */ }
/* 081 */ scan_result.setTotalSize(scan_holder.totalSize());
/* 082 */ append(scan_result);
/* 083 */ if (shouldStop()) return;
/* 084 */ }
/* 085 */ scan_batch = null;
/* 086 */ scan_nextBatch();
/* 087 */ }
/* 088 */ scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */ scan_scanTime1 = 0;
/* 090 */ }
/* 091 */ }