How to Create Custom Zipline Bundles From Binance Data Part 2
In part 1, we have covered how to create custom data bundles from Binance csv files. Today, let us create another module which will allow us to fetch Binance API trading data and create Zipline bundles instantly. Since there are some similarities between part 1 and part 2 and some setup details will be omitted accordingly in this tutorial, I strongly suggest you do a quick review of part 1 before jumping onto this one.
Fire up your favorite text editor and create a file named binance_api.py
in the directory of our env. $HOME/anaconda3/envs/zipline/lib/python3.5/site-packages/zipline/data/bundles
.
As usual, we need to import some essentials.
import bs4 as bs
from binance.client import Client
import csv
from datetime import datetime as dt
from datetime import timedelta
import numpy as np
from os import listdir, mkdir, remove
from os.path import exists, isfile, join
from pathlib import Path
import pandas as pd
import pickle
import requests
from trading_calendars import register_calendar
from trading_calendars.exchange_calendar_binance import BinanceExchangeCalendar
If you have followed my tutorials all along, you should have all packages installed already. BinanceExchangeCalendar is a custom trading calendar created in my tutorial part 1. Here is our Zipline installation on Debian 9 Stretch. Once you have all the pre-requisites in place, you can move onto the next step.
Get Binance Trading Pair Tickers
Now, let us set up some variables.
user_home = str(Path.home())
custom_data_path = join(user_home, '.zipline/custom_data')
Create one function to collection all Binance trading ticker pairs and another as a ticker pair generator.
def tickers():
"""
Save Binance trading pair tickers to a pickle file
Return a list of trading ticker pairs
"""
cmc_binance_url = 'https://coinmarketcap.com/exchanges/binance/'
response = requests.get(cmc_binance_url)
if response.ok:
soup = bs.BeautifulSoup(response.text, 'html.parser')
table = soup.find('table', {'id': 'exchange-markets'})
ticker_pairs = []
for row in table.findAll('tr')[1:]:
ticker_pair = row.findAll('td')[2].text
ticker_pairs.append(ticker_pair.strip().replace('/', ''))
if not exists(custom_data_path):
mkdir(custom_data_path)
with open(join(custom_data_path, 'binance_ticker_pairs.pickle'), 'wb') as f:
pickle.dump(ticker_pairs, f)
return ticker_pairs
def tickers_generator():
"""
Return a tuple (sid, ticker_pair)
"""
tickers_file = join(custom_data_path, 'binance_ticker_pairs.pickle')
if not isfile(tickers_file):
ticker_pairs = tickers()
else:
with open(tickers_file, 'rb') as f:
ticker_pairs = pickle.load(f)[:]
return (tuple((sid, ticker)) for sid, ticker in enumerate(ticker_pairs))
Dataframes for Metadata & Ticker Data
Next, we are going to create two functions which can process the dataframe for ticker pair meta data and trading data, respectively.
def metadata_df():
"""
Return meta data dataframe
"""
metadata_dtype = [
('symbol', 'object'),
('asset_name', 'object'),
('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('first_traded', 'datetime64[ns]'),
('auto_close_date', 'datetime64[ns]'),
('exchange', 'object'), ]
metadata_df = pd.DataFrame(
np.empty(len(tickers()), dtype=metadata_dtype))
return metadata_df
def df_generator(interval):
"""
Return a tuple of sid and trading dataframe, which later can used by daily_bar_writer or minute_bar_writer
"""
client = Client("", "")
start = '2017-7-14' # Binance launch date
end = dt.utcnow().strftime('%Y-%m-%d') # Current day
for item in tickers_generator():
sid = item[0]
ticker_pair = item[1]
df = pd.DataFrame(
columns=['date', 'open', 'high', 'low', 'close', 'volume'])
symbol = ticker_pair
asset_name = ticker_pair
exchange = 'Binance'
klines = client.get_historical_klines_generator(
ticker_pair, interval, start, end)
for kline in klines:
line = kline[:]
del line[6:]
# Make a real copy of kline
# Binance API forbids the change of open time
line[0] = np.datetime64(line[0], 'ms')
line[0] = pd.Timestamp(line[0], 'ms')
df.loc[len(df)] = line
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
df = df.astype({'open': 'float64', 'high': 'float64',
'low': 'float64', 'close': 'float64', 'volume': 'float64'})
start_date = df.index[0]
end_date = df.index[-1]
first_traded = start_date
auto_close_date = end_date + pd.Timedelta(days=1)
# Check if there is no missing session; skip the ticker pair otherwise
if interval == '1d' and len(df.index) - 1 != pd.Timedelta(end_date - start_date).days:
continue
elif interval == '1m' and timedelta(minutes=(len(df.index) + 60)) != end_date - start_date:
continue
yield (sid, df), symbol, asset_name, start_date, end_date, first_traded, auto_close_date, exchange
Create Custom Ingest Function
Great! It is time for our custom ingest function.
def api_to_bundle(interval='1m'):
def ingest(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir
):
metadata = metadata_df()
def minute_data_generator():
return (sid_df for (sid_df, *metadata.iloc[sid_df[0]]) in df_generator(interval='1m'))
def daily_data_generator():
return (sid_df for (sid_df, *metadata.iloc[sid_df[0]]) in df_generator(interval='1d'))
if interval == '1d':
daily_bar_writer.write(
daily_data_generator(), show_progress=True)
elif interval == '1m':
minute_bar_writer.write(
minute_data_generator(), show_progress=True)
# Drop the ticker rows which have missing sessions in their data sets
metadata.dropna(inplace=True)
asset_db_writer.write(equities=metadata)
print(metadata)
adjustment_writer.write()
return ingest
Update Extension.py
Last but not the least, we need to update $HOME/.zipline/extension.py
from zipline.data.bundles import register
from zipline.data.bundles.binance_api import api_to_bundle
register(
'binance_api',
api_to_bundle(interval='1d'),
calendar_name='Binance',
)
Put-it-all-together
import bs4 as bs
from binance.client import Client
import csv
from datetime import datetime as dt
from datetime import timedelta
import numpy as np
from os import listdir, mkdir, remove
from os.path import exists, isfile, join
from pathlib import Path
import pandas as pd
import pickle
import requests
from trading_calendars import register_calendar
from trading_calendars.exchange_calendar_binance import BinanceExchangeCalendar
user_home = str(Path.home())
custom_data_path = join(user_home, '.zipline/custom_data')
def tickers():
"""
Save Binance trading pair tickers to a pickle file
Return a list of trading ticker pairs
"""
cmc_binance_url = 'https://coinmarketcap.com/exchanges/binance/'
response = requests.get(cmc_binance_url)
if response.ok:
soup = bs.BeautifulSoup(response.text, 'html.parser')
table = soup.find('table', {'id': 'exchange-markets'})
ticker_pairs = []
for row in table.findAll('tr')[1:]:
ticker_pair = row.findAll('td')[2].text
ticker_pairs.append(ticker_pair.strip().replace('/', ''))
if not exists(custom_data_path):
mkdir(custom_data_path)
with open(join(custom_data_path, 'binance_ticker_pairs.pickle'), 'wb') as f:
pickle.dump(ticker_pairs, f)
return ticker_pairs
def tickers_generator():
"""
Return a tuple (sid, ticker_pair)
"""
tickers_file = join(custom_data_path, 'binance_ticker_pairs.pickle')
if not isfile(tickers_file):
ticker_pairs = tickers()
else:
with open(tickers_file, 'rb') as f:
ticker_pairs = pickle.load(f)[:]
return (tuple((sid, ticker)) for sid, ticker in enumerate(ticker_pairs))
def metadata_df():
"""
Return meta data dataframe
"""
metadata_dtype = [
('symbol', 'object'),
('asset_name', 'object'),
('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('first_traded', 'datetime64[ns]'),
('auto_close_date', 'datetime64[ns]'),
('exchange', 'object'), ]
metadata_df = pd.DataFrame(
np.empty(len(tickers()), dtype=metadata_dtype))
return metadata_df
def df_generator(interval):
"""
Return a tuple of sid and trading dataframe, which later can used by daily_bar_writer or minute_bar_writer
"""
client = Client("", "")
start = '2017-7-14' # Binance launch date
end = dt.utcnow().strftime('%Y-%m-%d') # Current day
for item in tickers_generator():
sid = item[0]
ticker_pair = item[1]
df = pd.DataFrame(
columns=['date', 'open', 'high', 'low', 'close', 'volume'])
symbol = ticker_pair
asset_name = ticker_pair
exchange = 'Binance'
klines = client.get_historical_klines_generator(
ticker_pair, interval, start, end)
for kline in klines:
line = kline[:]
del line[6:]
# Make a real copy of kline
# Binance API forbids the change of open time
line[0] = np.datetime64(line[0], 'ms')
line[0] = pd.Timestamp(line[0], 'ms')
df.loc[len(df)] = line
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
df = df.astype({'open': 'float64', 'high': 'float64',
'low': 'float64', 'close': 'float64', 'volume': 'float64'})
start_date = df.index[0]
end_date = df.index[-1]
first_traded = start_date
auto_close_date = end_date + pd.Timedelta(days=1)
# Check if there is no missing session; skip the ticker pair otherwise
if interval == '1d' and len(df.index) - 1 != pd.Timedelta(end_date - start_date).days:
continue
elif interval == '1m' and timedelta(minutes=(len(df.index) + 60)) != end_date - start_date:
continue
yield (sid, df), symbol, asset_name, start_date, end_date, first_traded, auto_close_date, exchange
def api_to_bundle(interval='1m'):
def ingest(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir
):
metadata = metadata_df()
def minute_data_generator():
return (sid_df for (sid_df, *metadata.iloc[sid_df[0]]) in df_generator(interval='1m'))
def daily_data_generator():
return (sid_df for (sid_df, *metadata.iloc[sid_df[0]]) in df_generator(interval='1d'))
if interval == '1d':
daily_bar_writer.write(
daily_data_generator(), show_progress=True)
elif interval == '1m':
minute_bar_writer.write(
minute_data_generator(), show_progress=True)
# Drop the ticker rows which have missing sessions in their data sets
metadata.dropna(inplace=True)
asset_db_writer.write(equities=metadata)
print(metadata)
adjustment_writer.write()
return ingest
Here is a sample output.
Again, you can also find the source code on my github as always. I hope you have enjoyed my tutorials so far. Shoot me a message if you have questions and comments. Feel like stopping by and say hi? Let us talk more about crypto and quantitative trading over there. Here is the discord invite link. Ciao!