Python源码示例:PyQt5.QtNetwork.QTcpSocket()
示例1
def connect(self, remoteAddress, port):
"""Establishes a connection with the IDE"""
self.__socket = QTcpSocket()
if remoteAddress is None:
self.__socket.connectToHost(QHostAddress.LocalHost, port)
else:
self.__socket.connectToHost(remoteAddress, port)
if not self.__socket.waitForConnected(1000):
raise Exception('Cannot connect to the IDE')
self.__socket.setSocketOption(QAbstractSocket.KeepAliveOption, 1)
self.__socket.setSocketOption(QAbstractSocket.LowDelayOption, 1)
self.__socket.disconnected.connect(self.__onDisconnected)
示例2
def __init__(self, socket: QtNetwork.QTcpSocket = None):
"""
Initializes the extended TCP socket. Either wraps around an existing socket or creates its own and resets
the number of bytes written.
:param socket: An already existing socket or None if none is given.
"""
super().__init__()
# new QTcpSocket() if none is given
if socket is not None:
self.socket = socket
else:
self.socket = QtNetwork.QTcpSocket()
# some wiring, new data is handled by _receive()
self.socket.readyRead.connect(self._receive)
self.socket.error.connect(self.error)
self.socket.connected.connect(self.connected)
self.socket.disconnected.connect(self.disconnected)
self.socket.bytesWritten.connect(self.count_bytes_written)
self.bytes_written = 0
示例3
def connect(self, remoteAddress, port):
"""Establishes a connection with the IDE"""
self.__socket = QTcpSocket()
if remoteAddress is None:
self.__socket.connectToHost(QHostAddress.LocalHost, port)
else:
self.__socket.connectToHost(remoteAddress, port)
if not self.__socket.waitForConnected(1000):
raise Exception('Cannot connect to the IDE')
self.__socket.setSocketOption(QAbstractSocket.KeepAliveOption, 1)
self.__socket.setSocketOption(QAbstractSocket.LowDelayOption, 1)
self.__socket.disconnected.connect(self.__onDisconnected)
示例4
def connect(self, remoteAddress, port):
"""Establishes a session with the debugger"""
self.socket = QTcpSocket()
if remoteAddress is None:
self.socket.connectToHost(QHostAddress.LocalHost, port)
else:
self.socket.connectToHost(remoteAddress, port)
if not self.socket.waitForConnected(1000):
raise Exception('Cannot connect to the IDE')
self.socket.setSocketOption(QAbstractSocket.KeepAliveOption, 1)
self.socket.setSocketOption(QAbstractSocket.LowDelayOption, 1)
self.socket.disconnected.connect(self.__onDisconnected)
示例5
def __init__(self, socket: QtNetwork.QTcpSocket = None):
"""
We start with an empty channels list.
:param socket: A socket if there is one existing already.
"""
super().__init__(socket)
self.received.connect(self._process)
self.channels = {}
示例6
def _new_client(self, socket: QtNetwork.QTcpSocket):
"""
A new connection (QTCPPSocket) to the server occurred. Give it an id and add some general receivers to the new
server client (wrap the socket into a NetworkClient). Add the new server client to the internal client list.
Not intended for outside use.
:param socket: The socket for the new connection
"""
# wrap into a NetworkClient
client = ServerNetworkClient(socket)
# give it a new id
while True:
# theoretically this could take forever, practically only if we have 1e6 clients already
new_id = random.randint(0, 1e6)
if not any([new_id == client.client_id for client in self.server_clients]):
# not any == none
break
# noinspection PyUnboundLocalVariable
client.client_id = new_id
logger.info('new client with id {}'.format(new_id))
# add some general channels and receivers
# TODO the receivers should be in another module eventually
client.connect_to_channel(constants.C.LOBBY, self._lobby_messages)
client.connect_to_channel(constants.C.GENERAL, general_messages)
# TODO only if localhost connection add the system channel
client.connect_to_channel(constants.C.SYSTEM, self._system_messages)
# chat message system, handled by a single central routine
client.connect_to_channel(constants.C.CHAT, self._chat_system)
# finally add to list of clients
self.server_clients.append(client)
示例7
def _new_connection(self):
"""
Called by the newConnection signal of the QTCPServer. Not intended for outside use.
Zero or more new clients might be available, emit new_client signal for each of them.
"""
logger.info('new connection on server')
while self.tcp_server.hasPendingConnections():
# returns a new QTcpSocket
socket = self.tcp_server.nextPendingConnection()
# emit signal
self.new_client.emit(socket)
示例8
def connect(self):
self.socket = qtnetwork.QTcpSocket()
# set TCP_NODELAY (disable Nagle's algorithm)
self.socket.setSocketOption(
qtnetwork.QAbstractSocket.LowDelayOption, True)
self.socket.connectToHost(self.host, self.port)
示例9
def __init__(self, username, recipient):
super().__init__()
self.username = username
self.recipient = recipient
self.listener = qtn.QTcpServer()
self.listener.listen(qtn.QHostAddress.Any, self.port)
self.listener.newConnection.connect(self.on_connection)
self.listener.acceptError.connect(self.on_error)
self.connections = []
self.client_socket = qtn.QTcpSocket()
self.client_socket.error.connect(self.on_error)
示例10
def doConnect(self):
"""连接服务器"""
self.buttonConnect.setEnabled(False)
self._timer.stop()
self._clearConn()
self.browserResult.append('正在连接服务器')
# 连接控制小车的服务器
self._connCar = QTcpSocket(self)
self._connCar.connected.connect(self.onConnected) # 绑定连接成功信号
self._connCar.disconnected.connect(self.onDisconnected) # 绑定连接丢失信号
self._connCar.readyRead.connect(self.onReadyRead) # 准备读取信号
self._connCar.error.connect(self.onError) # 连接错误信号
self._connCar.connectToHost(self.HOST, self.PORT)