作为加密货币交易所的前首席技术官,我经常与一些主要交易所的API交往。在本文中,我将指导您完成创建可靠的Python脚本以从Binance提取历史交易数据的过程。
理论基础
在回测交易策略时,即使用过去的数据执行我们的策略并分析收益和其他重要因素时,我们必须确保我们拥有合适的数据类型。鉴于某些策略需要水平的书本数据,而其他策略可能只需要花费一个小时的时间,该过程并不总是那么简单,而基础架构,可用性和连接性等元素可能会因数据类型的不同而大不相同。
所需数据主要由交易策略的频率定义。交易策略类别是我将在Algotrading系列文章中讨论的其他主题的主题,但是您可以在Investopedia中找到一些可靠的信息。
好的,但是为什么本文仅涉及获取“交易”数据,为什么我们要使用Binance API?
数据频率和Binance
我想说交易数据端点主要在99.99%的交易所中提供。它非常细致,提供了足够的详细信息(在某些非常特殊的情况下)用于回测高频交易(HFT)策略,并且可以用作OHLC蜡烛的构建基块(1S至24H,或者更多,如果您需要的话)。
交易数据是通用的,并且允许使用不同频率的策略进行大量实验。
为什么选择币安?那只是因为它是我因交易量而倾向于回溯的交易所之一。我绝不隶属于Binance。我们将在我的Algotrading系列中查看其他交流。
我们将要编码什么?
我们将创建一个Python脚本,该脚本接收对符号,开始日期和结束日期作为命令行参数。它将包含所有交易的CSV文件输出到磁盘。该过程可以通过以下步骤进行详细说明:
1. 分析 symbol, starting_date, 和ending_date参数
2. 获取在开始日期发生的第一笔交易,以获取第一笔trade_id。
3. 循环获取每个请求1,000笔交易(Binance API限制),直到达到ending_date。
4. 最后将数据保存到磁盘。对于示例,我们将其保存为CSV,但是您还有其他选择。
我们将使用pandas,requests,time,sys和datetime库。在代码段中,将不会显示错误验证,因为它不会为说明添加任何值。当然,您可以在GitHub上查看完整代码。https://github.com/tgcandido/binance-data-fetcher
编码时间
解析参数:
symbol:交易对的符号,由Binance定义。可以在这里查询,也可以从Binance Web应用程序的URL复制,但不包括_ character.
starting_date 和ending_date:预期的格式为mm / dd / yyyy,或者在Python lang语中为%m /%d /%Y。
要获取参数,我们将使用内置的sys(这里没有什么花哨的东西),并且为了解析日期,我们将使用datetime库。
symbol = sys.argv[1]
starting_date = datetime.strptime(sys.argv[2], '%m/%d/%Y')
ending_date = datetime.strptime(sys.argv[3], '%m/%d/%Y') + timedelta(days=1) - timedelta(microseconds=1)
我们将增加一天并减去一微秒,以便ending_date时间部分始终为23:59:59.999,这使得获取同一天的间隔更加实用。
提取交易
使用Binance的API并使用aggTrades端点,一次请求最多可以获取1,000条交易,如果使用开始和结束参数,则它们之间的间隔最多为一小时。在发生一些失败之后,通过使用时间间隔进行获取(在某个时间点或另一个时间点,流动性将变得疯狂,我将失去一些宝贵的交易),我决定尝试from_id策略。
选择aggTrades端点是因为它返回压缩的交易。这样,我们就不会丢失任何宝贵的信息。
进行压缩、聚合交易。在同一时间、同一订单、同一价格进行交易的数量将累计。
from_id策略是这样的:通过将日期间隔发送到端点,我们将获得starting_date的第一笔交易。之后我们将从第一个获取的交易ID开始获取1,000个交易。然后我们将检查最后一笔交易是否在我们的end_date之后发生。如果是这样,我们已经遍历了所有时间段,可以将结果保存到文件中。否则,我们将更新from_id变量以获取最后的交易ID,然后重新开始循环。
取得第一个交易编号
new_ending_date = from_date + timedelta(seconds=60)
r = requests.get('https://api.binance.com/api/v3/aggTrades',
params = {
"symbol" : symbol,
"startTime": get_unix_ms_from_date(from_date),
"endTime": get_unix_ms_from_date(new_ending_date)
})
response = r.json()
if len(response) > 0:
return response[]['a']
else:
raise Exception('no trades found')
首先我们创建一个new_end_date。那是因为我们通过传递startTime和endTime参数来使用aggTrades。现在我们只需要知道该期间的第一个交易ID,因此我们将增加60秒。在流动性较低的货币对中,可以更改此参数,因为不能保证在请求的第一天发生交易。
然后使用我们的helper函数解析日期,并通过使用calendar.timegm函数将其转换为Unix毫秒表示形式。首选timegm函数,因为它将日期保留为UTC。
def get_unix_ms_from_date(date):
return int(calendar.timegm(date.timetuple()) * 1000 + date.microsecond/1000)
请求的响应是按日期排序的交易对象列表,格式如下:
因此,当我们需要第一个交易ID时,我们将返回response[0][“a”]值。
主循环
现在我们有了第一个交易ID,我们可以一次获取1,000笔交易,直到到达ending_date。以下代码将在我们的主循环中调用。它将使用from_id参数执行我们的请求,放弃startDate和endDate参数。
现在这是我们的主循环,它将执行请求并创建我们的DataFrame。
我们检查包含最近提取的交易日期的current_time是否大于我们的to_date,如果是,我们:
使用from_id参数获取交易
使用从最新交易中获取的信息来更新from_id和current_time参数
打印一个nice的调试消息
pd.concat与我们DataFrame中的先前交易取得的交易
leep,使Binance不会给我们一个难看的429 HTTP响应
清除与保存
组装完DataFrame之后,我们需要执行简单的数据清理。我们将删除重复项并trim在to_date之后发生的交易(我们遇到了这个问题,因为我们要获取1000笔交易中的大部分,因此预计我们会在目标结束日期之后执行一些交易)。
我们可以封装trim函数:
def trim(df, to_date):
return df[df['T']
并执行数据清理:
df.drop_duplicates(subset='a', inplace=True)
df = trim(df, to_date)
现在我们可以使用to_csv方法将其保存到文件中:
filename = f'binance____trades__from____to__.csv'
df.to_csv(filename)
我们还可以使用其他数据存储机制,例如Arctic。
奖励:验证你的数据
在处理交易策略时,我们要相信我们的数据,这一点很重要。我们可以很容易地用提取到的交易数据进行验证:
df = pd.read_csv(file_name)
values = df.to_numpy()
last_id = values[][1]
for row in values[1:]:
trade_id = row[1]
if last_id + 1 != trade_id:
print('last_id', last_id)
print('trade_id', trade_id)
print('inconsistent data')
exit()
last_id = trade_id
print('data is OK!')
在代码片段中,我们将DataFrame转换为NumPy数组,并逐行迭代,检查交易ID是否每行递增1。
Binance交易ID是递增编号的,并且是为每个符号创建的,所以很容易验证数据是否正确。
创建成功的交易策略的第一步是拥有正确的数据。
---------------------------------------------
原文作者:Thiago Candido
译者:链三丰
译文出处:http://bitoken.world
---------------------------------------------