How to Create Custom Zipline Bundles From Binance Data Part 1

7 minute read

We have successfully installed Zipline and downloaded all trading pairs from Binance. Now it is time to create custom data bundles from those data sets. In part 1 of this tutorial, I am going to show you how to create a data bundle from CSV files. In part 2, we are going to skip the CSV files and create Zipline data bundles directly from the Binance public API.

First, create a file named binance_csv.py and import all the modules we need. Note: save this file into the Zipline package folder of our env: $HOME/anaconda3/envs/zipline/lib/python3.5/site-packages/zipline/data/bundles
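
If you prefer the command line, you can create the empty file right in place:

cd $HOME/anaconda3/envs/zipline/lib/python3.5/site-packages/zipline/data/bundles
touch binance_csv.py

Then add the imports: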

import bs4 as bs
from binance.client import Client
import csv
from datetime import datetime as dt
import numpy as np
from os import listdir, makedirs, remove
from os.path import exists, isfile, join
from pathlib import Path
import pandas as pd
import pickle
import requests
from trading_calendars import register_calendar
from trading_calendars.exchange_calendar_binance import BinanceExchangeCalendar

If you have followed my previous tutorials, you should have all the packages in the env except binance.client. Let us get that out of the way.

(zipline) pip install python-binance

Don’t worry about BinanceExchangeCalendar for now, we will create it later.

Binance Data in CSV

Create a script to download Binance trading data in CSV format.

# Set up the directories where we are going to save those csv files
user_home = str(Path.home())
csv_data_path = join(user_home, '.zipline/custom_data/csv')
custom_data_path = join(user_home, '.zipline/custom_data')

def save_csv(reload_tickers=False, interval='1m'):
    """
    Save Zipline bundle ready csv for Binance trading ticker pair
    :param reload_tickers: True or False
    :type reload_tickers: boolean
    :param interval: Default 1m. Other available ones: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M
    :type interval: str
    """

    if not exists(csv_data_path):
        makedirs(csv_data_path)  # Creates the parent custom_data folder too

    if reload_tickers:
        ticker_pairs = tickers()  # tickers() is defined in the Put It All Together section below
    else:
        ticker_pickle = join(
            custom_data_path, 'binance_ticker_pairs.pickle')
        with open(ticker_pickle, 'rb') as f:
            ticker_pairs = pickle.load(f)

    client = Client("", "")
    start = '2017-7-14'  # Binance launch date
    end = dt.utcnow().strftime('%Y-%m-%d')  # Current day
    csv_filenames = [csv_filename for csv_filename in listdir(
        csv_data_path) if isfile(join(csv_data_path, csv_filename))]

    for ticker_pair in ticker_pairs:
        filename = "Binance_{}_{}.csv".format(ticker_pair, interval)        

        if filename in csv_filenames:
            remove(join(csv_data_path, filename))

        output = join(csv_data_path, filename)
        klines = client.get_historical_klines_generator(
            ticker_pair, interval, start, end)
        with open(output, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(
                ['date', 'open', 'high', 'low', 'close', 'volume'])
            for kline in klines:
                # Copy the first six fields (open time, OHLCV) instead of
                # mutating kline; the Binance API forbids changing the open time
                line = kline[:6]
                # Convert the epoch-millisecond open time to a timestamp
                line[0] = pd.Timestamp(line[0], unit='ms')
                writer.writerow(line)

        print('{} saved.'.format(filename))

    return [file for file in listdir(csv_data_path) if isfile(join(csv_data_path, file))]
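
Before wiring this into Zipline, a quick smoke test from a Python shell does not hurt. A minimal sketch, assuming the binance_ticker_pairs.pickle from the previous tutorial is already in place (daily bars download much faster than minute bars):

from zipline.data.bundles.binance_csv import save_csv

saved = save_csv(reload_tickers=False, interval='1d')
print(saved[:5])  # A few of the freshly written csv filenames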

Create Custom Ingest Function

Once all the CSV files have been downloaded, we need a function that loops through them and creates the custom bundle.

def csv_to_bundle(interval='1m'):

    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir
               ):

        # Get all available csv filenames
        csv_filenames = save_csv(reload_tickers=True, interval=interval)
        # Loop through the filenames and create a dict to keep some temp meta data
        ticker_pairs = [{'exchange': pair.split('_')[0],
                         'symbol': pair.split('_')[1],
                         'interval': pair.split('_')[2].split('.')[0],
                         'file_path': join(csv_data_path, pair)}
                        for pair in csv_filenames]

        # Create an empty meta data dataframe
        metadata_dtype = [
            ('symbol', 'object'),
            ('asset_name', 'object'),
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('first_traded', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('exchange', 'object'), ]
        metadata = pd.DataFrame(
            np.empty(len(ticker_pairs), dtype=metadata_dtype))

        minute_data_sets = []
        daily_data_sets = []

        for sid, ticker_pair in enumerate(ticker_pairs):
            df = pd.read_csv(ticker_pair['file_path'],
                             index_col=['date'],
                             parse_dates=['date'])

            symbol = ticker_pair['symbol']
            asset_name = ticker_pair['symbol']
            start_date = df.index[0]
            end_date = df.index[-1]
            first_traded = start_date
            auto_close_date = end_date + pd.Timedelta(days=1)
            exchange = ticker_pair['exchange']

            # Update metadata
            metadata.iloc[sid] = symbol, asset_name, start_date, end_date, first_traded, auto_close_date, exchange

            if ticker_pair['interval'] == '1m':
                minute_data_sets.append((sid, df))

            if ticker_pair['interval'] == '1d':
                daily_data_sets.append((sid, df))

        if minute_data_sets != []:
            # Write one data set at a time so that missing sessions
            # in one pair do not abort the whole ingest
            for minute_data_set in minute_data_sets:
                try:
                    minute_bar_writer.write(
                        [minute_data_set], show_progress=True)
                except Exception as e:
                    print(e)

        if daily_data_sets != []:
            # Write one data set at a time so that missing sessions
            # in one pair do not abort the whole ingest
            for daily_data_set in daily_data_sets:
                try:
                    daily_bar_writer.write(
                        [daily_data_set], show_progress=True)
                except Exception as e:
                    print(e)
        
        asset_db_writer.write(equities=metadata)
        print(metadata)
        adjustment_writer.write()

    return ingest
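
For reference, Zipline's bar writers consume an iterable of (sid, DataFrame) pairs, where each frame is indexed by date and carries open, high, low, close and volume columns. A minimal illustration of the shape we collect in the lists above (the values are made up):

import pandas as pd

sample_df = pd.DataFrame(
    {'open': [0.050], 'high': [0.056], 'low': [0.049],
     'close': [0.055], 'volume': [1200.0]},
    index=pd.to_datetime(['2018-01-02'], utc=True))
sample_df.index.name = 'date'
data_set = (0, sample_df)  # (sid, df), as appended to minute/daily_data_sets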

Create Custom Trading Calendar

Looking good! Next, we can create a custom trading calendar for Binance. Since the release of Harrison’s tutorial on this topic, Quantopian has moved the trading calendars into a separate package. In our setup, you should find them in this directory: $HOME/anaconda3/envs/zipline/lib/python3.5/site-packages/trading_calendars

Fortunately, we don’t have to write up our own 24/7 calendar from scratch. We just need to copy always_open.py and paste it as exchange_calendar_binance.py. Fire up your favorite text editor and change a few things.
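
On the command line, the copy step is simply:

cd $HOME/anaconda3/envs/zipline/lib/python3.5/site-packages/trading_calendars
cp always_open.py exchange_calendar_binance.py

After the edits, the file should look like this: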

from datetime import time

from trading_calendars import TradingCalendar


class BinanceExchangeCalendar(TradingCalendar):
    """A TradingCalendar for an exchange that's open every minute of every day.
    """
    name = 'Binance'
    tz = 'UTC'
    weekmask = '1111111'
    open_times = (
        (None, time(0)),
    )
    close_times = (
        (None, time(23, 59)),
    )

In the same directory, we need to edit calendar_utils.py as well.

...
from .exchange_calendar_binance import BinanceExchangeCalendar  # Add this line
...
_default_calendar_factories = {
    # Exchange calendars.
    'BVMF': BVMFExchangeCalendar,
    'CMES': CMESExchangeCalendar,
...
    # Miscellaneous calendars.
    'BINANCE': BinanceExchangeCalendar,  # Add this line
    'us_futures': QuantopianUSFuturesCalendar,
    '24/7': AlwaysOpenCalendar,
    '24/5': WeekdayCalendar,
}
...
_default_calendar_aliases = {
    'NYSE': 'XNYS',
    'NASDAQ': 'XNYS',
...
    'NYFE': 'IEPA',
    'CFE': 'XCBF',
    'Binance': 'BINANCE',  # Add this line
}
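
To verify that the calendar resolves, try it from a Python shell (get_calendar ships with trading_calendars):

from trading_calendars import get_calendar

cal = get_calendar('BINANCE')  # the 'Binance' alias we added resolves here too
print(cal.name)  # Binance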

Put It All Together

Let us put it all together in binance_csv.py. The source code can be found on my GitHub as well.

import bs4 as bs
from binance.client import Client
import csv
from datetime import datetime as dt
import numpy as np
from os import listdir, makedirs, remove
from os.path import exists, isfile, join
from pathlib import Path
import pandas as pd
import pickle
import requests
from trading_calendars import register_calendar
from trading_calendars.exchange_calendar_binance import BinanceExchangeCalendar

# Set up the directories where we are going to save those csv files
user_home = str(Path.home())
csv_data_path = join(user_home, '.zipline/custom_data/csv')
custom_data_path = join(user_home, '.zipline/custom_data')

def tickers():
    """
    Scrape Binance trading pair tickers from CoinMarketCap
    and save them to a pickle file
    Return the list of ticker pairs
    """
    cmc_binance_url = 'https://coinmarketcap.com/exchanges/binance/'
    response = requests.get(cmc_binance_url)
    # Fail early if the page is unreachable instead of
    # pickling an undefined ticker_pairs below
    response.raise_for_status()
    soup = bs.BeautifulSoup(response.text, 'html.parser')
    table = soup.find('table', {'id': 'exchange-markets'})
    ticker_pairs = []

    for row in table.findAll('tr')[1:]:
        ticker_pair = row.findAll('td')[2].text
        ticker_pairs.append(ticker_pair.strip().replace('/', ''))

    if not exists(custom_data_path):
        makedirs(custom_data_path)

    with open(join(custom_data_path, 'binance_ticker_pairs.pickle'), 'wb') as f:
        pickle.dump(ticker_pairs, f)

    return ticker_pairs
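
# The pickle stores a flat list of concatenated pair symbols, e.g.
# (hypothetical sample; the exact pairs depend on the CoinMarketCap page):
#   >>> tickers()[:3]
#   ['ETHBTC', 'LTCBTC', 'BNBBTC']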

def save_csv(reload_tickers=False, interval='1m'):
    """
    Save Zipline bundle ready csv for Binance trading ticker pair
    :param reload_tickers: True or False
    :type reload_tickers: boolean
    :param interval: Default 1m. Other available ones: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M
    :type interval: str
    """

    if not exists(csv_data_path):
        makedirs(csv_data_path)  # Creates the parent custom_data folder too

    if reload_tickers:
        ticker_pairs = tickers()
    else:
        ticker_pickle = join(
            custom_data_path, 'binance_ticker_pairs.pickle')
        with open(ticker_pickle, 'rb') as f:
            ticker_pairs = pickle.load(f)

    client = Client("", "")
    start = '2017-7-14'  # Binance launch date
    end = dt.utcnow().strftime('%Y-%m-%d')  # Current day
    csv_filenames = [csv_filename for csv_filename in listdir(
        csv_data_path) if isfile(join(csv_data_path, csv_filename))]

    for ticker_pair in ticker_pairs:
        filename = "Binance_{}_{}.csv".format(ticker_pair, interval)        

        if filename in csv_filenames:
            remove(join(csv_data_path, filename))

        output = join(csv_data_path, filename)
        klines = client.get_historical_klines_generator(
            ticker_pair, interval, start, end)
        with open(output, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(
                ['date', 'open', 'high', 'low', 'close', 'volume'])
            for kline in klines:
                # Copy the first six fields (open time, OHLCV) instead of
                # mutating kline; the Binance API forbids changing the open time
                line = kline[:6]
                # Convert the epoch-millisecond open time to a timestamp
                line[0] = pd.Timestamp(line[0], unit='ms')
                writer.writerow(line)

        print('{} saved.'.format(filename))

    return [file for file in listdir(csv_data_path) if isfile(join(csv_data_path, file))]

def csv_to_bundle(reload_tickers=True, reload_csv=True, interval='1m'):

    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir
               ):

        # Get all available csv filenames
        if reload_csv:
            csv_filenames = save_csv(
                reload_tickers=reload_tickers, interval=interval)
        else:
            csv_filenames = [file for file in listdir(
                csv_data_path) if isfile(join(csv_data_path, file))]

        # Loop through the filenames and create a dict to keep some temp meta data
        ticker_pairs = [{'exchange': pair.split('_')[0],
                         'symbol': pair.split('_')[1],
                         'interval': pair.split('_')[2].split('.')[0],
                         'file_path': join(csv_data_path, pair)}
                        for pair in csv_filenames]

        # Create an empty meta data dataframe
        metadata_dtype = [
            ('symbol', 'object'),
            ('asset_name', 'object'),
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('first_traded', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('exchange', 'object'), ]
        metadata = pd.DataFrame(
            np.empty(len(ticker_pairs), dtype=metadata_dtype))

        minute_data_sets = []
        daily_data_sets = []

        for sid, ticker_pair in enumerate(ticker_pairs):
            df = pd.read_csv(ticker_pair['file_path'],
                             index_col=['date'],
                             parse_dates=['date'])

            symbol = ticker_pair['symbol']
            asset_name = ticker_pair['symbol']
            start_date = df.index[0]
            end_date = df.index[-1]
            first_traded = start_date
            auto_close_date = end_date + pd.Timedelta(days=1)
            exchange = ticker_pair['exchange']

            # Update metadata
            metadata.iloc[sid] = symbol, asset_name, start_date, end_date, first_traded, auto_close_date, exchange

            if ticker_pair['interval'] == '1m':
                minute_data_sets.append((sid, df))

            if ticker_pair['interval'] == '1d':
                daily_data_sets.append((sid, df))

        if minute_data_sets != []:
            # Write one data set at a time so that missing sessions
            # in one pair do not abort the whole ingest
            for minute_data_set in minute_data_sets:
                try:
                    minute_bar_writer.write(
                        [minute_data_set], show_progress=True)
                except Exception as e:
                    print(e)

        if daily_data_sets != []:
            # Write one data set at a time so that missing sessions
            # in one pair do not abort the whole ingest
            for daily_data_set in daily_data_sets:
                try:
                    daily_bar_writer.write(
                        [daily_data_set], show_progress=True)
                except Exception as e:
                    print(e)
        
        asset_db_writer.write(equities=metadata)
        print(metadata)
        adjustment_writer.write()

    return ingest

We are almost there! However, we still need to add a few more lines to $HOME/.zipline/extension.py to tell Zipline to ingest the data the way we want.

from zipline.data.bundles import register
from zipline.data.bundles.binance_csv import csv_to_bundle

register(
    'binance_csv',
    csv_to_bundle(interval='1d'), # Daily ('1d') or Minute ('1m') Data
    calendar_name='Binance',
)

How to Run

This is the easy part. Activate the env and run the ingest command as usual.

conda activate zipline
(zipline) zipline ingest -b binance_csv
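
Once the ingestion completes, the bundle should show up with a fresh timestamp when you list the known bundles:

(zipline) zipline bundles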

In part 2 of this tutorial, we are going to take a look at how to create custom data bundles directly from the Binance public API. Feel like stopping by to say hi? Let us talk more about crypto and quantitative trading on Discord. Here is the invite link. Cheers!