How to design a data lake on AWS
Introduction
Our company has been at the forefront of interactive and digital entertainment since the debut of the first console. In order to amaze and attract more and more game fans, we are trying to make wiser decisions via data solutions.
Why Data Lake
For practical and performance reasons, we own various data sources at the department level. Instead of providing multiple data sources credentials or APIs to downstream consumers(data analysts, business analysts, data scientists, etc.), we decide to integrate these data sources together and set up a centralized data lake, so that all downstream consumers are able to retrieve the structured, unstructured and transformed data they require from only one data platform.
High Level Architecture Design
Amazon Web Services (AWS) allows us to build automated, cost-effective, and auto-scalable data lake architecture. Here is how we design our data lake infrastructure:
- The source data are originally stored in multiple types of database on AWS Aurora, including MySQL database, Oracle database, Snowflake, etc.
- Airflow is the main ETL and scheduling tool set up in docker on EC2 Instance.
- All the transformed and processed data ready for downstream consumers to read are stored in AWS S3 bucket.
- Using Spark to process data, the Spark application is configured on the EMR cluster.
- Athena is used as an interactive query engine to analyze data in the data lake, we provide the Athena JDBC connection to downstream consumers for access.
- Airflow also helps in sending quick and easy alerts (Slack)to data engineers and building robust CI/CD (GitHub)pipelines.
- Ideally, data lake need to be merged to the data ocean(company-wide level).
Airflow Data Pipelines (ETL)
For each data source, we mainly have 3 data pipelines to complete ETL(extraction, transformation, and loading)procedure.
- create_athena_tables: This pipeline is to create the master database on Athena. There is no real data processing in this step, we create DDL (database schema, table schema, etc.)via this pipeline. We don’t configure schedule interval on this pipeline, only trigger it when we need to update the table or database schema.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperatordefault_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': False,
'email': 'your email address',
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG('create_athena_tables',
default_args=default_args,
catchup=False,
schedule_interval=None,
max_active_runs=1
)
# Dummy top operator for a clean DAG
top = DummyOperator(
task_id='top',
dag=dag,
)# Create the Athena database if it doesn't exist
create_database = AWSAthenaOperator(
task_id='your task name',
query = "CREATE DATABASE IF NOT EXISTS {database};".format(database='master'),
database='master',
output_location='S3 location for logs',
dag=dag
)# Drop the Athena table, if it exists
drop_table_athena = AWSAthenaOperator(
task_id='your task name',
query = """DROP TABLE IF EXISTS {database}.{table_name};""".format(database='master', table_name='your table name'),
database='master',
output_location='S3 location for logs',
dag=dag
)drop_table_athena.set_upstream(create_database)# Create the Athena table, if it doesn't exists
create_table_athena = AWSAthenaOperator(
task_id='your task name',
query='your create table statement',
database='master',
output_location='S3 location for logs',
dag=dag
)create_table_athena.set_upstream(drop_table_athena)# Refresh the Athena partitions
refresh_table_athena = AWSAthenaOperator(
task_id='your task name',
query = "MSCK REPAIR TABLE {database}.{table};".format(database='master',table='your table name'),
database='master',
output_location='S3 location for logs',
dag=dag
)refresh_table_athena.set_upstream(create_table_athena)
top.set_upstream(refresh_table_athena)
- micro_data_process_{datasource}:
There are a number of data lake downstream consumers, they may have varying business requirements. Some downstream consumers are allowed to see certain parts of data or features, some are not allowed. Athena and Glue catalog haven’t provided security control on this case. Thus, we use Jinja in the SQL query to enforce. Here is the sample ETL Jinja code:
publish_sql = """
SELECT
{{% if params.[some filter you pre-defined] %}}
[certain field],
{{% else %}}
FILTER(#filter out the data they are not allowed to see#) AS [certain field] ,
{{% endif %}}
FROM
master.{table}
"""
Let’s dive into more details of main tasks of this pipeline:
(1) waitfor_data_{datasource}: This is the task for sensing if there are new data coming into the database, if there are new data, then run the following tasks, otherwise it will wait on the current task until new data flow in. Here is the example how to use SqlSensor to detect if there are new data:
from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.sensors import SqlSensor
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.dummy_operator import DummyOperator
import boto3
from airflow.operators.python_operator import PythonOperatordefault_args = {
'owner': 'airflow',
'depends_on_past': False,
'wait_for_downstream': False,
'email': 'your email address',
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=1)
}
dag = DAG('micro_data_process_{datasource}',
catchup=False,
default_args=default_args,
schedule_interval=timedelta(minutes=min),
max_active_runs=1
)# Our first task will ensure all downstream is skipped
# unless we're running the latest interval
latest_only = LatestOnlyOperator(
task_id='latest_only',
dag=dag
)waitfor_mysql_data = SqlSensor(
task_id='your task name',
sql="""SELECT COUNT(1) AS num_rows FROM {your table of interest}
WHERE {last_updated_time}> timestamp('{{ ts }}')""",
dag= dag,
conn_id='your connection id')waitfor_mysql_data.set_upstream(latest_only)
(2)dump_data_to_s3: This is the task for dumping data to the master database. The data is stored as json files in S3 bucket. The reason we choose json format is we dump the data using “select … from …into s3://”, this method doesn’t support parquet today, only text or json. We process complex data structures like structs(), so json is the optimized export format.
I would like to talk a little more about the incremental data extracting solution here. When we have some large dataset or ETL process is very complicated, which may cost longer running time, we can take the incremental data processing solution into consideration.
In our case, we fully dump this large dataset once a day. At any other time, we extract delta data between two Airflow DAG runs, and merge this delta data back to the full dataset. There is one thing that should be considered here, since one json file in AWS S3 can store 6GB, when we use the incremental way, we should delete all json files in the entity(table) bucket after each full dump. Otherwise, it would cause the data duplication issue(exponentially!) .
If data load is not large, we can fully dump every DAG run instead of choosing an incremental way. Here is the example code of incremental solution for big data load:
# Dynamically generate tasks for
# 1) Dumping the data to S3 (master)pre_mysql=[
"""
CREATE TABLE IF NOT EXISTS tmp_{your table name}_in_scope(your DDL options);
"""",
"""
DELETE FROM tmp_{your table name}_in_scope;
""",
"""
INSERT INTO tmp_{your table name}_in_scope(your fields)
SELECT * FROM {your table}
WHERE {created_date}BETWEEN STR_TO_DATE(SUBSTRING('{{{{ prev_execution_date_success }}}}',1,19),'%Y-%m-%d %H:%i:%s')
AND STR_TO_DATE(SUBSTRING('{{{{ ts }}}}',1,19),'%Y-%m-%dT%H:%i:%s')
-- We want to fully refresh once a day
OR HOUR(STR_TO_DATE(SUBSTRING('{{{{ ts }}}}',1,19),'%Y-%m-%dT%H:%i:%s')) = 8
"""
]# pre_mysql is for only extracting delta data in the table
pre_mysql = MySqlOperator(
task_id='your task name',
sql= [x for x in pre_mysql]
dag=dag,
mysql_conn_id='your connection id'
)pre_mysql.set_upstream(waitfor_mysql_data)# Dump data into S3 in JSON format
dump_table = MySqlOperator (
task_id='your task name',
sql="""your ETL query for this table, make sure query based on tmp_{your table name}_in_scope""",
dag=dag,
mysql_conn_id='your connection id'
)
dump_table.set_upstream(pre_mysql)# delta data merge with full dump data
post_spark_sql=""" SELECT
COALESCE(a.{all you fields},b.{all your fields})
FROM
(SELECT * FROM master.{full dump table}) b
FULL OUTER JOIN
(SELECT * FROM master.{delta dump table}) a
ON a.{unique key} = b.{unique key}
WHERE COALESCE(a.{unique key}, b.{unique key}) IS NOT NULL
"""# Submit spark app to publish data, this is done via SSH, the pyspark script is on the next code session;
spark_submit = """
spark-submit --deploy-mode cluster \
--master yarn {spark_params} \
--name {app_name} \
{app_file} {bucket} {publish_sql};
""".format(
app_name = 'your application name',
app_file = 'The location of pyspark script above',
bucket = 'S3 bucket for this table',
spark_params = 'spark parameters you would like to configure, e.g: --executor-memory 15G',
publish_sql = json.dumps(post_spark_sql)
)# Generate Operator
post_spark_task = SSHOperator(
task_id='your task name',
ssh_hook=SSHHook(ssh_conn_id='your EMR SSH host'),
command=spark_submit,
dag=dag)
post_spark_task.set_upstream(dump_table)
The pyspark code for post_spark_task is as below:
import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext# Define parameters
bucket=sys.argv[1]
publish_sql=str(sys.argv[2])spark = SparkSession.builder \
.enableHiveSupport() \
.getOrCreate()# Set Spark configuration to remove the metadata in S3
conf=spark.conf
conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
conf.set("spark.hadoop.parquet.enable.summary-metadata", "false")# Query source data from master DB
query=publish_sql.decode("unicode_escape")
df = spark.sql(query)# Save compressed parquet/snappy consumer dataset
df.repartition(1).write.partitionBy("ds").parquet(bucket)spark.stop()
(3)publish_data_to_athena: We use this pipeline to basically copy the data from the master database to each downstream consumer’s database. Athena uses AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data. The table metadata lets the Athena query engine know how to find, read, and process the data that you want to query. Thus, when we select the data from the master database, all the table metadata can be created automatically in consumer databases.
Also, we use Spark to process the data during this process. The reason we use Spark here is Airflow task is running sequentially, but we have many entities(tables) need to be processed, and also we deliver data to multiple downstream consumers. In order to run the tasks simultaneously and instead of being constrained on a single machine, we use Spark to distribute the load across multiple nodes. Here is the example pyspark code to process data and save as a parquet file to S3 with overwrite mode.
import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext# Define parameters
bucket=sys.argv[1]
publish_sql=str(sys.argv[2])spark = SparkSession.builder \
.enableHiveSupport() \
.getOrCreate()# Set Spark configuration to remove the metadata in S3
conf=spark.conf
conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
conf.set("spark.hadoop.parquet.enable.summary-metadata", "false")# Query source data from master DB
query=publish_sql.decode("unicode_escape")
spark.sql("SET spark.sql.hive.manageFilesourcePartitions=False")
df = spark.sql(query)# Save compressed parquet/snappy consumer dataset
df.repartition(1).write.partitionBy("ds").parquet(bucket,mode="overwrite")spark.stop()
Here is how we use Airflow SSHOperator in our pipeline to submit above pyspark script to EMR cluster:
publish_sql = """your query"""# Submit spark app to publish data, this is done via SSH
spark_submit = """
spark-submit --deploy-mode cluster \
--master yarn {spark_params} \
--name {app_name} \
{app_file} {bucket} {publish_sql};""".format
(
app_name = 'your application name',
app_file = 'The location of pyspark script above',
bucket = 'your S3 bucket name',
spark_params = 'spark parameters you would like to configure, e.g: --executor-memory 15G',
publish_sql = json.dumps(publish_sql)
)
# Generate Operator
ssh_tasks_datalake = SSHOperator(
task_id='your task name',
ssh_hook=SSHHook(ssh_conn_id='ssh_default'),
command=spark_submit,
dag=dag)ssh_tasks_datalake.set_upstream(post_spark_task)def delete_s3_files(**kwargs):
session = boto3.Session(
region_name=kwargs['AWS_REGION'],
aws_access_key_id=kwargs['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=kwargs['AWS_SECRET_ACCESS_KEY'])
s3 = session.resource('s3')
bucket = s3.Bucket(kwargs['aws_bucket'])
bucket.objects.filter('they files you want to delete in the bucket').delete()# Delete all json files in the table bucket after each full dump only for incremental solution
delete_s3bucket_files = PythonOperator(
task_id='your task name',
python_callable=delete_s3_files,
dag=dag,
op_kwargs={
'AWS_REGION': 'your AWS region',
'AWS_ACCESS_KEY_ID': 'your AWS access key',
'AWS_SECRET_ACCESS_KEY':'your AWS secret key',
'aws_bucket':'bucket you want to delete'
)
delete_s3bucket_files.set_upstream(ssh_tasks_datalake)
delete_s3bucket_files.set_downstream(top)
You can remove all the ‘pre_mysql’, ‘post_sparl_task’ and ‘delete_s3bucket_files’ if you don’t choose to use the incremental method.
The schedule interval of this pipeline is 30 minutes, so we deliver the data to the data lake every 30 minutes.
- provision_data_{consumer}: This pipeline is used for creating schema on downstream consumer databases on Athena and also creating views for them. Same with create_athena_tables, we trigger this pipeline when we update the table or database schema.
The coding for this part is also similar with the other two pipelines, we use AthenaOperator to create databases and views, SSHoperator to create tables.
What’s next?
Building the data lake on AWS helps us integrate various sources together. As more and more data sources we will pull into the data lake, we should consider some limitations of the current solution and how to solve and optimize it. For example, as we mentioned above, currently Aurora doesn’t support dumping data with parquet file, once it supports parquet, we can simplify our current process. Also, how to improve the query speed as data are getting larger? We are pulling data batch by batch now, we would also consider moving to streaming solution.