我的第一个类用于连接到 WebSocket,第二个类用于尝试用图表显示数据。
在代码中,我只是尝试在 WebSocket 运行的同时使用 pyqtgraph 显示一个图表,但是窗口完全无法正常工作。
谢谢
import asyncio
import json
import websockets
from asyncqt import asyncSlot, QtCore
import pyqtgraph as pg
class Ws(QtCore.QObject):
    """Qt object that owns a websocket connection and re-emits its messages.

    Each message received from the server is JSON-decoded and emitted
    through ``dataChanged``.
    """

    # Emitted with the JSON-decoded payload of every received message.
    # NOTE(review): the signal is typed ``dict``, so the server is presumably
    # expected to send JSON objects only - confirm.
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        """Create the object without opening a connection.

        Args:
            parent: Optional parent ``QObject``.
        """
        super().__init__(parent)
        # Stays None until connect() has opened the connection.
        self._websocket = None

    @property
    def websocket(self):
        """Return the connection object, or ``None`` before ``connect``."""
        return self._websocket

    async def connect(self, server):
        """Open the websocket to *server*, then relay incoming messages.

        Args:
            server: Websocket URL passed to ``websockets.connect``.
        """
        self._websocket = await websockets.connect(server)
        # Await the plain coroutine, not the ``on_message`` slot: calling the
        # slot goes through the asyncSlot wrapper, and what that wrapper
        # returns is not the coroutine itself.
        # NOTE(review): in some asyncqt versions the wrapper returns nothing,
        # which would make ``await self.on_message()`` fail - confirm.
        await self._receive_loop()

    async def _receive_loop(self):
        """Receive messages forever, emitting each through ``dataChanged``."""
        while True:
            raw = await self.websocket.recv()
            self.dataChanged.emit(json.loads(raw))

    @asyncSlot()
    async def on_message(self):
        """Slot form of the receive loop.

        Declared without arguments because the method accepts none; the
        previous ``dict`` slot signature did not match the method signature.
        """
        await self._receive_loop()

    @asyncSlot(dict)
    async def send_message(self, message):
        """Serialize *message* to JSON and send it over the websocket.

        Waits, polling every 0.2 seconds, until the connection exists, so it
        may be called before ``connect`` has finished.

        Args:
            message: JSON-serializable payload.
        """
        while self.websocket is None:
            await asyncio.sleep(0.2)
        await self.websocket.send(json.dumps(message))

    def run(self, server="wss://www.bitmex.com/realtime"):
        """Schedule the connection and run the event loop; this call blocks.

        The loop used is whatever ``asyncio.get_event_loop()`` returns, so
        any specific loop must be installed before calling this method.

        Args:
            server: Websocket URL to connect to.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server=server))
        loop.run_forever()
class TestUi(object):
    """Plot created with pyqtgraph plus a ``Ws`` client subscribed to XBTUSD."""

    def __init__(self):
        """Create the plot, queue the subscription and start the event loop.

        This constructor blocks, because ``Ws.run`` ends in
        ``loop.run_forever()``.
        """
        # NOTE(review): nothing here installs a Qt-integrated asyncio loop,
        # so run() spins whatever asyncio.get_event_loop() returns;
        # presumably this is why the plot window misbehaves - confirm.
        self.plt = pg.plot()
        self.ws = Ws()
        subscription = {"op": "subscribe", "args": ["instrument:XBTUSD"]}
        self.ws.send_message(subscription)
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        """Print the payload delivered by ``Ws.dataChanged``."""
        print(data)
# Entry point when executed as a script: instantiate the UI object.
if __name__ == "__main__":
    u = TestUi()
您必须先把 asyncqt 的 QEventLoop 设置为 asyncio 的事件循环:
import asyncio
import json
import sys
import websockets
from PyQt5 import QtCore, QtWidgets
import pyqtgraph as pg
from asyncqt import QEventLoop, asyncSlot
class Ws(QtCore.QObject):
    """Qt object that owns a websocket connection and re-emits its messages.

    Each message received from the server is JSON-decoded and emitted
    through ``dataChanged``.
    """

    # Emitted with the JSON-decoded payload of every received message.
    # NOTE(review): the signal is typed ``dict``, so the server is presumably
    # expected to send JSON objects only - confirm.
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        """Create the object without opening a connection.

        Args:
            parent: Optional parent ``QObject``.
        """
        super().__init__(parent)
        # Stays None until connect() has opened the connection.
        self._websocket = None

    @property
    def websocket(self):
        """Return the connection object, or ``None`` before ``connect``."""
        return self._websocket

    async def connect(self, server):
        """Open the websocket to *server*, then relay incoming messages.

        Args:
            server: Websocket URL passed to ``websockets.connect``.
        """
        self._websocket = await websockets.connect(server)
        # Await the plain coroutine, not the ``on_message`` slot: calling the
        # slot goes through the asyncSlot wrapper, and what that wrapper
        # returns is not the coroutine itself.
        # NOTE(review): in some asyncqt versions the wrapper returns nothing,
        # which would make ``await self.on_message()`` fail - confirm.
        await self._receive_loop()

    async def _receive_loop(self):
        """Receive messages forever, emitting each through ``dataChanged``."""
        while True:
            raw = await self.websocket.recv()
            self.dataChanged.emit(json.loads(raw))

    @asyncSlot()
    async def on_message(self):
        """Slot form of the receive loop.

        Declared without arguments because the method accepts none; the
        previous ``dict`` slot signature did not match the method signature.
        """
        await self._receive_loop()

    @asyncSlot(dict)
    async def send_message(self, message):
        """Serialize *message* to JSON and send it over the websocket.

        Waits, polling every 0.2 seconds, until the connection exists, so it
        may be called before ``connect`` has finished.

        Args:
            message: JSON-serializable payload.
        """
        while self.websocket is None:
            await asyncio.sleep(0.2)
        await self.websocket.send(json.dumps(message))

    def run(self, server="wss://www.bitmex.com/realtime"):
        """Schedule the connection and run the event loop; this call blocks.

        The loop used is whatever ``asyncio.get_event_loop()`` returns, so
        any specific loop must be installed before calling this method.

        Args:
            server: Websocket URL to connect to.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server=server))
        loop.run_forever()
class TestUi(object):
    """Plot created with pyqtgraph plus a ``Ws`` client subscribed to XBTUSD."""

    def __init__(self):
        """Install the Qt event loop, build the UI and start receiving.

        This constructor blocks, because ``Ws.run`` ends in
        ``loop.run_forever()``.
        """
        qt_app = QtWidgets.QApplication(sys.argv)
        # Make asyncio use the QEventLoop built on the QApplication, so that
        # the asyncio.get_event_loop() call in Ws.run() returns this loop.
        qt_loop = QEventLoop(qt_app)
        asyncio.set_event_loop(qt_loop)

        self.plt = pg.plot()
        self.ws = Ws()
        subscription = {"op": "subscribe", "args": ["instrument:XBTUSD"]}
        self.ws.send_message(subscription)
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        """Print the payload delivered by ``Ws.dataChanged``."""
        print(data)
# Entry point when executed as a script: instantiate the UI object.
if __name__ == "__main__":
    u = TestUi()