Navigating the Data Lake: An In-depth Journey from MySQL to HDFS via Kafka, HUDI, Spark, and Superset

Shiksha Engineering
Jul 11, 2023

Author: Ankit Bansal

Introduction

Today’s digital era is driven by data, and managing and analyzing that data well is essential to business outcomes. In this blog post, we walk through a data lake built on MySQL, Kafka, HDFS, Hudi, Spark, Presto, and Superset. We follow the data from MySQL to the Hadoop Distributed File System (HDFS) using the Kafka Connector and Apache Hudi (Hadoop Upserts Deletes and Incrementals). Along the way, we use Spark rather than MapReduce for data processing, and finally analyze the data in Superset via Presto.

Architecture

Data Lake Architecture: Movement from MySQL to HDFS

Initiating the Data Flow: The Kafka Connector

The journey of data from MySQL to HDFS begins with the Kafka Connector, here the Kafka Connect JDBC source connector, which streams rows from MySQL into Apache Kafka. Kafka provides a scalable, fault-tolerant mechanism for handling real-time data feeds, which makes it a good bridge between MySQL and HDFS. The connector tracks its read position as an offset, so after a network failure or restart it resumes from where it stopped, which keeps data loss minimal.

Deciphering the Kafka Connector Configuration

name = connector-tablename
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
key.converter = io.confluent.connect.avro.AvroConverter
value.converter = io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url = http://schema-registry-ip:port
value.converter.schema.registry.url = http://schema-registry-ip:port
transforms = createKey, setSchema
errors.log.enable = true
errors.log.include.messages = true
transforms.createKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields = field1, field2
transforms.setSchema.type = org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setSchema.schema.name = tableRecords
connection.url = jdbc:mysql://mysql-Host:port/dbName?zeroDateTimeBehavior=convertToNull
connection.user = username
connection.password = password
numeric.precision.mapping = true
numeric.mapping = best_fit
dialect.name = MySqlDatabaseDialect
mode = incrementing
incrementing.column.name = id
timestamp.column.name = date_column
query = SELECT * FROM (SELECT *, DATE_FORMAT(DATE(date_column),'%Y/%m') as partitionid FROM dbname.table) AS table
query.suffix = LIMIT 100000
poll.interval.ms = 5000
batch.max.rows = 10
topic.prefix = tuser_v2
tasks.max = 1
db.timezone = Asia/Kolkata
table.type = TABLE
offset.flush.timeout.ms = 600000

Most of the configuration above is straightforward. The query fetches data from the database in batches of 100,000 (1 lakh) rows, as set by `query.suffix`. Two settings need closer attention:

a) mode = incrementing

The `mode` setting of the JDBC source connector determines how the connector detects new or changed rows in the source database. It has four options:

1. Bulk: In this mode, the connector will perform a bulk load of the entire table each time it polls for data. It’s suitable for snapshot-style imports where you want to fetch all the data from a table in a single operation.

2. Incrementing: This mode is used when a table has a column with strictly incrementing values, such as an auto-increment `id`. Each time the connector runs a query, it saves the maximum value retrieved and uses it in the `WHERE` clause of the next query to fetch only new rows. Updates to existing rows are not detected in this mode.

3. Timestamp: In this mode, the connector tracks the latest timestamp in the table. Every time it queries the table, it retrieves rows that have been modified since the last timestamp. This mode is suitable when rows of the table may be updated, and each such update will change the timestamp column to indicate when the row was updated.

4. Timestamp + Incrementing: This is a combination of the `Incrementing` and `Timestamp` modes. Here, two columns are used to track changes: one for the timestamp and one for an incrementing value. This mode is used when you have both updates to existing rows (captured by the timestamp) and newly inserted rows (captured by the incrementing column).

In the above Kafka configuration, `mode = incrementing` means that the connector is set to incrementing mode. The connector will fetch data based on the incrementing value in the `incrementing.column.name` field, which is `id` in this case. Every time it queries MySQL, it will retrieve rows that have an `id` greater than the maximum `id` it has seen so far.
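The polling behaviour described above can be illustrated with a small sketch. It is not the connector's code: it uses an in-memory SQLite database as a stand-in for MySQL, and the `users` table and the `poll_new_rows` helper are hypothetical names chosen for illustration. The query shape (`WHERE id > ? ORDER BY id ASC`) is assumed to approximate what the connector issues in incrementing mode.

```python
import sqlite3


def poll_new_rows(conn, last_id, limit):
    """Return rows with id greater than last_id (oldest first) and the new offset."""
    rows = conn.execute(
        "SELECT id, name FROM users WHERE id > ? ORDER BY id ASC LIMIT ?",
        (last_id, limit),
    ).fetchall()
    # The stored offset only advances when rows were actually returned.
    new_last_id = rows[-1][0] if rows else last_id
    return rows, new_last_id


# In-memory SQLite stands in for MySQL so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("a",), ("b",), ("c",)])

first_batch, offset = poll_new_rows(conn, last_id=0, limit=2)
second_batch, offset = poll_new_rows(conn, last_id=offset, limit=2)

# An update to an already-read row is not picked up: its id is below the offset.
conn.execute("UPDATE users SET name = 'a2' WHERE id = 1")
third_batch, offset = poll_new_rows(conn, last_id=offset, limit=2)
```

The third poll returns nothing even though row 1 changed, which is why incrementing mode suits append-only tables and the timestamp-based modes are needed when rows are updated in place.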

b) partitionid in query

Hudi stores data in partitions, so every record needs a partition value. In the query above, `partitionid` is derived from `date_column` in `YYYY/MM` form, which gives month-wise partitions. We have also used day-wise partitions in some places.
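As a rough illustration, the same partition value can be computed in Python. The `partition_id` helper is a hypothetical name, and the day-wise format shown (`%Y/%m/%d`) is an assumption, since the post only shows the month-wise expression.

```python
from datetime import datetime


def partition_id(ts: datetime, daily: bool = False) -> str:
    """Build a partition value from a timestamp, month-wise by default."""
    # Mirrors DATE_FORMAT(DATE(date_column), '%Y/%m') from the connector query.
    return ts.strftime("%Y/%m/%d" if daily else "%Y/%m")


created = datetime(2023, 7, 11, 14, 30)
monthly = partition_id(created)            # month-wise partition value
daily = partition_id(created, daily=True)  # assumed day-wise variant
```

Coarser partitions mean fewer, larger directories; finer partitions help queries that filter on a narrow date range, at the cost of more small files.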

Revolutionizing HDFS: Embracing Hudi

Once the data lands in Kafka, it makes its way to HDFS through Hudi, a table-storage layer built on top of HDFS. Hudi, which stands for Hadoop Upserts Deletes and Incrementals, changes how we deal with big data: it supports upserts (updates + inserts) and deletes on large datasets in HDFS, so changed records no longer require rewriting an entire dataset. Hudi’s DeltaStreamer utility (HoodieDeltaStreamer) reads the data from Kafka and writes it to HDFS. It is a robust tool for capturing changes from upstream sources and applying them to a Hudi dataset. DeltaStreamer writes to HDFS using one of two table types: Copy-On-Write (COW) or Merge-On-Read (MOR).

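COW rewrites the affected data files with every write, so reads stay simple but writes cost more. MOR instead appends incoming changes to log files and merges them into the base data files later through compaction, which makes writes cheaper at the cost of extra work at read or compaction time. The choice between COW and MOR depends on your use case and the nature of the operations you need to perform on your dataset.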
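The trade-off can be seen in a toy model. This is only a sketch of the idea, not Hudi's implementation: the two classes are hypothetical, a "file" is a plain dictionary keyed by record key, and `files_rewritten` simply counts how often the base data is rewritten.

```python
class CopyOnWriteTable:
    """Toy model: every write produces a fresh copy of the base data."""

    def __init__(self):
        self.base = {}            # record key -> record
        self.files_rewritten = 0

    def upsert(self, records):
        merged = dict(self.base)  # copy the existing data
        merged.update(records)    # apply inserts and updates
        self.base = merged
        self.files_rewritten += 1

    def read(self):
        return dict(self.base)


class MergeOnReadTable:
    """Toy model: writes append to a log; merging happens on read or compaction."""

    def __init__(self):
        self.base = {}
        self.log = []
        self.files_rewritten = 0

    def upsert(self, records):
        self.log.append(dict(records))  # cheap append, base untouched

    def read(self):
        view = dict(self.base)
        for delta in self.log:          # merge pending changes at read time
            view.update(delta)
        return view

    def compact(self):
        self.base = self.read()         # fold the log into the base data
        self.log = []
        self.files_rewritten += 1


cow, mor = CopyOnWriteTable(), MergeOnReadTable()
for batch in ({1: "a"}, {2: "b"}, {1: "a2"}):
    cow.upsert(batch)
    mor.upsert(batch)
```

Both tables return the same data on read, but the COW model rewrote its base three times while the MOR model has not rewritten anything until `compact()` is called.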

HoodieDeltaStreamer, which is used for ingesting data into Hudi tables, leverages Spark for processing the incoming data. The operations of HoodieDeltaStreamer such as reading from source, transforming data, and writing to HDFS, all utilize Spark’s distributed processing capabilities.

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
"file://$HUDI_HOME/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar" \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field date_column \
--target-base-path "hdfs://NamenodeIP:9000/data-lake/raw-zone/tables/table" \
--target-table table \
--props "file://$HUDI_HOME/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source-table.properties" \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-limit 200000

Content of file kafka-source-table.properties:

include=base.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=partitionid
hoodie.deltastreamer.schemaprovider.registry.url=http://schema-registry-ip:port/subjects/table-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=kafka-topic-name
bootstrap.servers=kafka-broker-ip:port
auto.offset.reset=earliest
schema.registry.url=http://schema-registry-ip:port
hoodie.compact.inline.max.delta.commits=1
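To make the roles of the record key (`id`) and the source ordering field (`date_column`) concrete, here is a minimal sketch of an upsert that keeps the latest version of each record. The `upsert` function and the sample records are hypothetical. Hudi's actual merge behaviour depends on the configured payload class, so this sketch assumes the ordering field is honoured both within a batch and against records already stored.

```python
def upsert(table, incoming, key_field="id", ordering_field="date_column"):
    """Merge incoming records into table, keeping the latest version per key."""
    for record in incoming:
        key = record[key_field]
        current = table.get(key)
        # Insert new keys; replace existing ones only if the record is not older.
        if current is None or record[ordering_field] >= current[ordering_field]:
            table[key] = record
    return table


table = {}
upsert(table, [
    {"id": 1, "date_column": "2023-07-01", "status": "new"},
    {"id": 2, "date_column": "2023-07-02", "status": "new"},
])
upsert(table, [
    {"id": 1, "date_column": "2023-07-05", "status": "updated"},  # newer: replaces
    {"id": 2, "date_column": "2023-06-30", "status": "stale"},    # older: ignored
])
```

ISO-formatted date strings compare correctly as text, which is why plain string comparison is enough in this sketch.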

Hive Metastore: The Keeper of Metadata

After the steps above, the data has been moved from MySQL to HDFS. However, it is still not visible in Presto or Hive.

In Hudi, after data has been ingested into HDFS, it won’t be immediately visible in query engines like Hive or Presto. The reason is that these query engines fetch metadata about tables and partitions from the Hive Metastore, and the Metastore doesn’t automatically know about the new data.

Hudi provides a built-in utility called “Hive Sync Tool” that syncs metadata about the Hudi tables stored in HDFS with the Hive Metastore. This utility ensures that the Hive Metastore is aware of the new data ingested into HDFS, and hence, makes the data queryable by Hive or Presto.

Once the Hive sync operation is completed, your new data should be visible and queryable in Presto and Hive.

In short, the process is:

  1. Ingest data into HDFS using Hudi (via `HoodieDeltaStreamer` or other ingestion mechanisms).
  2. Run the Hudi Hive Sync Tool to update the Hive Metastore with the latest metadata about the Hudi tables.
  3. Query the new data using Hive, Presto, or any other query engine that utilizes the Hive Metastore.

$HUDI_HOME/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://localhost:10000 \
--partitioned-by partitionid \
--base-path "hdfs://Namenode:9000/data-lake/raw-zone/tables/table" \
--user hive \
--pass hive \
--database default \
--table table \
> /data/data-lake/logs/metadata_sync.log 2>&1
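Conceptually, part of what the sync does is find partitions that exist on storage but are not yet registered in the Metastore. The sketch below illustrates only that comparison. `partitions_to_register` is a hypothetical helper, the partition lists are made-up examples, and the real tool also handles table creation and schema changes.

```python
def partitions_to_register(storage_partitions, metastore_partitions):
    """Return partitions present on storage but unknown to the metastore, sorted."""
    return sorted(set(storage_partitions) - set(metastore_partitions))


on_hdfs = ["2023/05", "2023/06", "2023/07"]  # partitions written by ingestion
in_metastore = ["2023/05", "2023/06"]        # partitions the metastore knows about
missing = partitions_to_register(on_hdfs, in_metastore)
```

Until the missing partition is registered, a query engine that plans from the Metastore has no way to know the `2023/07` data exists.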

Decoding Data with Superset and Presto

After the data is stored and processed in HDFS, it needs to be visualized and analyzed. Apache Superset provides a user-friendly interface to build interactive dashboards and visualize the data. Coupled with Presto, a distributed SQL query engine, you can effortlessly query data in its native storage, including HDFS.
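Superset connects to Presto through a SQLAlchemy URI. As a small sketch, the helper below assembles such a URI. The function name, the host `presto-coordinator`, and port 8080 are placeholders, and the `hive` catalog and `default` schema are assumptions based on the Hive sync command shown earlier.

```python
from urllib.parse import quote


def presto_sqlalchemy_uri(host, port, catalog="hive", schema="default", user=None):
    """Assemble a Presto SQLAlchemy URI of the form presto://[user@]host:port/catalog/schema."""
    auth = f"{quote(user)}@" if user else ""
    return f"presto://{auth}{host}:{port}/{catalog}/{schema}"


# Placeholder host and port; replace with your Presto coordinator's address.
uri = presto_sqlalchemy_uri("presto-coordinator", 8080)
```

The resulting string is what would be entered as the database connection in Superset, after which the synced Hudi tables can be explored as datasets.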

Conclusion

Crafting an efficient data lake entails integrating several technologies, each contributing its own strengths. By combining MySQL, Kafka, HDFS, Hudi, Spark, Hive Metastore, Presto, and Superset, our architecture forms a robust pipeline for data ingestion, storage, processing, and querying. It keeps data flowing smoothly from the source database to the dashboards and supports comprehensive, near-real-time analysis, enabling insightful, data-driven decisions for the business. Stay tuned as we explore each component of this data ecosystem in our upcoming blog posts!
