
Building a Cryptocurrency Data ETL Pipeline with Python and Bonobo

Posted on: August 5, 2023 at 04:57 PM
Reading time: 14 minutes

Hi there fellow hackers, welcome back to my blog!

Today, we’re going to learn how to build an ETL (Extract, Transform, Load) pipeline using Python, specifically for cryptocurrency data. Our pipeline will extract data from a web source, transform it into a usable format, and load it into an SQLite database.


The Power of a Data Pipeline

In the modern era, data is the new oil, driving decision-making in everything from business strategies to governmental policies. However, raw data, in its natural form, is like unrefined crude oil - it needs to be processed and purified before it can be useful.

A data pipeline is like a refinery for data. It takes in raw data, processes it, and outputs data that’s clean, organized, and ready for analysis. With a data pipeline, you can automate this process, saving you time and ensuring that your analyses and decisions are based on the most up-to-date data.

A data pipeline can be invaluable when you’re dealing with data that updates frequently - like cryptocurrency prices. Without a data pipeline, you’d have to manually download and clean the data every day (or even every hour) to ensure your analyses are up-to-date. With a data pipeline, you can automate this process, allowing you to focus on analysis and decision-making.

What is an ETL Pipeline?

An ETL pipeline - standing for Extract, Transform, Load - is one of the most common types of data pipelines. It works exactly as the acronym suggests: first, it Extracts data from a source, then it Transforms the data into a suitable format, and finally, it Loads the data into a database or data warehouse for further analysis or reporting.

On the other hand, a relatively new paradigm known as ELT - Extract, Load, Transform - has emerged with the advent of modern data warehouses. The ELT process involves Extracting the data from the source, Loading the raw data directly into a data warehouse, and then Transforming the data as needed within the data warehouse itself. This approach takes advantage of the substantial processing power of modern data warehouses and can be more efficient when dealing with large volumes of data.

In both cases, the goal is the same: to automate the process of turning raw, unstructured data into a format that’s suitable for analysis. However, the choice between ETL and ELT can depend on several factors, including the amount of data you’re working with, the capabilities of your data warehouse, the complexity of the transformations you need to perform, and the specific requirements of your project.

For this tutorial, we’ll be focusing on the ETL paradigm, as we are working with quite a small amount of data (and our transformation process is fairly simple).

Bonobo: Python’s Lightweight ETL Framework

Bonobo is a simple, lightweight Extract, Transform, Load (ETL) framework for Python. It allows you to build ETL pipelines as Python scripts, using straightforward syntax. It’s designed to be simple to use and easy to understand, making it a great choice for beginners or for simple to moderately complex ETL tasks. Bonobo also offers flexibility with its ability to accommodate multiple data inputs and outputs, allowing for parallel processing and enhancing the overall efficiency of your data pipelines.
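
To get a feel for the API before we use it on real data, here is a minimal, self-contained sketch of a Bonobo pipeline. The node names produce, shout, and display are illustrative only and not part of this project:

import bonobo


def produce():
    # extract: yield a few sample records
    yield "BTC-USD"
    yield "ETH-USD"

def shout(symbol):
    # transform: uppercase each record (a stand-in for real cleaning logic)
    yield symbol.upper()

def display(symbol):
    # load: just print here, but this could write to a file or database
    print(symbol)


graph = bonobo.Graph()
graph.add_chain(produce, shout, display)

if __name__ == "__main__":
    bonobo.run(graph)

Each function is a node in the graph, and Bonobo streams records from one node to the next - exactly the structure we'll use for our extract, transform, and load steps.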

Implementation

Now that we've got the explanations out of the way, let's start coding!

Project Setup

First off, we need to set up our Python environment. For this project, we'll use a few helpful libraries:

- Bonobo, for defining and running the ETL graph
- Requests, for fetching the webpage
- BeautifulSoup4, for parsing its HTML
- pandas, for organizing the transformed data

You can install these with pip:

pip install bonobo beautifulsoup4 requests pandas

Next, let's create a new folder for our project, and within it, we'll create separate Python files for each step of the ETL process: extract.py, transform.py, load.py, and initialize_db.py, which initializes our database and creates the schema for its two tables. You can also split these steps into packages (directories), as I did here. We will also create a folder called database to hold our SQLite database file.
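
With the flat, single-file layout described above, the project would look roughly like this (the crypto-etl folder name is just a placeholder, and main.py is the entry point we'll write at the end):

crypto-etl/
├── database/          # will hold crypto.db
├── initialize_db.py
├── extract.py
├── transform.py
├── load.py
└── main.py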

Initializing the Database

Before we can start loading data into our database using our ETL pipeline, we first need to set it up. We'll be using SQLite, a lightweight, file-based database that's perfect for our needs. An SQLite database is stored in a single file and can be interacted with using SQL commands, which makes it a great choice for small to medium-sized projects.

First, we’ll create a new SQLite database file. You can choose any location on your system for this file, but for the sake of organization, we’ll create a new directory in our project folder and call it database. This directory will store our actual database.

Now, inside the file initialize_db.py, we will write some logic to create the SQLite database inside the database/ directory you just created if it does not already exist, and also create the tables and their respective schemas to store the extracted data from the ETL pipeline.

import sqlite3


def create_crypto_table(conn: sqlite3.Connection):
    sql = """CREATE TABLE IF NOT EXISTS crypto (
                symbol text PRIMARY KEY,
                name text NOT NULL
                ); """
    conn.execute(sql)

def create_crypto_data_table(conn: sqlite3.Connection):
    sql = """CREATE TABLE IF NOT EXISTS crypto_data (
                    id integer PRIMARY KEY,
                    crypto_symbol text NOT NULL,
                    price real,
                    market_cap real,
                    -- ... other data fields you want to save from the crawled page
                    timestamp integer NOT NULL,
                    FOREIGN KEY (crypto_symbol) REFERENCES crypto (symbol)
                ); """
    conn.execute(sql)

    # create index on the timestamp column
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_timestamp ON crypto_data (timestamp);"
    )

def initialize_db():
    # creates a SQLite database in the 'database' folder
    with sqlite3.connect("database/crypto.db") as conn:
        create_crypto_table(conn)
        create_crypto_data_table(conn)

The crypto table will store general information about each cryptocurrency, like its name and symbol. It will look like this:

| symbol  | name        |
| ------- | ----------- |
| BTC-USD | Bitcoin USD |

The crypto_data table will store specific data about each cryptocurrency, like its current price, market cap, etc. It will look like this:

| id | crypto_symbol | price |
| -- | ------------- | ----- |
| 1  | BTC-USD       | 35000 |

Notice the FOREIGN KEY in the crypto_data table - this creates a link between the crypto_symbol column in crypto_data and the symbol column in crypto, ensuring that every row in crypto_data corresponds to a valid cryptocurrency in the crypto table. Also note that symbol must be unique, which its PRIMARY KEY constraint already guarantees.
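
One caveat worth knowing: SQLite does not enforce foreign key constraints unless you turn them on for each connection. If you want the database to actually reject rows whose crypto_symbol has no match in crypto, you can enable the pragma when connecting - a minimal sketch:

import sqlite3

with sqlite3.connect("database/crypto.db") as conn:
    # foreign key enforcement is off by default in SQLite;
    # this pragma enables it for the current connection only
    conn.execute("PRAGMA foreign_keys = ON;")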

Extraction: The Data Source

Our data source is the Yahoo Finance cryptocurrency page, which provides comprehensive data on various cryptocurrencies. We will be extracting data such as name, symbol, price, etc. of each cryptocurrency listed in the table.

To do this, we'll use BeautifulSoup to parse the HTML of the page and find the information we need. In extract.py, we will send a request to the webpage, fetch its HTML content, and yield the parsed HTML.

from bs4 import BeautifulSoup
from requests import request


# NOTE: user-agent header needs to be changed for query params to work as this mimics browser interaction

def extract():
    url = "https://finance.yahoo.com/crypto"
    request_params = {"count": 100, "offset": 0}
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
    }

    response = request("GET", url=url, params=request_params, headers=headers)

    yield BeautifulSoup(response.text, "html.parser")
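
Before wiring this into the pipeline, it's worth a quick sanity check that the page structure matches what our transform step will expect - the scr-res-table id below is the same one transform.py looks for. This snippet assumes the flat project layout, so extract is importable directly:

from extract import extract

soup = next(extract())
table_div = soup.find("div", {"id": "scr-res-table"})
print("table found" if table_div else "table not found - the page markup may have changed")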

Transformation: Preparing the Data

The raw data we extracted is messy and not ready for analysis. We need to clean it up and organize it. This is where the ‘Transform’ step comes in.

In transform.py, we will take in the raw data and output two neat pandas DataFrames. One DataFrame holds the general information about each cryptocurrency (which will be saved in the crypto table in the database), and the other holds the specific details like price, market cap, etc. (which will be stored in the crypto_data table we created above).

import locale
import time
from typing import Tuple

import pandas as pd
from bs4 import BeautifulSoup

# NOTE: setlocale for numeric string parsing "26,000" -> 26000.0
locale.setlocale(locale.LC_ALL, "")


def format_float(value):
    # missing values have no numeric equivalent
    if value == "N/A":
        return None
    # percentage-change values come signed, e.g. "+2.26%" -> 2.26
    if value[-1] == "%":
        return locale.atof(value[1:-1])
    # drop a leading sign, e.g. "+0.05" -> 0.05
    if value[0] == "-" or value[0] == "+":
        return locale.atof(value[1:])
    # plain numeric strings, e.g. "26,432.55" -> 26432.55
    return locale.atof(value)

def format_text_float(value):
    # suffix multipliers used by Yahoo Finance, e.g. "26.5B" -> 26,500,000,000
    tens = {"K": 1e3, "M": 1e6, "B": 1e9, "T": 1e12, "Q": 1e15}

    if value == "N/A":
        return None

    if not value[-1] in tens:
        return locale.atof(value)

    factor, exp = value[0:-1], value[-1].upper()
    return locale.atof(factor) * tens[exp]

def format_table(raw_soup):
    table = raw_soup.find("div", {"id": "scr-res-table"}).find("table")
    table_columns = [
        column_header.text.strip() for column_header in table.thead.tr.find_all("th")
    ]
    table_df = pd.DataFrame(columns=table_columns)

    for row in table.tbody.find_all("tr"):
        df_row = [table_cell.text.strip() for table_cell in row.find_all("td")]
        table_df.loc[len(table_df)] = df_row

    crypto_df = pd.DataFrame().assign(symbol=table_df["Symbol"], name=table_df["Name"])

    # NOTE: adding extra column "timestamp" to record time of extraction
    crypto_data_df = pd.DataFrame().assign(
        crypto_symbol=table_df["Symbol"],
        price=table_df["Price (Intraday)"].apply(format_float),
        market_cap=table_df["Market Cap"].apply(format_text_float),
        # ... other data fields you want to save from the crawled page
        timestamp=time.time(),
    )

    return crypto_df, crypto_data_df

def transform(soup):
    crypto_df, crypto_data_df = format_table(soup)
    yield crypto_df, crypto_data_df
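
To see what the two helpers do, here are a few made-up inputs and the values they parse to (assuming an English locale where the comma is the thousands separator, and the flat project layout so the functions can be imported from transform.py):

from transform import format_float, format_text_float

print(format_float("26,432.55"))   # -> 26432.55
print(format_float("+2.26%"))      # -> 2.26
print(format_text_float("26.5B"))  # -> 26500000000.0
print(format_text_float("N/A"))    # -> None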

Loading: Storing the Data

Finally, we want to store this cleaned data for future use. We'll do this by writing the two DataFrames produced in the transformation step into our SQLite database.

In load.py, we write two functions. The load_crypto function loads data into the crypto table, and the load_crypto_data function loads data into the crypto_data table.

import sqlite3


def load_crypto(*data):
    crypto_df, _ = data
    with sqlite3.connect("database/crypto.db") as conn:
        curs = conn.cursor()
        for _, row in crypto_df.iterrows():
            # skip symbols that already exist in the crypto table
            curs.execute(
                "INSERT OR IGNORE INTO crypto VALUES (?, ?)",
                (row["symbol"], row["name"]),
            )

def load_crypto_data(*data):
    _, crypto_data_df = data
    with sqlite3.connect("database/crypto.db") as conn:
        crypto_data_df.to_sql("crypto_data", conn, if_exists="append", index=False)

Notice that while we use the to_sql function from pandas to easily write our crypto_data DataFrame into the SQLite database, we used raw SQL to load the crypto DataFrame. This is because a symbol may already be present in the crypto table from a previous run, and INSERT OR IGNORE lets us skip those duplicates instead of violating the primary key constraint.
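
As an aside, the row-by-row loop in load_crypto could also be written as a single bulk call using the standard sqlite3 executemany - a sketch of an equivalent, hypothetical helper:

import sqlite3


def load_crypto_bulk(crypto_df):
    # hypothetical bulk variant of load_crypto: one INSERT OR IGNORE per batch
    with sqlite3.connect("database/crypto.db") as conn:
        conn.executemany(
            "INSERT OR IGNORE INTO crypto VALUES (?, ?)",
            crypto_df[["symbol", "name"]].itertuples(index=False, name=None),
        )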

Bringing It All Together

Now that we have all the parts of our ETL pipeline, it’s time to bring them together. We use Bonobo to define our ETL graph. In a new main.py file, we define a get_graph function where we add all our ETL functions as nodes in the Bonobo graph. The data will flow from the extract node, through the transform node, to the two load nodes.

Finally, we run our pipeline by calling bonobo.run(get_graph()).

import bonobo

# flat-layout imports; adjust these if you split the steps into packages
from initialize_db import initialize_db
from extract import extract
from transform import transform
from load import load_crypto, load_crypto_data


def get_graph(**options):
    graph = bonobo.Graph()

    graph.add_chain(extract, transform)
    graph.add_chain(load_crypto, _input=transform)  # loading crypto_df
    graph.add_chain(load_crypto_data, _input=transform)  # loading crypto_data_df

    return graph

if __name__ == "__main__":
    initialize_db()
    bonobo.run(get_graph())

And voilà! If you run python main.py in your terminal, you should see the pipeline execute and load the data into your database!

Extras

Iterating Over All Pages of the Webpage

If you inspected the Yahoo Finance Crypto page, you may have noticed that the crypto table is paginated. To extract the tables on every page, you would need to iterate over each page. Working out how to do this in the extraction step is your challenge!
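
If you'd like a nudge, here is one possible shape for a solution - a hypothetical extract_all_pages, not the final answer. It reuses the same count/offset query parameters extract.py already sends and stops when a page comes back without table rows:

from bs4 import BeautifulSoup
from requests import request

HEADERS = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
}


def extract_all_pages(page_size=100):
    # keep increasing the offset until Yahoo returns a page with no rows
    offset = 0
    while True:
        response = request(
            "GET",
            url="https://finance.yahoo.com/crypto",
            params={"count": page_size, "offset": offset},
            headers=HEADERS,
        )
        soup = BeautifulSoup(response.text, "html.parser")
        table_div = soup.find("div", {"id": "scr-res-table"})
        tbody = table_div.find("tbody") if table_div else None
        if not tbody or not tbody.find_all("tr"):
            break
        yield soup
        offset += page_size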

Automating the Pipeline

If you want your pipeline to run daily, hourly, and so on, you can schedule the Python script to run at regular intervals. There are several ways to do this, from simple solutions like Task Scheduler on Windows or cron jobs on Unix-based systems, to more involved ones like a dedicated orchestrator such as Apache Airflow. This will also be your challenge!

Conclusion

Congrats! You’ve built your own cryptocurrency ETL pipeline! Now, you have an automated system that extracts cryptocurrency data from the web, transforms it into a usable format, and loads it into a database. This project is a great way to get hands-on experience with Python, web scraping, data transformation, and working with databases. You can now use this valuable data to gain insights into the fascinating world of cryptocurrencies. Happy data analyzing!
