当前位置: 首页 > 科技 > 区块链 > 区块链研究实验室|了解如何将加密货币实时价格数据流加

区块链研究实验室|了解如何将加密货币实时价格数据流加

天乐
2020-11-24 23:42:43 第一视角

目前有许多不同的交易平台和交易所将提供对其数据的API访问,从而聪明的交易者可以根据自己的策略和交易需求构建自己的交易工具。

Excel是许多交易者的首选,因为它提供了巨大的潜力来创建自定义工具和仪表板,以建立市场洞察力,测试和试验数据和想法,以及监视投资组合的绩效并跟踪头寸。

技术型交易者也会希望使用Python进行数据分析和交易策略回溯测试,甚至构建系统化或自动化的交易策略。

大多数加密交换都提供了一种通过API以编程方式从其平台获取数据的方法。这是我们用来将基于Excel或Python的工具连接到平台以获取数据和管理订单。

在本文中,我们将使用BitMEX Python API将实时价格流式传输到Excel中。

要将BitMEX的实时价格传输到Excel中,您将需要以下内容:

首先,我们将编写Python代码,以获取Python中的实时价格并测试它。一旦成功,我们将使用PyXLL从Excel调用Python代码。

要通过websockets与BitMEX API对话,我们需要Python“websockets”包。打开命令提示符,使用Python包管理器“pip”通过运行“pip install websockets”来安装websockets包。

以下Python代码创建了一个与BitMEX API的websocket连接,订阅了“ XBTUSD”工具的更新,然后等待消息返回并在它们到达时进行打印。有了这段代码,我们现在就可以使用Python从BitMEX传输实时价格了!

import websockets

import asyncio

import json

async def main():

uri = "wss://www.bitmex.com/realtime"

async with websockets.connect(uri) as websocket:

# Send a message to subscribe to XBTUSD updates

msg = {

"op": "subscribe",

"args": ["instrument:XBTUSD"]

}

await websocket.send(json.dumps(msg))

# Receive messages in a loop and print them out

while True:

result = await websocket.recv()

data = json.loads(result)

print(data)

if __name__ == "__main__":

# Run the 'main' function in an asyncio event loop

loop = asyncio.get_event_loop()

loop.create_task(main())

loop.run_forever()

您可以通过将代码保存到名为“ bitmex.py”的文件中,然后在命令提示符下运行“ python bitmex.py”来运行此代码。对于开发Python代码,您会发现使用IDLE(在安装Python时是标准配置)或PyCharm等Python IDE会更容易。

上面的代码显示了如何使用websockets包连接到BitMex API,以接收Python中的价格更新。在我们在Excel中使用此功能之前,我们将对其进行一些细化,以便我们可以“subscribe”各个更新事件。

下面我们用“subscribe”方法定义一个类“BitMex”。使用这个我们可以订阅个别更新。BitMex类将处理从websockets API接收到的消息路由到已订阅的每个符号和字段的相关回调。

import websockets

import asyncio

import json

class BitMex:

"""Class to manage subscriptions to BitMEX prices."""

URI = "wss://www.bitmex.com/realtime"

def __init__(self, loop=None):

self.__websocket = None

self.__running = False

self.__running_task = None

self.__subscriptions = {}

self.__data = {}

self.__lock = asyncio.Lock(loop=loop)

async def __connect(self):

# Connect to the websocket API and start the __run coroutine

self.__running = True

self.__websocket = await websockets.connect(self.URI)

self.__running_task = asyncio.create_task(self.__run())

async def __disconnect(self):

# Close the websocket and wait for __run to complete

self.__running = False

await self.__websocket.close()

self.__websocket = None

await self.__running_task

async def __run(self):

# Read from the websocket until disconnected

while self.__running:

msg = await self.__websocket.recv()

await self.__process_message(json.loads(msg))

async def __process_message(self, msg):

if msg.get("table", None) == "instrument":

# Extract the data from the message, update our data dictionary and notify subscribers

for data in msg.get("data", []):

symbol = data["symbol"]

timestamp = data["symbol"]

# Update the latest values in our data dictionary and notify any subscribers

tasks = []

subscribers = self.__subscriptions.get(symbol, {})

latest = self.__data.setdefault(symbol, {})

for field, value in data.items():

latest[field] = (value, timestamp)

# Notify the subscribers with the updated field

for subscriber in subscribers.get(field, []):

tasks.append(subscriber(symbol, field, value, timestamp))

# await all the tasks from the subscribers

if tasks:

await asyncio.wait(tasks)

async def subscribe(self, symbol, field, callback):

"""Subscribe to updates for a specific symbol and field.

The callback will be called as 'await callback(symbol, field, value, timestamp)'

whenever an update is received.

"""

async with self.__lock:

# Connect the websocket if necessary

if self.__websocket is None:

await self.__connect()

# Send the subscribe message if we're not already subscribed

if symbol not in self.__subscriptions:

msg = {"op": "subscribe", "args": [f"instrument:"]}

await self.__websocket.send(json.dumps(msg))

# Add the subscriber to the dict of subscriptions

self.__subscriptions.setdefault(symbol, {}).setdefault(field, []).append(callback)

