Hi there fellow hackers, welcome back to my blog!
Today, we’re going to learn how to build an ETL (Extract, Transform, Load) pipeline using Python, specifically for cryptocurrency data. Our pipeline will extract data from a web source, transform it into a usable format, and load it into an SQLite database.
Table of contents
Open Table of contents
The Power of a Data Pipeline
In the modern era, data is the new oil, driving decision-making in everything from business strategies to governmental policies. However, raw data, in its natural form, is like unrefined crude oil - it needs to be processed and purified before it can be useful.
A data pipeline is like a refinery for data. It takes in raw data, processes it, and outputs data that’s clean, organized, and ready for analysis. With a data pipeline, you can automate this process, saving you time and ensuring that your analyses and decisions are based on the most up-to-date data.
A data pipeline can be invaluable when you’re dealing with data that updates frequently - like cryptocurrency prices. Without a data pipeline, you’d have to manually download and clean the data every day (or even every hour) to ensure your analyses are up-to-date. With a data pipeline, you can automate this process, allowing you to focus on analysis and decision-making.
What is an ETL Pipeline?
An ETL pipeline - standing for Extract, Transform, Load - is one of the most common types of data pipelines. It works exactly as the acronym suggests: first, it Extracts data from a source, then it Transforms the data into a suitable format, and finally, it Loads the data into a database or data warehouse for further analysis or reporting.
On the other hand, a relatively new paradigm known as ELT - Extract, Load, Transform - has emerged with the advent of modern data warehouses. The ELT process involves Extracting the data from the source, Loading the raw data directly into a data warehouse, and then Transforming the data as needed within the data warehouse itself. This approach takes advantage of the substantial processing power of modern data warehouses and can be more efficient when dealing with large volumes of data.
In both cases, the goal is the same: to automate the process of turning raw, unstructured data into a format that’s suitable for analysis. However, the choice between ETL and ELT can depend on several factors, including the amount of data you’re working with, the capabilities of your data warehouse, the complexity of the transformations you need to perform, and the specific requirements of your project.
For this tutorial, we’ll be focusing on the ETL paradigm, as we are working with quite a small amount of data (and our transformation process is fairly simple).
Bonobo: Python’s Lightweight ETL Framework
Bonobo is a simple, lightweight Extract, Transform, Load (ETL) framework for Python. It allows you to build ETL pipelines as Python scripts, using straightforward syntax. It’s designed to be simple to use and easy to understand, making it a great choice for beginners or for simple to moderately complex ETL tasks. Bonobo also offers flexibility with its ability to accommodate multiple data inputs and outputs, allowing for parallel processing and enhancing the overall efficiency of your data pipelines.
Implementation
Now that we got the explanations out the way, let’s start coding!
Project Setup
First off, we need to setup our Python environment. For this project, we’ll use a few helpful libraries:
- Bonobo for creating the ETL pipeline.
- BeautifulSoup and requests for web scraping.
- Pandas for data manipulation.
- SQLite for data storage.
You can install these with pip:
pip install bonobo beautifulsoup4 requests pandas
Next, let’s create a new folder for our project, and within it, we’ll create separate Python files for each step of the ETL process: extract.py
, transform.py
, load.py
, and initialize_db.py
to initialize our database and create a schema for the 2 tables in the database. You can also separate the processes by separating them into packages (directories) as I did here. We will also create a folder called ‘database’ to hold our SQLite database file.
Initializing the Database
Before we can start loading data into our database using our ETL pipeline, we first need to set it up. We’ll be using SQLite, a lightweight and file-based database that’s perfect for our needs. SQLite databases are stored in single file and can be interacted with using SQL commands, which makes them a great choice for small to medium-sized projects.
First, we’ll create a new SQLite database file. You can choose any location on your system for this file, but for the sake of organization, we’ll create a new directory in our project folder and call it database
. This directory will store our actual database.
Now, inside the file initialize_db.py
, we will write some logic to create the SQLite database inside the database/
directory you just created if it does not already exist, and also create the tables and their respective schemas to store the extracted data from the ETL pipeline.
import sqlite3
def create_crypto_table(conn: sqlite3.Connection):
sql = """CREATE TABLE IF NOT EXISTS crypto (
symbol text PRIMARY KEY,
name text NOT NULL
); """
conn.execute(sql)
def create_crypto_data_table(conn):
sql = """CREATE TABLE IF NOT EXISTS crypto_data (
id integer PRIMARY KEY,
crypto_symbol text NOT NULL,
price real,
market_cap real,
# ... other data fields you want to save from the crawled page
timestamp integer NOT NULL,
FOREIGN KEY (crypto_symbol) REFERENCES crypto (symbol)
); """
conn.execute(sql)
# create index on the timestamp column
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_timestamp ON crypto_data (timestamp);"
)
def initialize_db():
# creates a SQLite database in the 'database' folder
with sqlite3.connect("database/crypto.db") as conn:
create_crypto_table(conn)
create_crypto_data_table(conn)
The crypto
table will store general information about each cryptocurrency, like its name and symbol. It will look like this:
symbol | name |
---|---|
BTC-USD | Bitcoin USD |
The crypto_data
table will store specific data about each cryptocurrency, like its current price, market cap, etc. It will look like this:
id | crypto_symbol | price | … |
---|---|---|---|
1 | BTC-USD | 35000 | … |
Notice the FOREIGN KEY
in the crypto_data
table - this creates a link between the crypto_symbol
in the crypto_data
table and the symbol
in the crypto
table. This ensures that every row in the crypto_data
table corresponds to a valid cryptocurrency in the crypto
table. Also note that this field must be unique.
Extraction: The Data Source
Our data source is the Yahoo Finance cryptocurrency page, which provides comprehensive data on various cryptocurrencies. We will be extracting data such as name, symbol, price, etc. of each cryptocurrency listed in the table.
To do this, we’ll use BeautifulSoup to parse the HTML of the page and find the information we need. In extract.py
, we will send a request to the webpage, fetch its HTML content, and outputs the parsed HTML.
from bs4 import BeautifulSoup
from requests import request
# NOTE: user-agent header needs to be changed for query params to work as this mimics browser interaction
def extract():
url = "https://finance.yahoo.com/crypto"
request_params = {"count": 100, "offset": 0}
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
}
response = request("GET", url=url, params=request_params, headers=headers)
yield BeautifulSoup(response.text, "html.parser")
Transformation: Preparing the Data
The raw data we extracted is messy and not ready for analysis. We need to clean it up and organize it. This is where the ‘Transform’ step comes in.
In transform.py
, we will take in the raw data and output two neat pandas DataFrames. One DataFrame holds the general information about each cryptocurrency (which will be saved in the crypto
table in the database), and the other holds the specific details like price, market cap, etc. (which will be stored in the crypto_data
table we created above).
import locale
import time
from typing import Tuple
import pandas as pd
from bs4 import BeautifulSoup
# NOTE: setlocale for numeric string parsing "26,000" -> 26000.0
locale.setlocale(locale.LC_ALL, "")
def format_float(value):
if value == "N/A":
return None
if value[-1] == "%":
return locale.atof(value[1:-1])
if value[0] == "-" or value[0] == "+":
return locale.atof(value[1:])
return locale.atof(value)
def format_text_float(value):
tens = {"K": 10e3, "M": 10e6, "B": 10e9, "T": 10e12, "Q": 10e15}
if value == "N/A":
return None
if not value[-1] in tens:
return locale.atof(value)
factor, exp = value[0:-1], value[-1].upper()
return locale.atof(factor) * tens[exp]
def format_table(raw_soup):
table = raw_soup.find("div", {"id": "scr-res-table"}).find("table")
table_columns = [
column_header.text.strip() for column_header in table.thead.tr.find_all("th")
]
table_df = pd.DataFrame(columns=table_columns)
for row in table.tbody.find_all("tr"):
df_row = [table_cell.text.strip() for table_cell in row.find_all("td")]
table_df.loc[len(table_df)] = df_row
crypto_df = pd.DataFrame().assign(symbol=table_df["Symbol"], name=table_df["Name"])
# NOTE: adding extra column "timestamp" to record time of extraction
crypto_data_df = pd.DataFrame().assign(
crypto_symbol=table_df["Symbol"],
price=table_df["Price (Intraday)"].apply(format_float),
market_cap=table_df["Market Cap"].apply(format_text_float),
# ... other data fields you want to save from the crawled page
timestamp=time.time(),
)
return crypto_df, crypto_data_df
def transform(soup):
crypto_df, crypto_data_df = format_table(soup)
yield crypto_df, crypto_data_df
Loading: Storing the Data
Finally, we want to store this cleaned data for future use. We’ll do this by saving the formatted data we cleaned in the transformation step.
In load.py
, we write two functions. The load_crypto
function loads data into the crypto
table, and the load_crypto_data
function loads data into the crypto_data
table.
import sqlite3
def load_crypto(*data):
crypto_df, _ = data
with sqlite3.connect("database/crypto.db") as conn:
curs = conn.cursor()
for _, row in crypto_df.iterrows():
curs.execute("INSERT OR IGNORE INTO crypto VALUES (?, ?)", (row))
def load_crypto_data(*data):
_, crypto_data_df = data
with sqlite3.connect("database/crypto.db") as conn:
crypto_data_df.to_sql("crypto_data", conn, if_exists="append", index=False)
Notice that while we use the to_sql
function from pandas to easily write our crypto_data
DataFrame into the SQLite database, we used raw SQL to load the crypto
DataFrame. This is because we need to check if there are any duplicates in the crypto
table, and if there are, we need not to insert new values.
Bringing It All Together
Now that we have all the parts of our ETL pipeline, it’s time to bring them together. We use Bonobo to define our ETL graph. In a new main.py
file, we define a get_graph
function where we add all our ETL functions as nodes in the Bonobo graph. The data will flow from the extract
node, through the transform
node, to the two load
nodes.
Finally, we run our pipeline by calling bonobo.run(get_graph())
.
import bonobo
from .initialize_db import initialize_db
from .extract import extract
from .transform import transform
from .load import load_crypto, load_crypto_data
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(extract, transform)
graph.add_chain(load_crypto, _input=transform) # loading df_crypto
graph.add_chain(load_crypto_data, _input=transform) # loading df_crypto_data
return graph
if __name__ == "__main__":
initialize_db()
bonobo.run(get_graph())
And voila! If you run python main.py
on your terminal, you should see the pipeline execute and load the data into your database!
Extras
Iterating Over All Pages of the Webpage
If you may have wondered while inspecting the Yahoo Finance Crypto page, yes you are right the Crypto table is paginated. To extract all the tables on every page, you would need to iterate over each page. This would be your challenge to think about how to do this in the extraction step!
Automating the Pipeline
If you want your script to run daily, hourly, etc. we can achieve this by scheduling our Python script to run at regular intervals. There are several ways to do this, from simple solutions like using Task Scheduler on Windows or cron jobs on Unix-based systems, to more complex ones like using a job scheduler like Apache Airflow. This will also be your challenge!
Conclusion
Congrats! You’ve built your own cryptocurrency ETL pipeline! Now, you have an automated system that extracts cryptocurrency data from the web, transforms it into a usable format, and loads it into a database. This project is a great way to get hands-on experience with Python, web scraping, data transformation, and working with databases. You can now use this valuable data to gain insights into the fascinating world of cryptocurrencies. Happy data analyzing!
References
- Bonobo: https://www.bonobo-project.org/