Data Engineer Cert. - DataCamp

Welcome to my summary of the Data Engineering Certification from DataCamp. This is an incomplete version; some topics may not be covered yet. Please revisit this page from time to time to see the progress.

!Writing Efficient Code in Python

image.png

Built-in functions

Built-in types: list , tuple , set , dict , and others
Built-in functions: print(), len() , range(), round() , enumerate() , map() , zip() and others
Built-in modules: os, sys, itertools, collections, math, and others
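As a quick illustration of the three groups above, here is a minimal sketch that uses only built-in types, built-in functions, and standard-library modules. The sample data (names and scores) is made up for the example.

```python
from collections import Counter
from itertools import combinations

names = ["ann", "bob", "cat"]          # hypothetical sample data
scores = [3.14159, 2.71828, 1.41421]   # hypothetical sample data

# enumerate() pairs each item with its index, avoiding a manual counter.
indexed = list(enumerate(names, start=1))

# map() applies a function lazily; zip() walks several iterables in parallel.
rounded = list(map(lambda s: round(s, 2), scores))
paired = dict(zip(names, rounded))

# Built-in modules: Counter tallies items, combinations builds unique pairs.
letter_counts = Counter("".join(names))
pairs = list(combinations(names, 2))
```

Relying on these built-ins is usually shorter and faster than writing the equivalent loops by hand.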
Interesting points




Component of lambda function

Timing and profiling


Gaining Efficiencies


Pandas optimizations

!Writing Functions in Python


Intro to Shell

ls
file listing
pwd
print working directory
cd
change directory
cd ..
move up one level
cd ~
change to home directory
cp original.txt duplicate.txt
copy file
mv old_location.txt new_location.txt
move file (can also use for renaming file)
rm file.txt
remove file
rm -r directory
remove directory
cat file.txt
view file content
head file.txt
see first 10 lines of file
head -n 3 file.txt
see first 3 lines of file
ls -R
list everything below current directory
cut -f 2-5,8 -d , values.csv
select columns 2 to 5 and column 8, using , as the separator
-f : field
-d : delimiter
grep search_term filename.csv
search for pattern in file
-c : print a count of matching lines instead of the lines
-h : do not print the filename when searching multiple files
-i : ignore case
-l : print the names of files that contain the matches
-n : print line number of matching lines
-v : invert match
head filename.csv > store.csv
store first 10 lines in filename.csv to store.csv
head -n 5 filename.csv | tail -n 2
pipe the first 5 lines of filename.csv into the next command (here tail -n 2 keeps the last 2 of those 5 lines)
wc
print the number of characters (-c), words (-w), or lines (-l) in a file
wildcards
* matches zero or more characters
? matches a single character
[...] matches any one of the characters inside
{...} matches any of the comma-separated patterns inside
sort
sort output
-n numerical order
-r reversed order
-f fold case (case-insensitive)
-b ignore leading blank
uniq
remove adjacent duplicate lines (sort first to remove all duplicates)
filename=text.txt
assign text.txt to a variable called filename
echo $filename
print value contained in the variable filename
for filename in directory/*.csv; do echo $filename; done
print the name of every .csv file in the folder directory
nano filename
open filename in editor
ctrl + k cut a line
ctrl + u paste a line from clipboard
ctrl + o save the file
ctrl + x exit the editor
bash script.sh
run commands in script.sh

Data Processing in Shell

data download

curl [option flags] [URL]
client for URLs: download data from an HTTP or FTP server
-O download and keep the existing filename
-o newname download and rename to newname
-L follow the redirect if the server returns a 3xx status code
-C resume a previous file transfer if it timed out before completion
wget [option flags] [URL]
World Wide Web get: native command-line tool to download files
better than curl for downloading multiple files
-b background download
-q turn off wget output
-c resume broken download
-i download from list given in a file
--wait=1 wait 1 second between downloads

csvkit

in2csv filename.xlsx > filename.csv
convert the first sheet in filename.xlsx to filename.csv
in2csv -n filename.xlsx
list all sheet names
in2csv filename.xlsx --sheet "sheet1" > filename.csv
convert sheet sheet1 to filename.csv
csvlook filename.csv
preview filename.csv to console
csvstat filename.csv
print summary statistics for each column to the console (like df.describe() in pandas)
csvcut -n filename.csv
list all column names in filename.csv
csvcut -c 1 filename.csv
return the column at index 1 (as numbered by csvcut -n) from filename.csv
can also select by name, e.g., -c "column name"
csvgrep
filter by row using exact match or regex
must use one of the following options
-m exact row value
-r regex pattern
-f path to a file
csvgrep -c "column name" -m value filename.csv
filter filename.csv where column name == value
csvstack file1.csv file2.csv > allfile.csv
stack file1.csv and file2.csv together and save to allfile.csv
csvstack -g "f1","f2" -n "source" file1.csv file2.csv > allfile.csv
add a column named source (instead of the default name group) that identifies which file each row came from (f1 or f2)

SQL

sql2csv: connect to the database sqlite:///database.db, run the query SELECT * FROM base, and save the result to filename.csv
csvsql --query "SELECT * FROM filename LIMIT 1" filename.csv
run a SQL query directly against the local file filename.csv (the table name is the file name without its extension)
csvsql --query "SELECT * FROM base1 INNER JOIN base2 ON base1.ID = base2.ID" base1.csv base2.csv
works with multiple CSV files, which must be listed in the same order as the tables appear in the SQL query
csvsql --insert: insert filename.csv into a database
--no-inference disable type inference (treat every column as text)
--no-constraints generate the schema without length limits or null checks

cron job

echo "* * * * * python hello_world.py" | crontab
add a job that runs every minute, on the minute, to crontab (note that piping into crontab replaces any existing crontab)
the five * fields set the schedule of a cron job, in this order:
minute(0-59)
hour(0-23)
day of month(1-31)
month(1-12)
day of week(0-6)
crontab -l
list all cron jobs
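To make the five time fields concrete, here is a small Python sketch that splits a crontab line into named fields. The function name parse_cron_line is hypothetical, and the sketch only splits the line; it does not validate ranges or expand expressions such as */5.

```python
FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def parse_cron_line(line):
    """Split a crontab line into its five time fields and the command."""
    parts = line.split(None, 5)  # the first five tokens are the time fields
    if len(parts) < 6:
        raise ValueError("expected five time fields followed by a command")
    schedule = dict(zip(FIELDS, parts[:5]))
    return schedule, parts[5]

# The job from the notes: every field is *, so it runs every minute.
schedule, command = parse_cron_line("* * * * * python hello_world.py")
```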

Intro to Bash

A Bash script is a file of shell commands
It usually starts with a shebang line
#!/bin/bash or #!/usr/bin/bash (the path to bash; check with which bash)
the file extension is .sh
run it with bash scriptname.sh
arguments can be passed with bash scriptname.sh arg1 arg2
These arg1 arg2 can be accessed inside the script by:
$1 get arg1
$2 get arg2
$@ get all arguments (arg1 arg2)
$# get 2, the number of arguments
variables
var1="sam" assigns the value "sam" to var1. Note that there must be no spaces around =
echo "There are $var1 items" will print There are sam items
quotation marks
'sometext' interpreted literally
"sometext" interpreted literally except $ and `
`sometext` runs sometext as a command and substitutes its STDOUT (e.g., to store it in a variable)
$(command) is equivalent to `command`, but $(command) is preferred in modern scripts
numerical calculation
expr 1 + 4 gives 5
expr 11 + 3.5 gives an error because expr handles integers only
echo "11 + 3.5" | bc gives 14.5, as bc (basic calculator) also handles decimals
echo "11/3" | bc gives 3 because the number of decimal places has not been defined
echo "scale=3; 11/3" | bc gives 3.666
$((11+3)) is equivalent to expr 11 + 3 (integers only)
array
numerical-indexed
declare -a my_array defines an empty array
or my_array=(1 2 3) provides values while defining the array. Note that spaces, not commas, separate the elements
echo ${my_array[@]} prints 1 2 3
echo ${#my_array[@]} prints 3 (length of the array)
echo ${my_array[@]:N:M} prints a slice, where N is the starting index and M is how many elements to return
if-statement
don't forget the [ ] around the condition, with a space after [ and before ]
special flags
-e if the file exists
-s if the file exists and has size greater than zero
-r if the file exists and readable
-w if the file exists and writable
combine conditions with && for AND and || for OR
for-loop




function


return does not return a value but a status code (0 for success, 1-255 for failure), which is captured in $? . To output values from a function, one should either
assign the result to a global variable inside the function, or
echo the result on the last line of the function and capture it with command substitution, i.e., $(function_name)

!Unit Testing

image.png
A function is tested on the order of 100 times over its life cycle before going into production, so automating these tests saves time
simple unit test process
create a test file named with the test_ prefix, e.g., to test myfunc the test file should be test_myfunc.py
inside the testing file
run the unit test pytest test_myfunc.py
to compare floats in an assert statement, use pytest.approx, e.g., assert 0.1+0.1+0.1 == pytest.approx(0.3)
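Why the plain comparison fails can be seen with the standard library alone. The sketch below uses math.isclose as a stand-in for pytest.approx (both compare within a tolerance, although their default tolerances differ); it is an illustration, not the pytest implementation.

```python
import math

total = 0.1 + 0.1 + 0.1

# Binary floating point cannot represent 0.1 exactly, so rounding error accumulates.
exact_match = (total == 0.3)

# math.isclose() compares within a relative tolerance (default rel_tol=1e-09).
close_match = math.isclose(total, 0.3)
```

Here exact_match is False while close_match is True, which is why a tolerance-based comparison is needed in the assert.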
image.png
The project structure consists of src, which contains the source code, and test, which contains a corresponding test module for each source file. Inside a test module, the tests for different source functions can be grouped into classes
if a function is expected to fail
dealing with testing with files
create raw data file + create ref result file → function → compare the result file from function and ref result file → remove raw data file + remove ref result file
The setup and teardown steps (in orange) are called a fixture, which pytest supports through the @pytest.fixture decorator
or use the tmpdir fixture from pytest to create the files in a temporary directory, which is removed together with its files after the test ends
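The setup → function → compare → teardown flow can be sketched with the standard library. This is not pytest itself: tempfile.TemporaryDirectory stands in for the tmpdir fixture, and count_lines is a hypothetical function under test.

```python
import os
import tempfile

def count_lines(path):
    """Hypothetical function under test: count the lines in a text file."""
    with open(path) as f:
        return sum(1 for _ in f)

def run_file_test():
    # Setup: create a temporary directory holding the raw data file.
    with tempfile.TemporaryDirectory() as tmp:
        raw_path = os.path.join(tmp, "raw.txt")
        with open(raw_path, "w") as f:
            f.write("a\nb\nc\n")
        # Exercise the function and compare with the expected result.
        result = count_lines(raw_path)
        assert result == 3
    # Teardown: the directory and its files are removed on leaving the with-block.
    return result, os.path.exists(raw_path)

result, still_exists = run_file_test()
```

Because the cleanup happens when the with-block exits, the test leaves no files behind even if the assertion fails.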

!Object Oriented

Object = states + behaviors = attributes + methods
attribute ↔ variable ↔ obj.my_attribute
method ↔ function ↔ obj.my_method()
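A minimal sketch of the mapping above; the Customer class and its fields are hypothetical names chosen for the example.

```python
class Customer:
    """Hypothetical class: state lives in attributes, behavior in methods."""

    def __init__(self, name, balance=0):
        self.name = name          # attribute (state)
        self.balance = balance    # attribute (state)

    def deposit(self, amount):    # method (behavior)
        """Increase the balance and return the new value."""
        self.balance += amount
        return self.balance

cust = Customer("Ada", balance=100)
new_balance = cust.deposit(50)    # call a method: obj.my_method()
current_name = cust.name          # read an attribute: obj.my_attribute
```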

Intro to Airflow

Airflow is a platform to program workflows → Creation, Scheduling, Monitoring
A workflow in Airflow is written in Python and implemented as a DAG (Directed Acyclic Graph)
A DAG is a set of tasks forming a graph that is:
directed → the task flow indicates order and dependencies
acyclic → no loops or repeats

Airflow Operator

Bash Operator


Python Operator

Sensor

Operator that waits for a certain condition to be true
Can set how often to check
Attach to task

File sensor

Other sensors

ExternalTaskSensor
HttpSensor
SqlSensor
other sensors in airflow.sensors and airflow.contrib.sensors

Template

Substitute values at run time using Jinja template syntax
using for loop
special variables
{{ ds }} : execution date in YYYY-MM-DD
{{ ds_nodash }} : YYYYMMDD
{{ prev_ds }} : previous execution date in YYYY-MM-DD
{{ prev_ds_nodash }} : YYYYMMDD
{{ dag }} : DAG object
{{ conf }} : Airflow config
{{ macros.datetime }} : datetime.datetime object in python
{{ macros.timedelta }} : timedelta object in python
{{ macros.ds_add('2020-04-15', 5) }} : add days to a date → in this case, 2020-04-20
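What the date variables and ds_add produce can be reproduced with plain datetime. The functions below are stand-alone sketches with hypothetical names (ds_add_sketch, ds_nodash_sketch); they mimic the results listed above and are not Airflow's own code.

```python
from datetime import datetime, timedelta

def ds_add_sketch(ds, days):
    """Shift a YYYY-MM-DD date string by a number of days (may be negative)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")

def ds_nodash_sketch(ds):
    """Convert YYYY-MM-DD to YYYYMMDD, like the *_nodash variables."""
    return ds.replace("-", "")

example = ds_add_sketch("2020-04-15", 5)   # same inputs as the note above
```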

Branching

Use logic to navigate the flow

Running DAGs and Tasks

run a specific task from command line
airflow run <dag_id> <task_id> <date>
run a full DAG
airflow trigger_dag -e <date> <dag_id>

Intro to Spark

Spark distributes computing tasks across the workers (nodes) of a cluster. The connection between the user and the cluster is created with SparkContext, and SparkSession is the interface to that connection.
view tables
use SQL query
convert pd dataframe to spark
read csv to spark
add column
rename column
filtering
select columns
select column, modify, and assign with alias
aggregation
join

ML Pipeline in Spark

pyspark.ml composed of two main classes
Transformer classes have .transform() method which takes in df and returns df with an additional transformed column
Estimator classes have .fit() method which takes in df but returns a model object
An ML pipeline in Spark accepts only numerical columns. Spark often infers the data type of each column when the data is read, but it can guess incorrectly, e.g., a bus line number is a number but carries no numerical meaning. Therefore, it is safer to cast each column to its proper data type before passing it to the ML pipeline.
cast data type → the target data type should be "integer" for integers or "double" for floats
create one hot encoder for categorical data
assemble features into a vector (prerequisite for spark before training a model)
create pipeline
split data
model tuning

!AWS Boto

Boto3 is the Python interface to AWS. AWS offers many services for data projects; some interesting ones are:
Simple Storage Service (S3) - store files (or so-called objects) in AWS
Simple Notification Service (SNS) - send alerts or notifications (via email or other channels)
Comprehend - sentiment analysis on text data
Rekognition - extract text from image and image classification

S3

A service to store files in folders in the cloud. Cloud terminology uses:
file → object
folder → bucket
create boto3 client:
create bucket
list buckets
delete bucket
upload file
list objects in a bucket
get object metadata
download object
delete object

Permission

By default, AWS denies every request to access data unless the AWS_KEY_ID and AWS_SECRET of a user with the appropriate permissions are provided. Permissions can be granted through:
IAM - provide permissions across AWS services
Bucket Policy - permission on buckets
Access Control List (ACL) - permission on objects
Pre-signed URL - temporary access to an object
Here, only ACLs and pre-signed URLs are discussed.
There are 2 types of ACL to define: public-read and private . Upon uploading, all object ACLs are private by default. To set ACL to public-read :
or
public file in s3 will be in the format
read csv file as dataframe on s3 without downloading to local disk
generate pre-signed URL

SNS

A service to send SMS, email, or notifications to subscribers via AWS. To send messages, these components need to be considered:
publisher - script/code that publishes messages to a topic
topic - channel that messages are published to and that subscribers subscribe to
subscriber - listener
create sns client
create sns topic
list topics
delete topic
subscribe to a topic
list subscriptions
list subscriptions by topic
delete subscription
publish a message to a topic
send a single SMS to a single subscriber

Intro to Relational Database

Relational Database

entities become tables
reduce redundancy
data integrity by relationship
simple SQL query

Example

entities which contain redundant information
image.png
In fact, the blue are person info, the green are university info, and the orange are organization info.
Therefore, this can be reformed into:
image.png

Migrate the university_professors table to new schema

Insert to organizations table
If renaming table required
If dropping column required:

Data constraints


Attribute constraints

fix the data structure, which leads to more consistency and better data quality
image.png
attribute constraints should be defined upon table creation
change type after table creation

Key constraints

a key is an attribute (or combination of attributes) that identifies a record uniquely
superkey → a combination of attributes that identifies records uniquely, even if some of its attributes could be removed without losing uniqueness
key (candidate key) → a superkey from which no more attributes can be removed
usually a surrogate key, i.e., an artificial id column, is used to define a record's uniqueness

Referential integrity constraints

Foreign key (FK) points to the Primary key (PK) of another table
Each FK value must exist as a PK value in the other table
example to specify FK
add FK to existing table
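Referential integrity can be tried out with Python's built-in sqlite3 module. This is only a sketch: the course material targets a different database system, SQLite syntax differs in places (for example, it enforces foreign keys only after PRAGMA foreign_keys = ON), and the table and column names below are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled

conn.execute("CREATE TABLE universities (id TEXT PRIMARY KEY, name TEXT)")
conn.execute(
    """
    CREATE TABLE professors (
        id INTEGER PRIMARY KEY,
        lastname TEXT NOT NULL,
        university_id TEXT REFERENCES universities (id)
    )
    """
)

conn.execute("INSERT INTO universities VALUES ('UNI', 'Example University')")
conn.execute("INSERT INTO professors VALUES (1, 'Miller', 'UNI')")  # FK value exists

try:
    # 'XYZ' is not a primary key in universities, so this insert is rejected.
    conn.execute("INSERT INTO professors VALUES (2, 'Smith', 'XYZ')")
    fk_violation = False
except sqlite3.IntegrityError:
    fk_violation = True

n_professors = conn.execute("SELECT COUNT(*) FROM professors").fetchone()[0]
```

The rejected insert shows the rule in action: a foreign key value that has no matching primary key cannot enter the table.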

Database Design

2 ways to process data
Online Transaction Processing (OLTP)
Revolves around transaction data
for day-to-day data processing
store in conventional database
Online Analytical Processing (OLAP)
Revolves around analysis
for business decision making
store in data warehouse
image.png

Structuring data

structured data
follow a schema
data type and relationship are defined
easy to analyze
less flexible → hard to scale
e.g., SQL, relational database
unstructured data
schemaless
e.g., photos, chatlogs, audio
semi-structured data
does not follow larger schema
self-describing structure
e.g., NoSQL, XML, JSON
image.png

Data modeling

process of creating model for data to be stored.
1) Conceptual data model → describe entities, relationships, attributes
image.png
2) Logical data model → define tables, columns, relationships
image.png
3) Physical data model → describe physical storage, CPU, backup systems

Dimensional modeling

Design of data warehouse for OLAP queries using star schema.

Elements of dimensional modeling

Fact table
holds records of a metric
changes regularly
connects to dimensions via foreign keys
Dimension tables
holds descriptions of attributes
does not change often
image.png
Star schema can be extended to snowflake schema → dimension tables are normalized to avoid repeating information.
dimension table in star schema (denormalized)
image.png
dimension table in snowflake schema (normalized)
image.png
data normalization to snowflake schema yields the following
advantages:
eliminate data redundancy → save storage
better data integrity → accurate and consistent data
disadvantages:
complex queries require more CPU

Normal forms

approaches to normalize data with specific rules.

1NF

each record must be unique → no duplicate rows
each cell must hold one value
image.png
image.png

2NF

Must satisfy 1NF and
if the primary key is a single column, the table automatically satisfies 2NF
if the primary key is composite (more than one column), each non-key column must depend on the whole key, not on part of it
image.png
image.png

3NF

must satisfy 2NF and
no transitive dependencies → non-key columns cannot depend on other non-key columns
image.png
image.png

Database views

A view is a virtual table defined by a stored query. It can be queried like a regular table, which avoids retyping common queries.
get all views in the database
Keep in mind that this is a non-materialized view, which stores the query, not its result. Therefore, every time the view is queried, the stored query runs again to get a fresh result from the database. That means, if the query behind the view takes a long time to run, any query referring to this view will take a long time as well.
On the other hand, a materialized view stores the result of the query. Querying it is therefore much faster than querying a non-materialized view, with the drawback that the result might be out of date if the view has not been refreshed.
refresh materialized view
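The behavior of a non-materialized view can be checked with Python's built-in sqlite3 module. This is a sketch only: SQLite stands in for the database used in the course, it has no materialized views (so refreshing is not shown), and the reviews table is a made-up example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (title TEXT, score REAL)")
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?)",
    [("A", 9.5), ("B", 6.0)],
)

# The view stores the query, not its result.
conn.execute("CREATE VIEW high_scores AS SELECT * FROM reviews WHERE score > 9")
before = conn.execute("SELECT title FROM high_scores").fetchall()

# New rows show up without any refresh because the query is re-run each time.
conn.execute("INSERT INTO reviews VALUES ('C', 9.8)")
after = conn.execute("SELECT title FROM high_scores ORDER BY title").fetchall()

# List all views in this database (SQLite keeps them in sqlite_master).
views = conn.execute("SELECT name FROM sqlite_master WHERE type = 'view'").fetchall()
```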

Table partitioning

Once the tables in our database grow large, it becomes necessary to partition them so that the hardware can still handle the queries.
vertical partitioning
separate low-priority column(s) to other table(s)
image.png
horizontal partitioning
split data according to rows
image.png
image.png

Intro to Scala

Scala = modern Java
It combines object-oriented programming and functional programming → scalable
Object-oriented programming (OOP)
Every value is an object
Every operation is a method call
Functional programming (FP)
functions are first-class values
operations should map input values to output values rather than change data inplace

variable definition
Scala value types:
Double ← Default double precision floating point number
Float
Long
Int
Short
Byte
Char → in sequence becomes String
Boolean
Unit
In Scala, val is preferred over var because it is immutable: the value cannot change after definition, which makes code easier to reason about (no need to track whether it is modified somewhere). The drawback is that a new value must be created every time the state changes, which usually costs more memory for copying data.

Scala as a compiler

Scala code can be run through the interpreter, which compiles and executes it on the fly. It can also be compiled ahead of time like a compiled language:
create a file named Game.scala
compile using command scalac Game.scala
run using command scala Game

Function

Array

An Array in Scala is a mutable sequence of objects that share the same type
Dirtiest way to create an array
array can be parameterized before defining their values
define values inside array
As the array is mutable but defined with val, its elements can be changed, but the array itself, e.g., its length or element type, cannot.
Define array with mixed data type

List

A List in Scala is an immutable sequence of objects that share the same type
List definition
List has many useful methods, for example:
:: cons - prepend a new element to an existing list (append exists in Scala but rarely used)
Nil empty list
initialize list with :: and Nil
Nil at the end is required because :: is a method of the list on its right-hand side: it prepends the element on its left to that list, and Nil provides the empty list to start from.
::: concatenation - concat two lists

if

evaluates whether a condition is true or false. Do not forget the () around the condition
operators
relational
> gt
< lt
>= gte
<= lte
== eq
!= neq
logical
&& and
|| or
! not

while


Big Data with PySpark

3V to define as Big Data
Volume - size of data
Variety - different sources and formats
Velocity - speed of data
Terminologies in big data processing
Clustered computing - compute at multiple machines
Parallel computing - simultaneous computation
Distributed computing - collection of nodes running in parallel
Batch processing - job broken into small pieces and running them on individual machines
Real-time processing - immediate processing of data

PySpark

Apache Spark provides high-level APIs in Scala, Java, Python, and R. In this course only the Python API, PySpark, is discussed. Spark itself is written in Scala; PySpark offers similar processing speed, with a syntax that looks like pandas and scikit-learn.
Just as Spark has the Spark shell to quickly prototype data processing in Scala, PySpark has the PySpark shell for the same purpose in Python.
The entry point to the world of Spark is SparkContext, usually abbreviated as sc → the connecting point to the Spark cluster. Without it, you cannot run any PySpark jobs.
sc.version
Inspect currently using Spark version
sc.pythonVer
Inspect currently using python version
sc.master
Inspect URL of the cluster or “local” when run in local mode
rdd = sc.parallelize([1,2,3,4])
load the values of a list into an RDD
rdd = sc.parallelize("Hello")
load the string Hello into an RDD
rdd = sc.textFile("text.txt")
load the local file text.txt into an RDD

Resilient Distributed Datasets (RDD)

The RDD is the backbone data type in PySpark: the abstraction Spark uses for distributed data processing.
RDD stands for 3 important properties:
Resilient - ability to withstand failures
Distributed - spanning across multiple machines
Datasets - collection of partitioned data → array, tables, tuples, etc.
Partition - logical division of data storage
rdd = sc.parallelize([1,2,3,4], numSlices=4)
load the list into an RDD split into 4 partitions (textFile() takes minPartitions instead)

RDD Operations

RDD in pyspark supports:
Transformation - create new output RDDs from input RDDs
Actions - run a computation on the RDD and return a value

Transformation

Transformations follow lazy evaluation → Spark first builds a computation graph, and the graph is executed only when a result is actually needed. Basic transformations are:
map() - apply a function to all elements in the RDD
return [1,4,9,16]
filter() - return a new RDD with only the elements that satisfy the condition
return [3,4]
flatMap() - return multiple values for each element in the original RDD
return ["hello", "world", "how", "are", "you"]
union() - return a combination of RDDs
return [1, 2, 3, 4, 5, 6, 7]
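The four transformations and the idea of lazy evaluation can be imitated in plain Python, whose map() and filter() also return lazy iterators. This is an analogue on local lists, not PySpark; the input values are assumptions chosen to reproduce the results listed above.

```python
from itertools import chain

numbers = [1, 2, 3, 4]                      # assumed input
lines = ["hello world", "how are you"]      # assumed input

# map() and filter() return lazy iterators: nothing is computed yet.
squared = map(lambda x: x * x, numbers)
large = filter(lambda x: x > 2, numbers)

# flatMap analogue: map each line to a list of words, then flatten.
words = chain.from_iterable(line.split() for line in lines)

# union analogue: concatenate two collections (duplicates are kept).
combined = chain(numbers, [5, 6, 7])

# Materializing the iterators plays the role of an action such as collect().
squared_list = list(squared)
large_list = list(large)
words_list = list(words)
combined_list = list(combined)
```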

Actions

return values after RDD computations. Important actions are:
collect() - return all elements of the RDD as an array
rdd_map.collect() returns [1,4,9,16]
take(N) - return the first N elements of the RDD
rdd_map.take(2) returns [1,4]
first() - return only the first element → equivalent to take(1)
rdd_map.first() returns 1
count() - return number of elements in the RDD
rdd_map.count() returns 4

Pair RDDs

dataset with key/value pairs
each row is a key that maps to one or more values
create pair RDD from a list of key-value tuple

Transformations

All regular transformations work on pair RDDs, but the functions passed in should operate on key-value pairs rather than on individual elements. Examples of pair RDD transformations:
reduceByKey() - combine values with the same key
return [("name1", 23), ("name2", 66), ("name3", 12)]
sortByKey() - return RDD sorted by key
return [(66, "name2"), (23, "name1"), (12, "name3")] (here sorted in descending order)
groupByKey() - group values with the same key
return [ ("DE", ("Munich", "Berlin")), ("UK", ("London")), ("NL", ("Amsterdam")) ]
join() - join 2 pair RDD based on their key
return [ ("name1", (20, 21)), ("name2", (23, 17)), ("name3", (12, 4)) ]
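The logic of these pair transformations can be sketched on local lists of tuples. The helper names (reduce_by_key, group_by_key, join) and the sample inputs are hypothetical; they mirror what the Spark methods compute but run on a single machine.

```python
from collections import defaultdict

def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like reduceByKey()."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())

def group_by_key(pairs):
    """Collect all values of each key, like groupByKey()."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return [(key, tuple(values)) for key, values in grouped.items()]

def join(left, right):
    """Inner join on key: one output pair per matching (left, right) value."""
    right_groups = dict(group_by_key(right))
    return [(k, (lv, rv)) for k, lv in left for rv in right_groups.get(k, ())]

ages = [("name1", 20), ("name2", 23), ("name3", 12), ("name1", 3), ("name2", 43)]
totals = reduce_by_key(ages, lambda x, y: x + y)

# sortByKey analogue: swap to (value, key) pairs, then sort in descending order.
by_total = sorted(((v, k) for k, v in totals), reverse=True)

cities = group_by_key([("DE", "Munich"), ("UK", "London"), ("DE", "Berlin")])
joined = join([("name1", 20), ("name2", 23)], [("name1", 21), ("name2", 17)])
```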

Actions

description
reduce(func) - aggregate elements of a regular RDD. This func should be commutative and associative.
return 14
saveAsTextFile("filename") - save the RDD as text files, one file per partition; useful when the data is too big to bring into memory with collect()
coalesce() - reduce the number of partitions, e.g., to 1 in order to save the RDD as a single text file
countByKey() - count how many times each key appears in the RDD. Works only on pair RDDs of type (K, V), and the result should be small enough to fit in memory.
return ("k1", 2), ("k2", 1)
collectAsMap() - return pair RDD as a dictionary. Again, works only when the data can fit into the memory
return {"k1": 3, "k2": 6}
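The same actions have close standard-library counterparts, which makes their semantics easy to check locally. The inputs below are assumptions picked to match the results listed above; this is plain Python, not PySpark.

```python
from collections import Counter
from functools import reduce

# reduce(): the function should be commutative and associative, because a
# distributed engine may combine partial results in any order.
total = reduce(lambda x, y: x + y, [1, 3, 4, 6])

pairs = [("k1", 1), ("k2", 6), ("k1", 3)]   # assumed input

# countByKey analogue: count how many times each key occurs.
key_counts = Counter(key for key, _ in pairs)

# collectAsMap analogue: for duplicate keys, later values overwrite earlier ones.
as_map = dict(pairs)
```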

PySparkSQL

A library for structured data, similar to pandas. It provides the DataFrame for data wrangling and computation, and can deal with both structured (relational database) and semi-structured (JSON) data. It supports SQL queries (SELECT * FROM table) as well as expression methods (df.select()).
Just as RDDs require SparkContext as their main entry point, Spark DataFrames require SparkSession, usually abbreviated as spark.
create dataframe from RDD
create dataframe from file

DataFrame Operations

Transformations

select() - subset the columns in the DataFrame
filter() - filter the rows based on condition(s)
groupBy() - group the DataFrame by column(s), usually followed by an aggregation
orderBy() - sort the DataFrame by column(s)
dropDuplicates() - create a new DataFrame with only unique rows
withColumnRenamed() - return a new DataFrame with a column renamed

Actions

printSchema() - print the type of each column. It is a general method rather than a DataFrame action in the strict sense
show(n_rows) - print the first n_rows rows of the DataFrame (20 by default)
columns - attribute that returns the column names of the DataFrame
describe() - calculate summary statistics of the numerical columns in the DataFrame

SQL Queries

In the previous part, the DataFrame was manipulated using the DataFrame API. A DataFrame can also be manipulated using SQL queries, which can be easier to understand and more portable. The API and SQL are mostly interchangeable.
SQL queries are executed with the SparkSession method spark.sql("SQL statement")
However, the DataFrame needs to be registered as a temporary view before it can be queried.

Visualization

Native plotting is not available in PySpark. Use one of the following instead:
pyspark_dist_explore library
toPandas()
HandySpark library

MLlib

Instead of sklearn, which runs on a single machine, MLlib handles ML tasks on big data using a cluster of machines. The three C's of ML in MLlib:
Collaborative filtering → for recommendation engine
Classification
Clustering

Collaborative Filtering

Use Rating class as a wrapper for tuple (user, product, rating)
Randomly split data (applies to any ML tasks)
Train model using Alternating Least Squares
Predict ratings

Classification

Vector in MLlib
LabeledPoint() wrapper to contain label and features
Prepare data for training (classification)
train
evaluate

Cleaning Data with PySpark

Data cleaning helps with performance and with organizing the data flow. The first step is to define the schema of the data.

Parquet file

When we deal with CSV files, there are issues such as:
no defined schema
nested data requires special handling
limited encoding formats
slow to parse
files cannot be filtered
any intermediate use requires redefining the schema
Therefore, let's introduce the .parquet format
a columnar data format
support in Spark and other Big Data frameworks
automatically stores schema information

Working with Parquet

read file
df = spark.read.format("parquet").load("filename.parquet")
df = spark.read.parquet("filename.parquet")
write file
df.write.format("parquet").save("filename.parquet")
df.write.parquet("filename.parquet")
SparkSQL operations

DataFrame

filter only some rows
select only some columns
create new column
drop column
conditional filtering
user-defined function

Caching

improves calculation speed by keeping data in memory or on disk. However, very large datasets may not fit in memory
caching before action
check if data is cached
remove cache

Performance

A Spark cluster is divided into 2 components:
driver - assigns jobs → should have about twice the RAM of a worker
worker - does the job → more workers are better than bigger workers
To improve Spark performance, the following factors are taken into account:
number of objects - many smaller objects (files) are better than a few large ones
general size of objects - objects of roughly the same size are better than a mix of big and small ones
schema - a well-defined schema saves Spark from having to infer it

Shuffling

shuffling moves data across workers so that an operation can be completed. This adds processing cost.
.repartition(num_partition) usually triggers shuffling, so try to avoid it
join() is another function that triggers shuffling; if it is needed and one DataFrame is small, broadcast that DataFrame: combined_df = df1.join(F.broadcast(df2))

Pipeline

a series of steps that transform input into the desired output
input - csv, json, web services, databases
transformation - withColumn(), filter(), drop()
output - csv, parquet, database

!Intro to MongoDB