# Call the callback with the latest data

data = self.__data.get(symbol, {})

if field in data:

(value, timestamp) = data[field]

await callback(symbol, field, value, timestamp)

async def unsubscribe(self, symbol, field, callback):

async with self.__lock:

# Remove the subscriber from the list of subscriptions

self.__subscriptions[symbol][field].remove(callback)

if not self.__subscriptions[symbol][field]:

del self.__subscriptions[symbol][field]

# Unsubscribe if we no longer have any subscriptions for this instrument

if not self.__subscriptions[symbol]:

msg = {"op": "unsubscribe", "args": [f"instrument:"]}

await self.__websocket.send(json.dumps(msg))

del self.__subscriptions[symbol]

self.__data.pop(symbol, None)

# Disconnect if we no longer have any subscriptions

if not self.__subscriptions:

async with self.__lock:

await self.__disconnect()

async def main():

# This is the callback that will be called whenever there's an update

async def callback(symbol, field, value, timestamp):

print((symbol, field, value, timestamp))

bm = BitMex()

await bm.subscribe("XBTUSD", "lastPrice", callback)

await asyncio.sleep(60)

await bm.unsubscribe("XBTUSD", "lastPrice", callback)

print("DONE!")

if __name__ == "__main__":

# Run the 'main' function in an asyncio event loop

loop = asyncio.get_event_loop()

loop.create_task(main())

loop.run_forever()

在上面的“ main”函数中,我们创建“ BitMex”类的实例,并订阅“ XBTUSD”工具上“ lastPrice”字段的更新。每当发生新的“ lastPrice”事件时,都会调用回调并打印新价格。

现在,我们有了这个BitMex类,并且可以订阅特定的字段和符号,接下来我们将使用PyXLL将其公开给Excel。这就是使我们能够将BitMEX实时流数据传输到Excel的原因!

要使用上面在Python中编写的代码,我们需要安装PyXLL Excel加载项。您可以找到此的安装说明,并从https://www.pyxll.com下载加载项。

安装PyXLL后,您可以通过编辑pyxll.cfg配置文件向其中添加自己的Python模块。我们将编写一个新的Python模块,该模块使用PyXLL的实时数据功能。

我们通过创建一个从“ pyxll.RTD”类派生的新类,使用PyXLL编写RTD函数,然后编写Python函数以返回该类。使用PyXLL的“ @xl_func”装饰器装饰Python函数,以将其公开为Excel函数。RTD类具有两种方法,“连接”和“断开连接”,我们根据需要使用这些方法来订阅和退订BitMex数据。当我们收到新值时,我们将更新RTD对象的“ value”属性,这我们在Excel中看到的值发生更新。

from pyxll import xl_func, get_event_loop, RTD

from bitmex import BitMex

class BitMexRTD(RTD):

# Use a single BitMex object for all RTD functions

_bitmex = BitMex(loop=get_event_loop())

def __init__(self, symbol, field):

super().__init__(value="Waiting...")

self.__symbol = symbol

self.__field = field

async def connect(self):

# Subscribe to BitMix updates when Excel connects to the RTD object

await self._bitmex.subscribe(self.__symbol, self.__field, self.__update)

async def disconnect(self):

# Unsubscribe to BitMix updates when Excel disconnects from the RTD object

await self._bitmex.unsubscribe(self.__symbol, self.__field, self.__update)

async def __update(self, symbol, field, value, timestamp):

# Update the value in Excel

self.value = value

@xl_func("string symbol, string field: rtd")

def bitmex_rtd(symbol, field):

return BitMexRTD(symbol, field)

通过将新的Python模块添加到pyxll.cfg配置文件的“modules”列表中,将其添加到我们的PyXLL加载项中。包含此模块的文件夹以及我们之前编写的bitmex.py模块,也需要位于“ pythonpath”上,以便Python可以导入它们。pythonpath也可以在pyxll.cfg配置文件中设置。

当我们启动Excel或重新加载PyXLL插件时,这个新模块将被加载,“bitmex_rtd”函数将可供我们从Excel调用。如果有任何问题,请检查PyXLL日志文件,以检查出了什么问题。

现在在Excel中,我们可以以与其他Excel工作表函数完全相同的方式调用新的“ bitmex_rtd”函数。由于它是实时数据功能,因此每次通过BitMEX websockets API收到对请求字段的更新时,该值都会继续更新。

= bitmex_rtd(“ XBTUSD”,“ lastPrice”)

许多加密平台和交易所都以与BitMEX类似的方式提供API访问。我们可以使用上面显示的相同技术在Python中访问这些价格。PyXLL在用于调用这些API的Python代码和Excel之间提供了一个优雅的桥梁。

我们不仅限于获取上面所示的数据。大多数平台API也允许我们通过他们的API下单。我们可以使用它来构建和集成用Python编写的定制交易应用程序,但是使用Excel作为这些工具的接口。

python和Excel的结合为我们提供了一种现代而强大的编程语言的优势,并使Excel成为一个灵活、直观的前端供我们使用。

提示:支持键盘“← →”键翻页
为你推荐
加载更多
意见反馈
返回顶部