Welcome to my summary of the Data Engineering Certification from DataCamp. This is an incomplete version; some topics may not have been covered yet. Please revisit this page from time to time to see the progress.
!Writing Efficient Code in Python
Built-in functions
Built-in types: list , tuple , set , dict , and others
create a special column named source (instead of the default group) to identify which row comes from which file
SQL
sql2csv --db "sqlite:///database.db" \
--query "SELECT * FROM base" \
> filename.csv
connect to the database sqlite:///database.db, run the query SELECT * FROM base, and save the result to filename.csv
csvsql --query "SELECT * FROM base LIMIT 1" filename.csv
use the above query to select data from local filename.csv file
csvsql --query "SELECT * FROM base1 INNER JOIN base2 ON base1.ID = base2.ID" base1.csv base2.csv
can be used with multiple CSV files, but the files must be listed in the same order as their tables appear in the SQL query
csvsql --no-inference --no-constraints \
--db "sqlite:///database.db" \
--insert filename.csv
insert filename.csv into the database
--no-inference disables type inference (treats every value as a string)
--no-constraints generates the schema without length limits or NOT NULL checks
cron job
echo "* * * * * python hello_world.py" | crontab
add a job that runs every minute, on the minute, to the crontab
a cron schedule uses 5 * fields to indicate when a job runs:
minute(0-59)
hour(0-23)
day of month(1-31)
month(1-12)
day of week(0-6)
crontab -l
list all cron jobs
Intro to Bash
A Bash script is a file of shell commands
Usually starts with
#!/bin/bash or #!/usr/bin/env bash
file extension is .sh
run by bash scriptname.sh
arguments can be added by bash scriptname.sh arg1 arg2
These arg1 arg2 can be accessed inside the script by:
$1 get arg1
$2 get arg2
$@ get arg1 arg2
$# get 2 as the amount of arguments
variables
var1="sam" assign the value "sam" to var1 . Note that there must be no spaces around =
echo "There are $var1 items" will print There are sam items (variables are expanded inside double quotes)
quotation marks
'sometext' interpreted literally
"sometext" interpreted literally except $ and `
`sometext` interpreted as bash command and return STDOUT into a variable
$(command) is equivalent to `command` but the $(command) is used more in modern applications
numerical calculation
expr 1 + 4 get 5
expr 11 + 3.5 get error as expr takes only int into account
echo "11 + 3.5" | bc get 14.5 as bc (basic calculator) can do more
echo "11/3" | bc get 3 since the decimal place has not been defined
echo "scale=3; 11/3" | bc get 3.666
$((11 + 3)) is equivalent to expr 11 + 3 (integer arithmetic only)
array
numerical-indexed
declare -a my_array to define empty array
or my_array=(1 2 3) to provide values while defining array. Note that space is used as separator
echo ${my_array[@]} get 1 2 3 (all elements)
echo ${#my_array[@]} get 3 (length of array)
echo ${my_array[@]:N:M} where N is the starting index and M is how many elements to return
if-statement
if [ condition ]; then
# some code
else
# other code
fi
don't forget the brackets [ ] and the spaces around the condition inside them
special flags
-e if the file exists
-s if the file exists and has size greater than zero
-r if the file exists and readable
-w if the file exists and writable
combine conditions with && for AND and || for OR
for-loop
for x in 1 2 3
do
echo $x
done
for x in {START..STOP..INCREMENT} # brace expansion, like range(START, STOP, INCREMENT) but STOP is included
do
echo $x
done
for ((x=START;x<=STOP;x+=INCREMENT)) # use double parentheses to loop c-style
do
echo $x
done
for book in books/* # loop over files in a directory
do
echo $book
done
for book in $(ls books/ | grep -i 'air')
# use shell within a shell with $()
# matches files with 'air' in their names inside the books directory
do
echo $book
done
function
function_name () {
# function code
return # return status code
}
function function_name {
# function code
return # return status code
}
function function_name {
echo "access the first input $1"
for file in $@
do
echo "access to $file"
done
}
function_name "file1.txt" "file2.txt" "file3.txt"
return does not return a value but a status code, where success is 0 and failure is 1-255, captured in $? . To output values from a function, one should either
make a global variable and update it, or
use echo on the last line of the function and capture its output with a shell-within-a-shell when calling the function
!Unit Testing
A function typically gets tested on the order of 100 times before going into production, so automating those checks as unit tests saves a lot of effort
simple unit test process
create a test file whose name starts with test_ , e.g., to test myfunc the test file should be test_myfunc.py
inside the testing file
import pytest
from myfunc import myfunc  # assuming the function myfunc is defined in myfunc.py
def test_cleaning():
    assert myfunc("fixed_input") == "fixed_output"
def test_data_type():
    message = "Input 'another_input' should give an int as output"
    assert isinstance(myfunc("another_input"), int), message
def test_raises_on_bad_input():
    # if myfunc does not raise ValueError (or another expected error), this test fails
    with pytest.raises(ValueError):
        myfunc("bad_input")
run the unit tests with pytest test_myfunc.py
when comparing floats in an assert statement, use assert 0.1 + 0.1 + 0.1 == pytest.approx(0.3)
A typical project structure has a src directory containing the source code and a tests directory containing a corresponding test module for each source file. Inside a test file, tests for different source functions can be grouped using classes
import pytest
from data.preprocessing_helpers import func1, func2
class TestFunc1(object):
def test_functionality1_func1(self):
...
def test_functionality2_func1(self):
...
class TestFunc2(object):
def test_functionality1_func2(self):
...
def test_functionality2_func2(self):
...
if a test is expected to fail
import pytest
class TestFunc1(object):
@pytest.mark.xfail
def test_something(self):
...
# test_something is expected to fail for now (e.g., the feature is not implemented yet).
# The decorator tells pytest not to report this failure as an error.
@pytest.mark.skipif(bool_expression)
def test_another(self):
...
# test_another is skipped if bool_expression is true;
# pytest will not run it when the condition is satisfied
testing functions that work with files
create a raw data file + create a reference result file → run the function → compare the function's result with the reference result → remove the raw data file + remove the reference result file
The setup (create) and teardown (remove) steps are called a fixture, which pytest handles with the @pytest.fixture decorator
import pytest
import os
@pytest.fixture
def my_fixture():
    # setup: create the raw data file and the reference result here
    yield raw_data_path, ref_data
    # teardown: remove the created files, e.g., os.remove(raw_data_path)
def test_something(my_fixture):
    raw_data_path, ref_data = my_fixture  # the fixture argument holds the yielded values
    ...
alternatively, use pytest's built-in tmpdir fixture: files created inside it are removed automatically, along with the temporary directory, once the test ends
@pytest.fixture
def my_fixture(tmpdir):
raw_data = tmpdir.join("raw.txt")
clean_data = tmpdir.join("clean.txt")
with open(raw_data, "w") as f:
f.write("3131\n"
"21212\n"
"2111\n"
)
yield raw_data, clean_data
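A sketch of a test consuming this fixture; the cleaning function clean_file and the expected output are hypothetical stand-ins:
def test_cleaning_with_files(my_fixture):
    raw_data, clean_data = my_fixture
    clean_file(raw_data, clean_data)              # hypothetical function under test
    with open(clean_data) as f:
        assert f.read() == "3131\n21212\n2111\n"  # assumed expected output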
!Object Oriented
Object = states + behaviors = attributes + methods
attribute <> variable <> accessed as obj.my_attribute
method <> function() <> called as obj.my_method()
class Customer:
MAX_VALUE = 10 # a class attribute: a constant shared by all instances of the class
def __init__(self, name, balance=0):
# function __init__ is called upon object creation
# with whatever arguments are given
self.name = name
self.balance = balance
# self.variable means an attribute which belongs to this object
def set_name(self, new_name):
# add a method to object by adding a function with self as the first argument
self.name = new_name
def identify(self):
print("I am " + self.name)
@classmethod # a classmethod is bound to the class itself, not to an instance
def from_file(cls, filename):
# classmethod needs cls as the first argument
with open(filename, "r") as f:
# get name and balance from file
name = f.readline()
balance = f.readline()
return cls(name, balance)
# returning cls(name, balance) calls the constructor (__init__()) of the class Customer, so from_file() returns a new Customer object
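A brief usage sketch; the file name customer.txt is a placeholder and is assumed to hold the name and balance on separate lines:
cust = Customer("Sam", balance=100)          # calls __init__
cust.identify()                              # prints "I am Sam"
cust2 = Customer.from_file("customer.txt")   # alternative constructor via the classmethod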
Intro to Airflow
Airflow is a platform to program workflows → Creation, Scheduling, Monitoring
A workflow in Airflow is written in Python and implemented as a DAG (Directed Acyclic Graph)
A DAG is a set of tasks forming a graph in which:
directed → the edges define the execution order and dependencies between tasks
acyclic → no loops or repeats
from airflow.models import DAG
from datetime import datetime
default_args = {
'owner': 'jdoe',
'email': 'jdoe@email.com',
'start_date': datetime(2022, 12, 30)
}
example_dag = DAG('example_dag', default_args=default_args) # the first argument is the DAG id
Airflow Operator
Bash Operator
from airflow.operators.bash_operator import BashOperator
task1 = BashOperator(task_id='first_task',
bash_command='echo 1', # this can be bash command or shell script
dag=example_dag)
# chain tasks: task1 runs before task2
task1 >> task2
Python Operator
from airflow.operators.python_operator import PythonOperator
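A minimal sketch of a PythonOperator task; the callable printme and the task id are made up for illustration:
def printme():
    print("hello from Airflow")
python_task = PythonOperator(task_id='python_task',
                             python_callable=printme,  # the Python function to execute
                             dag=example_dag)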
Spark distributes computing tasks to workers (nodes) in a cluster. The connection between the user and the cluster is created by a SparkContext, and the SparkSession is the interface for working with that connection.
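A minimal sketch, assuming pyspark is installed, of creating the SparkSession instance (named spark) that the snippets below use:
from pyspark.sql import SparkSession
# create (or retrieve) the SparkSession, the entry point for DataFrame and SQL operations
spark = SparkSession.builder.getOrCreate()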
view tables
spark.catalog.listTables()  # spark is the SparkSession instance
use SQL query
query = '''
SELECT * FROM table LIMIT 10
'''
result = spark.sql(query)
result.show()  # .show() prints the rows itself
# can convert result to pd dataframe
df_result = result.toPandas()
convert pd dataframe to spark
df = pd.DataFrame(np.random.random(10))
# create a Spark dataframe from the pandas dataframe,
# but this is not registered as a table in the Spark catalog yet
spark_df = spark.createDataFrame(df)
# register it as a temporary view so it can be queried with SQL
spark_df.createOrReplaceTempView("temp")
Transformer classes have a .transform() method, which takes a DataFrame and returns a DataFrame with an additional transformed column
Estimator classes have a .fit() method, which takes a DataFrame and returns a model object
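A small sketch of the two kinds of classes, using a hypothetical categorical column named carrier: StringIndexer is an Estimator whose .fit() returns a model, and that model is a Transformer whose .transform() adds the indexed column.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")  # Estimator
indexer_model = indexer.fit(spark_df)           # .fit() returns a StringIndexerModel
indexed_df = indexer_model.transform(spark_df)  # .transform() returns a new DataFrame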
The ML pipeline in Spark only handles numerical columns in its computations. Spark often guesses the data type of each column once it is fed into the pipeline, but the guess can be wrong, e.g., a bus line number is a number that carries no numerical meaning. It is therefore safer to cast each column to its intended data type before feeding it into the ML pipeline.
cast data types with .cast() → the target data type should be 'integer' for ints or 'double' for floats, as sketched below
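A minimal casting sketch; the column names are placeholders:
spark_df = spark_df.withColumn("plane_year", spark_df.plane_year.cast("integer"))
spark_df = spark_df.withColumn("air_time", spark_df.air_time.cast("double"))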
Boto3 is the Python interface to AWS. There are multiple AWS services useful for data projects; some interesting ones are:
Simple Storage Service (S3) - store files (or so-called objects) in AWS
Simple Notification Service (SNS) - send alerts or notifications (via email or other channels)
Comprehend - sentiment analysis on text data
Rekognition - extract text from images and perform image classification
S3
a service to store files in folders in the cloud. In AWS terminology:
file → object
folder → bucket
create boto3 client:
import boto3
s3 = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=AWS_KEY_ID,
aws_secret_access_key=AWS_SECRET)
create bucket
bucket = s3.create_bucket(Bucket='bucket-name')
list buckets
# get all buckets and their metadata
bucket_response = s3.list_buckets()['Buckets']
delete bucket
response = s3.delete_bucket(Bucket='bucket-name')
upload file
s3.upload_file(
Filename='path/to/file.txt', # local file path
Bucket='bucket-name', # name of bucket
Key='filename.txt' # object name in bucket
)
list objects in a bucket
response = s3.list_objects(
Bucket='bucket-name', # which bucket to look in
MaxKeys=2, # how many objects to show as result
Prefix='pref' # only objects with given prefix will be shown
)['Contents']
get object metadata
response = s3.head_object(
Bucket='bucket-name',
Key='filename.txt'
)
download object
s3.download_file(
Filename='download/path/name.txt', # local path to download
Bucket='bucket-name', # name of bucket
Key='filename.txt' # object name in bucket
)
delete object
s3.delete_object(
Bucket='bucket-name', # name of bucket
Key='filename.txt' # object name in bucket
)
Permission
By default, AWS denies every request to access data unless the request is signed with the AWS_KEY_ID and AWS_SECRET of a user who has the appropriate permissions. Permissions can be granted through the following:
IAM - provide permissions across AWS services
Bucket Policy - permission on buckets
Access Control List (ACL) - permission on objects
Pre-signed URL - temporary access to an object
Here, we will discuss only ACLs and pre-signed URLs.
There are 2 types of ACL to define: public-read and private . Upon uploading, all object ACLs are private by default. To set ACL to public-read :
s3.upload_file(
Filename='initial/path/to/file.csv',
Bucket='bucket-name',
Key='targetfile.csv'
)
s3.put_object_acl(
Bucket='bucket-name',
Key='targetfile.csv',
ACL='public-read'
)
or
s3.upload_file(
Filename='initial/path/to/file.csv',
Bucket='bucket-name',
Key='targetfile.csv',
ExtraArgs={'ACL':'public-read'}
)
a public object in S3 can be accessed at a URL of the format
https://{bucket-name}.s3.amazonaws.com/{object-key}
read a CSV file stored on S3 as a dataframe without downloading it to local disk
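A minimal sketch of doing this with pandas; the bucket and object names are placeholders, and the s3 client created above is reused for the private case:
import pandas as pd
# public object: read straight from its URL
df = pd.read_csv('https://bucket-name.s3.amazonaws.com/targetfile.csv')
# private object: generate a temporary pre-signed URL first, then read from it
url = s3.generate_presigned_url(
    ClientMethod='get_object',
    ExpiresIn=3600,  # seconds the link stays valid
    Params={'Bucket': 'bucket-name', 'Key': 'targetfile.csv'}
)
df_private = pd.read_csv(url)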