Welcome to my summary of the Data Engineering Certification from DataCamp. This is an incomplete version; some topics may not yet be covered. Please revisit this page from time to time to see the progress.
!Writing Efficient Code in Python
Built-in functions
Built-in types: list, tuple, set, dict, and others
Built-in functions: print(), len(), range(), round(), enumerate(), map(), zip(), and others
Built-in modules: os, sys, itertools, collections, math, and others
Interesting points
Components of a lambda function
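A lambda is an anonymous function written as lambda <arguments>: <expression>. A small illustration (the names are made up):

```python
# lambda <arguments>: <expression>
square = lambda x: x ** 2
print(square(4))                                          # 16

# lambdas are often passed to map(), filter(), sorted(), ...
print(list(map(lambda x, y: x + y, [1, 2], [10, 20])))    # [11, 22]
```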
Timing and profiling
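Timing is usually done with the %timeit magic in IPython (with %lprun / %mprun from line_profiler / memory_profiler for line and memory profiling); outside IPython, the standard timeit module gives a comparable sketch (the snippet is just an example):

```python
import timeit

setup = "nums = list(range(1000))"
stmt = "[n * 2 for n in nums]"

# run the statement 1000 times, repeated 3 times, and report the timings
print(timeit.repeat(stmt, setup=setup, number=1000, repeat=3))

# in IPython/Jupyter the equivalent one-liner is:
# %timeit [n * 2 for n in nums]
```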
Gaining Efficiencies
Pandas optimizations
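A minimal sketch of the usual pandas optimization ladder (the DataFrame here is made up): iterrows() is slowest, itertuples() is faster, and vectorized column operations are fastest.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": np.random.rand(10_000), "b": np.random.rand(10_000)})

# slowest: row-by-row iteration with iterrows() (builds a Series per row)
total = sum(row["a"] * row["b"] for _, row in df.iterrows())

# faster: itertuples() returns lightweight namedtuples
total = sum(row.a * row.b for row in df.itertuples())

# fastest: vectorized operation on whole columns
total = (df["a"] * df["b"]).sum()
```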
!Writing Functions in Python
lorem ipsum
Intro to Shell
ls
file listing
pwd
print working directory
cd
change directory
cd ..
move up one level
cd ~
change to home directory
cp original.txt duplicate.txt
copy file
mv old_location.txt new_location.txt
move file (can also use for renaming file)
rm file.txt
remove file
rm -r directory
remove directory
cat file.txt
view file content
head file.txt
see first 10 lines of file
head -n 3 file.txt
see first 3 lines of file
ls -R
list everything below current directory
cut -f 2-5,8 -d , values.csv
select columns 2 to 5 and column 8, using , as the separator (no spaces inside the field list)
-f : field
-d : delimiter
grep search_term filename.csv
search for pattern in file
-c : print a count of matching lines instead of the lines
-h : do not print the filename when searching multiple files
-i : ignore case
-l : print the names of files that contain the matches
-n : print line number of matching lines
-v : invert match
head filename.csv > store.csv
store the first 10 lines of filename.csv in store.csv
head -n 5 filename.csv | tail -n 2
take the first 5 lines of filename.csv and pipe them into the next command; tail -n 2 then prints the last 2 of those lines (lines 4 and 5)
wc
print the number of characters (-c), words (-w), or lines (-l) in a file
* matches zero or more characters
? matches a single character
[...] matches any one of the characters inside the brackets
{...} matches any of the comma-separated patterns inside the braces
sort
sort output
-n numerical order
-r reversed order
-f fold case (case-insensitive)
-b ignore leading blank
uniq
remove adjacent duplicate lines (usually run after sort)
filename=text.txt
assign text.txt to a variable called filename
echo $filename
print value contained in the variable filename
for filename in directory/*.csv; do echo $filename; done
print name of every file in the folder directory
nano filename
open filename in editor
ctrl + k cut a line
ctrl + u paste a line from clipboard
ctrl + o save the file
ctrl + x exit the editor
bash script.sh
run commands in script.sh
Data Processing in Shell
data download
curl [option flags] [URL]
client URL → download data over HTTP or FTP
-O download and keep the remote filename
-o newname download and rename to newname
-L follow redirects when a 3xx HTTP status code is returned
-C resume a previous file transfer if it timed out before completion
wget [option flags] [URL]
World Wide Web get → native command-line tool for downloading files
better than curl for downloading multiple files
-b background download
-q turn off wget output
-c resume broken download
-i download from list given in a file
--wait=1 wait 1 second between downloads
csvkit
in2csv filename.xlsx > filename.csv
convert the first sheet in filename.xlsx to filename.csv
in2csv -n filename.xlsx
list all sheet names
in2csv filename.xlsx --sheet "sheet1" > filename.csv
convert sheet sheet1 to filename.csv
csvlook filename.csv
preview filename.csv to console
csvstat filename.csv
like pandas df.describe(), printed to the console
csvcut -n filename.csv
list all column names in filename.csv
csvcut -c 1 filename.csv
return the column at index 1 (as numbered by csvcut -n) from filename.csv
can also be used as -c "column name"
csvgrep
filter by row using exact match or regex
must use one of the following options
-m exact row value
-r regex pattern
-f path to a file
csvgrep -c "column name" -m value filename.csv
filter filename.csv where column name == value
csvstack file1.csv file2.csv > allfile.csv
stack file1.csv and file2.csv together and save to allfile.csv
csvstack -g "f1","f2" -n "source" file1.csv file2.csv > allfile.csv
create a special column named source (instead of the default group) to identify which rows come from which file
SQL
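The command line itself appears to be missing from these notes; with csvkit's sql2csv it would presumably look something like the following (database name and query are placeholders):
sql2csv --db "sqlite:///database.db" --query "SELECT * FROM base" > filename.csv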
connect to database sqlite:///database.db using query SELECT * FROM base and save to filename.csv
csvsql --query "SELECT * FROM base LIMIT 1" filename.csv
use the above query to select data from local filename.csv file
csvsql --query "SELECT * FROM base1 INNER JOIN base2 ON ID" base1.csv base2.csv
can be used with multiple CSV files, but the files must be listed in the same order as they appear in the SQL query
insert filename.csv to database
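The corresponding command is not shown in these notes; with csvkit it would presumably be something like this (the database URL is a placeholder):
csvsql --db "sqlite:///database.db" --insert filename.csv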
--no-inference disable type parsing (consider everything as string)
--no-constraints generate schema without length limit or null check
cron job
echo "* * * * * python hello_world.py" | crontab
add a job that runs every minute, on the minute, to the crontab
a cron schedule has 5 time fields (*): minute, hour, day of month, month, and day of week
crontab -l
list all cron jobs
Intro to Bash
A Bash script is a file of shell commands, starting with a shebang line such as #!/bin/bash (or #!/usr/bin/env bash)
run a script with bash scriptname.sh
arguments can be passed with bash scriptname.sh arg1 arg2
inside the script, $1 and $2 hold arg1 and arg2, $@ holds all arguments, and $# gives 2 (the number of arguments)
var1="sam" assigns the value "sam" to var1 (note: no spaces around =)
echo "There are $var1 items" expands $var1 inside the double-quoted string
'sometext' is interpreted literally
"sometext" is interpreted literally except for $ and `
`sometext` is run as a bash command and its STDOUT is captured into the variable
$(command) is equivalent to `command`, but $(command) is preferred in modern scripts
expr 11 + 3.5 gives an error, as expr only handles integers
echo "11 + 3.5" | bc gives 14.5, as bc (basic calculator) can handle decimals
echo "11/3" | bc gives 3, since the number of decimal places has not been defined
echo "scale=3; 11/3" | bc gives 3.666
declare -a my_array defines an empty array; my_array=(1 2 3) provides values while defining it (note: space, not comma, is the separator)
echo ${my_array[@]} prints 1 2 3
echo ${#my_array[@]} prints 3 (the length of the array)
echo ${my_array[@]:N:M} slices the array, where N is the starting index and M is how many elements to return
file test flags for conditions: -s the file exists and has a size greater than zero; -r the file exists and is readable; -w the file exists and is writable
combine conditions with && for AND and || for OR
In a Bash function, return does not return a value but a status code (0 for success, 1-255 for failure), which is captured in $?. To output a value from a function, either update a global variable inside the function, or echo the value on the function's last line and capture it with command substitution (a shell within a shell) when calling it.
!Unit Testing
A function is typically tested on the order of 100 times before it goes into production, so automating those checks as unit tests saves a lot of repeated manual testing.
create a test file following the test_ naming convention: e.g., to test myfunc the test file should be test_myfunc.py
run the unit tests with pytest test_myfunc.py
when comparing floats in an assert statement, use assert 0.1 + 0.1 + 0.1 == pytest.approx(0.3)
project structure: a src directory contains the source code and a tests directory contains a corresponding test module for each source file
inside a test file, tests for different source functions can be grouped using classes
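A minimal sketch of such a test file (myfunc is a made-up example; normally it would be imported from the src package):

```python
# test_myfunc.py
import pytest

def myfunc(x):                 # normally imported from the src package; defined here for illustration
    return x * 2

class TestMyfunc:              # group all tests for one source function in a class
    def test_returns_expected_value(self):
        assert myfunc(2) == 4

    def test_float_comparison(self):
        assert 0.1 + 0.1 + 0.1 == pytest.approx(0.3)
```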
if a test is expected to fail, mark it as such (e.g., with pytest's xfail marker)
when testing a function that works with files: create a raw data file and a reference result file → run the function → compare the function's result with the reference file → remove the raw data file and the reference file
the setup and teardown parts of this process are called fixtures, which pytest supports directly; alternatively, use pytest's built-in tmpdir fixture to create files inside a temporary directory that is removed automatically after the test ends
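A sketch of both ideas (the test names are made up):

```python
import pytest

@pytest.mark.xfail(reason="known limitation, expected to fail")
def test_not_implemented_yet():
    assert 1 == 2

def test_on_files(tmpdir):                 # tmpdir is a built-in pytest fixture
    raw_path = tmpdir.join("raw.txt")      # file created inside a temporary directory
    raw_path.write("1,2,3")
    assert raw_path.read() == "1,2,3"
    # tmpdir and everything inside it are removed automatically after the test
```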
!Object Oriented
Object = states + behaviors = attributes + methods
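A tiny illustration of the correspondence listed below (the Dog class is just an example):

```python
class Dog:
    """State lives in attributes; behavior lives in methods."""

    def __init__(self, name):
        self.name = name          # attribute (state)

    def bark(self):               # method (behavior)
        return f"{self.name} says woof"

rex = Dog("Rex")
print(rex.name)     # access an attribute: obj.my_attribute
print(rex.bark())   # call a method: obj.my_method()
```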
attribute <> variable <> obj.my_attribute
method <> function() <> obj.my_method()
Intro to Airflow
Airflow is a platform to program workflows → Creation, Scheduling, Monitoring
A workflow in Airflow is written in Python and implemented as a DAG (Directed Acyclic Graph). A DAG is a set of tasks forming a graph in which:
directed → the task flow indicates ordering and dependencies
acyclic → no loops or repeats
Airflow Operator
Bash Operator
Python Operator
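A minimal DAG sketch using both operators (import paths follow the older Airflow 1.x style and differ in Airflow 2.x; the dag_id, schedule, and task names are made up):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def say_hello():
    print("hello")

dag = DAG(
    dag_id="example_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
)

# BashOperator runs a shell command
bash_task = BashOperator(task_id="print_date", bash_command="date", dag=dag)

# PythonOperator calls a Python callable
python_task = PythonOperator(task_id="say_hello", python_callable=say_hello, dag=dag)

# dependency: bash_task runs before python_task
bash_task >> python_task
```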
Sensor
an operator that waits for a certain condition to become true; how often to check can be configured
File sensor
Other sensors
other sensors live in airflow.sensors and airflow.contrib.sensors
Template
substitute values into task parameters at runtime using Jinja template syntax
using for loop
special variables
{{ ds }}: running date in YYYY-MM-DD
{{ ds_nodash }} : YYYYMMDD
{{ prev_ds }} : previous running date in YYYY-MM-DD
{{ prev_ds_nodash }} : YYYYMMDD
{{ dag }} : DAG object
{{ conf }} : Airflow config
{{ macros.datetime }} : datetime.datetime object in python
{{ macros.timedelta }} : timedelta object in python
{{ macros.ds_add('2020-04-15', 5) }} : add (or subtract) days to a date → in this case, 2020-04-20
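A sketch of a templated BashOperator using the for-loop style mentioned above (the dag object from the earlier sketch and the file names are assumed):

```python
from airflow.operators.bash_operator import BashOperator

# Jinja-templated command; {{ ds }} and the loop are rendered at runtime
templated_command = """
{% for filename in params.filenames %}
  echo "Processing {{ filename }} on {{ ds }}"
{% endfor %}
"""

template_task = BashOperator(
    task_id="template_task",
    bash_command=templated_command,
    params={"filenames": ["file1.csv", "file2.csv"]},
    dag=dag,
)
```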
Branching
Use logic to navigate the flow
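A sketch with BranchPythonOperator (the downstream task ids even_day_task / odd_day_task are hypothetical; provide_context is the Airflow 1.x way to receive the context):

```python
from airflow.operators.python_operator import BranchPythonOperator

def pick_branch(**kwargs):
    # return the task_id (or list of task_ids) to run next
    if kwargs["execution_date"].day % 2 == 0:
        return "even_day_task"
    return "odd_day_task"

branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=pick_branch,
    provide_context=True,
    dag=dag,
)
```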
Running DAGs and Tasks
run a specific task from the command line: airflow run <dag_id> <task_id> <date>
airflow trigger_dag -e <date> <dag_id>
trigger a full DAG run for the given execution date
Intro to Spark
Spark distributes computing tasks to workers (nodes). The main connection between the user and the cluster is as simple as creating a SparkContext, and the higher-level interface to that connection is the SparkSession.
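A minimal sketch of the two operations listed right below, converting a pandas DataFrame and selecting/aliasing a column (the column names are made up):

```python
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# convert a pandas DataFrame to a Spark DataFrame
pd_df = pd.DataFrame({"duration_min": [10, 20, 30]})
spark_df = spark.createDataFrame(pd_df)

# select a column, modify it, and assign it an alias
spark_df.select((F.col("duration_min") / 60).alias("duration_hr")).show()
```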
convert a pandas DataFrame to a Spark DataFrame
select a column, modify it, and assign it with an alias
ML Pipeline in Spark
pyspark.ml composed of two main classes
Transformer classes have a .transform() method, which takes a DataFrame and returns a DataFrame with an additional transformed column.
Estimator classes have a .fit() method, which takes a DataFrame but returns a model object.
An ML pipeline in Spark only works with numerical columns. Spark often guesses the data type of each column once the data enters the pipeline, but the guess can be wrong, e.g., a bus line number is a number without any numerical meaning behind it. It is therefore safer to cast each column to its proper data type before feeding it into the ML pipeline.
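A sketch of the preprocessing steps listed below (a df with the columns carrier, arr_delay, distance, and label is assumed; the exact OneHotEncoder class varies a little between Spark versions):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# cast to proper numeric types instead of relying on Spark's guesses
df = df.withColumn("arr_delay", df.arr_delay.cast("integer"))
df = df.withColumn("distance", df.distance.cast("double"))

# index and one-hot encode a categorical column
indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# assemble all features into a single vector column (required before training)
assembler = VectorAssembler(
    inputCols=["arr_delay", "distance", "carrier_fact"],
    outputCol="features",
)

pipeline = Pipeline(stages=[indexer, encoder, assembler,
                            LogisticRegression(labelCol="label", featuresCol="features")])
model = pipeline.fit(df)    # an Estimator: returns a fitted model object
```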
cast data types → the target type should be "integer" for int or "double" for float
create a one-hot encoder for categorical data
assemble the features into a single vector column (a prerequisite in Spark before training a model)
!AWS Boto
Boto3 is the Python interface to AWS. AWS offers many services for data projects; some interesting ones are:
Simple Storage Service (S3) - store files (so-called objects) in AWS
Simple Notification Service (SNS) - send alerts or notifications (via email or other channels)
Comprehend - sentiment analysis on text data
Rekognition - text extraction from images and image classification
S3
a service for storing files in folders in the cloud; in cloud terminology, a bucket corresponds to a folder/directory and an object corresponds to a file
create a boto3 client (see the sketch after this list):
create bucket
list buckets
delete bucket
upload file
list objects in a bucket
get object metadata
download object
delete object
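A minimal boto3 sketch covering the operations listed above (bucket, file, and credential names are placeholders):

```python
import boto3

# create an S3 client (credentials below are placeholders)
s3 = boto3.client(
    "s3",
    region_name="us-east-1",
    aws_access_key_id="AWS_KEY_ID",
    aws_secret_access_key="AWS_SECRET",
)

s3.create_bucket(Bucket="my-bucket")                                        # create bucket
buckets = s3.list_buckets()                                                 # list buckets
s3.upload_file(Filename="data.csv", Bucket="my-bucket", Key="data.csv")     # upload file
objects = s3.list_objects(Bucket="my-bucket", Prefix="data")                # list objects in a bucket
meta = s3.head_object(Bucket="my-bucket", Key="data.csv")                   # get object metadata
s3.download_file(Bucket="my-bucket", Key="data.csv", Filename="local.csv")  # download object
s3.delete_object(Bucket="my-bucket", Key="data.csv")                        # delete object
s3.delete_bucket(Bucket="my-bucket")                                        # delete bucket
```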
Permission
By default, AWS denies every request to access data unless the AWS_KEY_ID and AWS_SECRET of a user with the appropriate permissions are provided. Permissions can be granted in the following ways:
IAM - permissions across AWS services
Bucket Policy - permissions on buckets
Access Control List (ACL) - permissions on objects
Pre-signed URL - temporary access to an object
Here, only ACLs and pre-signed URLs are discussed.
There are 2 ACL values to choose from: public-read and private. Upon uploading, all object ACLs are private by default. There are two ways to set an ACL to public-read, sketched below.
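A sketch of the two ways (the s3 client and object names come from the earlier sketch):

```python
# option 1: change the ACL of an object that is already uploaded
s3.put_object_acl(Bucket="my-bucket", Key="data.csv", ACL="public-read")

# option 2: set the ACL while uploading
s3.upload_file(
    Filename="data.csv", Bucket="my-bucket", Key="data.csv",
    ExtraArgs={"ACL": "public-read"},
)
```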
a public file in S3 has a URL of the form https://{bucket}.s3.amazonaws.com/{key}
read csv file as dataframe on s3 without downloading to local disk
generate pre-signed URL
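A sketch covering both items above (the URL, bucket, and key are placeholders; the s3 client is assumed from earlier):

```python
import pandas as pd

# read a public CSV on S3 directly into a DataFrame (no local download)
df = pd.read_csv("https://my-bucket.s3.amazonaws.com/data.csv")

# generate a pre-signed URL that gives temporary access to a private object
url = s3.generate_presigned_url(
    ClientMethod="get_object",
    ExpiresIn=3600,  # seconds
    Params={"Bucket": "my-bucket", "Key": "data.csv"},
)
df_private = pd.read_csv(url)
```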
SNS
service to send SMS, email, or notifications to subscribers via AWS. To send messages, these components need to be considered:
publisher - the script/code that sends (publishes) messages to a topic
topic - the channel of messages that subscribers subscribe to
create sns client (see the sketch after this list)
create sns topic
list topics
delete topic
subscribe to a topic
list subscriptions
list subscriptions by topic
delete subscription
send a topic
send a single SMS to a single subscriber
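A boto3 SNS sketch covering the operations listed above (topic name, email, and phone number are placeholders; credentials are assumed to be configured):

```python
import boto3

sns = boto3.client("sns", region_name="us-east-1")          # create SNS client

topic = sns.create_topic(Name="alerts")                      # create topic
topic_arn = topic["TopicArn"]
sns.list_topics()                                            # list topics

sns.subscribe(TopicArn=topic_arn, Protocol="email",          # subscribe to a topic
              Endpoint="someone@example.com")
sns.list_subscriptions()                                     # list subscriptions
sns.list_subscriptions_by_topic(TopicArn=topic_arn)          # list subscriptions by topic

sns.publish(TopicArn=topic_arn,                              # publish a message to a topic
            Message="Something happened", Subject="Alert")
sns.publish(PhoneNumber="+15551234567",                      # send a single SMS to one number
            Message="Something happened")

# sns.unsubscribe(SubscriptionArn="...")                     # delete a subscription
# sns.delete_topic(TopicArn=topic_arn)                       # delete topic
```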
Intro to Relational Database
Relational Database
data integrity by relationships
simple SQL queries
Example
entities which contain redundant information
The columns fall into three groups: person info, university info, and organization info (color-coded in the original figure).
Therefore, this can be reformed into:
Migrate the university_professors table to new schema
Insert into the organizations table
If renaming a table is required
If dropping a column is required
Data constraints
Attribute constraints
fix the data structure, which leads to more consistency and better data quality
attribute constraints should be defined upon table creation
the type can still be changed after table creation
Key constraints
a key is a set of attributes that identifies a record uniquely
if attributes can still be removed while uniqueness is preserved → superkey
if no more attributes can be removed without losing uniqueness → key (the actual, minimal key)
usually a surrogate key (an id column) is used to define a record's uniqueness
Referential integrity constraints
a foreign key (FK) points to the primary key (PK) of another table
each FK value must exist as a PK value in the other table
Database Design
2 ways to process data
Online Transaction Processing (OLTP) → revolves around transactional data for day-to-day processing; stored in a conventional database
Online Analytical Processing (OLAP) → revolves around analytical queries for business decision making; typically stored in a data warehouse
Structuring data
structured data → data types and relationships are defined; less flexible → harder to scale; e.g., SQL, relational databases
unstructured data → does not follow a larger schema; e.g., photos, chat logs, audio
semi-structured data → self-describing structure; e.g., JSON, XML
Data modeling
the process of creating a model for the data to be stored.
1) Conceptual data model → describe entities, relationships, attributes
2) Logical data model → define tables, columns, relationships
3) Physical data model → describe physical storage, CPU, backup systems
Dimensional modeling
Design of data warehouse for OLAP queries using star schema.
Elements of dimensional modeling
fact table → holds records of a metric and connects to dimension tables via foreign keys
dimension table → holds descriptions of attributes
A star schema can be extended to a snowflake schema → dimension tables are normalized to avoid repeating information.
dimension table in star schema (denormalized)
dimension table in snowflake schema (normalized)
data normalization to snowflake schema yields the following
advantages:
eliminates data redundancy → saves storage
better data integrity → accurate and consistent data
disadvantages:
complex queries require more CPU
Normal forms
approaches to normalize data with specific rules.
1NF
each record must be unique → no duplicate rows
each cell must hold only one value
2NF
Must satisfy 1NF and
if the primary key is a single column, the table automatically satisfies 2NF
if there is a composite primary key (more than one column), every non-key column must depend on all of the key columns
must satisfy 2NF and
no transitive dependencies → non-key columns cannot depend on other non-key columns
Database views
A view is a virtual table defined by a stored query. It can be queried like a regular table, which saves retyping common queries.
get all views in the database
Keep in mind that this is a non-materialized view, which stores the query, not its result. Every time the view is used, the query runs again to fetch fresh results from the database; if that query is slow, every query referring to the view will be slow as well.
A materialized view, on the other hand, stores the result of the view. Querying it is much faster than a non-materialized view, with the drawback that the result might not be up to date until the view is refreshed.
refresh materialized view
Table partitioning
As the database grows, it becomes necessary to scale it so that the hardware can still handle the queries.
vertical partitioning → move low-priority column(s) into separate table(s)
horizontal partitioning → split the data by rows
Intro to Scala
Scala = modern Java
It combines object-oriented programming and functional programming → scalable
Object-oriented programming (OOP)
every operation is a method call
Functional programming (FP)
functions are first-class values
operations should map input values to output values rather than change data in place
every value in scala is an object and every operation is a method call
variable definition
Scala value types:
Double ← the default double-precision floating-point number
Char → a sequence of Chars becomes a String
In Scala, it is preferable to use val over var because val is immutable, which makes code more static and easier to reason about (no need to remember whether it was changed somewhere). The drawback is that a new variable must be created every time the state changes, which usually costs extra memory for copying data.
Scala as a compiler
Scala code is often run through its interpreter (REPL), which interprets while running. However, it can also behave as a compiled language by:
create a file named Game.scala
compile it with scalac Game.scala
run it with scala Game
Function
Array
An Array in Scala is a mutable sequence of objects that share the same type
Dirtiest way to create an array
array can be parameterized before defining their values
define values inside array
Since the array is mutable but defined with val, the elements inside the array can be changed, while properties of the array itself (e.g., its length or data type) cannot.
Define array with mixed data type
List
A List in Scala is an immutable sequence of objects that share the same type
List definition
List has many useful methods, for example:
:: cons - prepend a new element to an existing list (append exists in Scala but is rarely used)
initialize a list with :: and Nil
Nil (the empty list) is required at the end because :: prepends an element to the list on its right; since :: is a method of the list object, Nil provides the list to start from.
::: concatenation - concatenate two lists
if
evaluate a statement as true or false; do not forget the parentheses () around the condition
operators
while
Big Data with PySpark
The 3 Vs that define Big Data:
Volume - the size of the data
Velocity - the speed at which data arrives
Variety - different sources and formats
Terminology in big data processing
Clustered computing - computation on multiple machines
Parallel computing - simultaneous computation
Distributed computing - a collection of nodes running in parallel
Batch processing - a job broken into small pieces that run on individual machines
Real-time processing - immediate processing of data
PySpark
Apache Spark provides high-level APIs in Scala, Java, Python, and R; this course covers only the Python API, PySpark. Spark itself is written in Scala, and PySpark keeps the same processing speed while its syntax feels like pandas and scikit-learn.
Just as Spark has the Spark Shell to quickly prototype data processing in Scala, PySpark has the PySpark Shell for the same purpose in Python.
To get started with Spark, you need a SparkContext, usually abbreviated sc → the entry point to the Spark cluster. Without it, you cannot run any PySpark jobs.
sc.version
inspect the Spark version currently in use
sc.pythonVer
inspect the Python version currently in use
sc.master
URL of the cluster, or "local" when run in local mode
rdd = sc.parallelize([1, 2, 3, 4])
load a list of values into a SparkContext RDD
rdd = sc.parallelize("Hello")
load the string Hello into a SparkContext RDD
rdd = sc.textFile("text.txt")
load the local file text.txt into a SparkContext RDD
Resilient Distributed Datasets (RDD)
RDD is the backbone data type in PySpark; it is the abstraction Spark uses for its distributed data processing.
RDD stands for 3 important abilities:
Resilient - ability to withstand failures
Distributed - spanning across multiple machines
Datasets - collections of partitioned data → arrays, tables, tuples, etc.
Partition - a logical division of data storage
rdd = sc.parallelize([1,2,3,4], minPartitions=4)
Load list to sparkcontext with minimum 4 partitions to store the data
RDD Operations
RDD in pyspark supports:
Transformations - create new output RDDs from input RDDs
Actions - perform computation on the RDDs and return values
Transformations
Transformations follow lazy evaluation → Spark first builds a computation graph, which is executed only when a result actually needs to be evaluated. Common transformations are:
map() - apply a function to all elements in the RDD → returns [1, 4, 9, 16]
filter() - return a new RDD with only the elements that satisfy a condition → returns [3, 4]
flatMap() - return multiple values for each element of the original RDD → returns ["hello", "world", "how", "are", "you"]
union() - return a combination of RDDs → returns [1, 2, 3, 4, 5, 6, 7]
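The inputs behind those outputs are not shown in these notes; a sketch that reproduces them (input values are inferred from the outputs):

```python
rdd = sc.parallelize([1, 2, 3, 4])
rdd_map = rdd.map(lambda x: x * x)                        # [1, 4, 9, 16]
rdd_filter = rdd.filter(lambda x: x > 2)                  # [3, 4]

rdd_str = sc.parallelize(["hello world", "how are you"])
rdd_flat = rdd_str.flatMap(lambda s: s.split(" "))        # ["hello", "world", "how", "are", "you"]

rdd_union = sc.parallelize([1, 2, 3]).union(sc.parallelize([4, 5, 6, 7]))   # [1, 2, 3, 4, 5, 6, 7]
```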
Actions
return values after RDD computations. Important actions are:
collect() - return all elements of the RDD as an array rdd_map.collect() returns [1,4,9,16]
take(N) - return the first N elements of the RDD rdd_map.take(2) returns [1,4]
first() - return only the first element → rdd_map.first() returns 1 (while take(1) returns [1])
count() - return number of elements in the RDD rdd_map.count() returns 4
Pair RDDs
dataset with key/value pairs
each row is a key that maps to one or more values
create pair RDD from a list of key-value tuple
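A small sketch of creating a pair RDD (the names and ages are made up):

```python
# from a list of key-value tuples
my_tuples = [("Sam", 23), ("Mary", 34), ("Peter", 25)]
pair_rdd = sc.parallelize(my_tuples)

# or from a regular RDD, by mapping each element to a (key, value) tuple
regular_rdd = sc.parallelize(["Sam 23", "Mary 34", "Peter 25"])
pair_rdd2 = regular_rdd.map(lambda s: (s.split(" ")[0], int(s.split(" ")[1])))
```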
Transformations
All regular transformations work on pair RDDs, but the functions passed in should operate on key-value pairs rather than on individual elements. Examples of pair RDD transformations:
reduceByKey() - combine values with the same key → returns [("name1", 23), ("name2", 66), ("name3", 12)]
sortByKey() - return an RDD sorted by key → returns [(66, "name2"), (23, "name1"), (12, "name3")]
groupByKey() - group values with the same key → returns [("DE", ("Munich", "Berlin")), ("UK", ("London")), ("NL", ("Amsterdam"))]
join() - join 2 pair RDDs based on their key → returns [("name1", (20, 21)), ("name2", (23, 17)), ("name3", (12, 4))]
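A sketch reproducing the first two outputs (input values are inferred from the outputs shown):

```python
pair_rdd = sc.parallelize([("name1", 20), ("name1", 3), ("name2", 66), ("name3", 12)])

summed = pair_rdd.reduceByKey(lambda x, y: x + y)
summed.collect()                                   # [("name1", 23), ("name2", 66), ("name3", 12)]

# swap key and value, then sort by the (numeric) key in descending order
summed.map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False).collect()
# [(66, "name2"), (23, "name1"), (12, "name3")]
```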
Actions
reduce(func) - aggregate the elements of a regular RDD; func should be commutative and associative → returns 14
saveAsTextFile("filename") - save the RDD to a text file, with each partition written as a separate file; useful when the data is too big to collect() into memory
coalesce() - combine partitions so that the RDD can be saved as a single text file
countByKey() - count how many times each key appears in the RDD; works only with pair RDDs of type (K, V) whose keys fit into memory → returns ("k1", 2), ("k2", 1)
collectAsMap() - return the pair RDD as a dictionary; again, works only when the data fits into memory → returns {"k1": 3, "k2": 6}
PySparkSQL
a library for structured data, similar to pandas. It provides a DataFrame for data wrangling and computation, handles both structured (relational-database-like) and semi-structured (JSON) data, and supports SQL queries (SELECT * FROM table) as well as the expression API (df.select()).
Just as an RDD requires a SparkContext as its main entry point, a Spark DataFrame requires a SparkSession, abbreviated spark, for the same reason.
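A sketch of the two ways of creating a DataFrame listed below (column names and the file are made up):

```python
# from an RDD of tuples, with the schema given as a list of column names
rdd = sc.parallelize([("Mona", 20), ("Jennifer", 34)])
df_from_rdd = spark.createDataFrame(rdd, schema=["name", "age"])

# from a file
df_from_file = spark.read.csv("people.csv", header=True, inferSchema=True)
```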
create a DataFrame from an RDD
create a DataFrame from a file
DataFrame Operations
Transformations
select() - subset the columns of the DataFrame
filter() - filter rows based on condition(s)
groupby() - group the DataFrame by column(s), usually followed by an aggregation
orderby() - sort the DataFrame by column(s)
dropDuplicates() - create a new DataFrame with only the unique rows
withColumnRenamed() - rename a specific column
Actions
printSchema() - print the types of the columns (a general Spark method rather than a DataFrame-specific action)
show(n_rows) - print the first 20 rows, or n_rows if given
columns - list the column names of the DataFrame
describe() - compute summary statistics of the numerical columns in the DataFrame
SQL Queries
In the previous part, DataFrame was manipulated using DataFrame API. In fact, DataFrame can be manipulated using SQL queries which is easier to understand and more portable. Both API and SQL are mostly interchangeable.
SQL queries are executed through the SparkSession: spark.sql("SQL statement")
However, the DataFrame has to be registered as a SQL view before it can be queried.
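A minimal sketch (the view name and columns are made up):

```python
df.createOrReplaceTempView("people")              # register the DataFrame as a temporary SQL view
result = spark.sql("SELECT name, age FROM people WHERE age > 18")
result.show()
```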
Visualization
Plotting in PySpark natively is still not available. Use the following methods:
pyspark_dist_explore library
or convert with toPandas() and plot with pandas/matplotlib
MLlib
Instead of scikit-learn, MLlib handles ML tasks on big data using a cluster of machines rather than a single machine. The three C's of ML in MLlib:
Collaborative filtering → recommendation engines
Classification → predicting labels
Clustering → grouping unlabeled data
Collaborative Filtering
use the Rating class as a wrapper for the tuple (user, product, rating)
randomly split the data into train/test sets (applies to any ML task)
train the model using Alternating Least Squares (ALS)
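A sketch of those steps (the rating values are made up; sc is the SparkContext from earlier):

```python
from pyspark.mllib.recommendation import ALS, Rating

ratings = sc.parallelize([Rating(1, 101, 5.0), Rating(1, 102, 3.0), Rating(2, 101, 4.0)])
train, test = ratings.randomSplit([0.8, 0.2])

model = ALS.train(train, rank=10, iterations=10)

# predict ratings for (user, product) pairs held out in the test set
test_users_products = test.map(lambda r: (r.user, r.product))
predictions = model.predictAll(test_users_products)
```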
Classification
LabeledPoint() wrapper to contain a label and a feature vector
prepare the data for training (classification)
Cleaning Data with PySpark
Data cleaning helps with performance and with organizing the data flow. The first step is to define the schema of the data.
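A sketch of defining a schema explicitly (the file and column names are made up):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
])

df = spark.read.format("csv").option("header", True).schema(schema).load("people.csv")
```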
Parquet file
When we deal with csv file, there are issues like:
nested data requires special handling
any intermediate use requires redefining the schema
Therefore, let's introduce the .parquet format
supported in Spark and other big data frameworks
automatically stores schema information
Working with Parquet
df = spark.read.format("parquet").load("filename.parquet")
df = spark.read.parquet("filename.parquet")
df.write.format("parquet").save("filename.parquet")
df.write.parquet("filename.parquet")
DataFrame
Caching
improves calculation speed by keeping data in memory or on disk; however, very large datasets may not fit in memory
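A small sketch of caching a DataFrame (df is assumed to exist):

```python
df = df.cache()          # mark the DataFrame for caching
df.count()               # running an action materializes the cache
print(df.is_cached)      # check whether the DataFrame is cached
df.unpersist()           # remove it from the cache when no longer needed
```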
Performance
Spark cluster is divided into 2 components:
driver - assigns the jobs → should have roughly double the RAM of a worker
worker - does the jobs → more (smaller) workers are better than a few big ones
To improve Spark performance, the following factors are taken into account:
number of objects - more small objects are better than fewer large ones
size of objects - objects of roughly the same size are better than a mix of big and small ones
schema - a well-defined schema saves Spark from having to infer it again
Shuffling is the movement of data across workers that some operations require. It adds noticeable computation and network cost.
shuffling usually comes with .repartition(num_partitions), so try to avoid it
join() also triggers a shuffle; if a join is needed and one DataFrame is small, broadcast it: combined_df = df1.join(F.broadcast(df2))
Pipeline
a pipeline is the set of steps that transforms the input into the desired output
input - CSV, JSON, web services, databases
transformation - withColumn(), filter(), drop()
output - CSV, Parquet, databases
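A minimal end-to-end sketch of such a pipeline (file and column names are made up; spark is the SparkSession from earlier):

```python
import pyspark.sql.functions as F

df = spark.read.csv("input.csv", header=True, inferSchema=True)

df = (df.filter(F.col("age") > 0)
        .withColumn("age_group", F.when(F.col("age") >= 18, "adult").otherwise("minor"))
        .drop("unused_column"))

df.write.parquet("output.parquet", mode="overwrite")
```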
!Intro to MongoDB
me mid 2 tango pls