Python源码示例:machine.UART
示例1
def uart_readline(self, **kwargs) -> str:
    """
    Fetch a single line (everything up to a newline) from a UART interface.

    The bus is opened through ``uart_open`` and ``uart.readline()`` is then
    submitted through ``execute``; both receive the same keyword arguments.

    :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
        :meth:`platypush.plugins.esp.EspPlugin.execute`.
    :return: The received bytes decoded as text, or their base64 encoding when they
        cannot be decoded to a string.
    """
    self.uart_open(**kwargs)
    raw = self.execute('uart.readline()', **kwargs).output

    try:
        text = raw.decode()
    except UnicodeDecodeError:
        # Not valid text: hand the bytes back in a printable form instead.
        text = base64.encodebytes(raw).decode()
    return text
示例2
def uart_write(self, data: str, binary: bool = False, **kwargs):
    """
    Send data over the UART bus.

    :param data: Data to be written.
    :param binary: When False (default) ``data`` is sent as the encoded string. When True,
        ``data`` is taken to be base64 text and is decoded to raw bytes before being sent.
    :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
        :meth:`platypush.plugins.esp.EspPlugin.execute`.
    """
    payload = base64.decodebytes(data.encode()) if binary else data.encode()

    # Spell every byte as a \xNN escape so the payload can be embedded as a
    # bytes literal inside the code string, whatever its content.
    escaped = ''.join('\\x{:02x}'.format(byte) for byte in payload)
    literal = 'b"' + escaped + '"'

    self.uart_open(**kwargs)
    self.execute('uart.write({data})'.format(data=literal), **kwargs)
示例3
def __init__(self):
    """Create UART id 2 at 115200 baud with tx=17 and rx=-2 and keep it in ``self.uart``."""
    # NOTE(review): rx=-2 is not a GPIO number; presumably a firmware-specific
    # sentinel value - confirm against the target firmware.
    uart_settings = {'baudrate': 115200, 'tx': 17, 'rx': -2}
    self.uart = machine.UART(2, **uart_settings)
示例4
def __init__(self):
    """
    Open UART 1 (tx=17, rx=16) at 9600 baud, 8 data bits, no parity, 1 stop bit,
    register ``self._monitor`` as a periodic timer callback and start with an
    empty receive buffer.
    """
    port = machine.UART(1, tx=17, rx=16)
    self.uart = port
    port.init(9600, bits=8, parity=None, stop=1)
    # Periodic timer with interval 200 (presumably milliseconds - confirm with timEx).
    self._timer = timEx.addTimer(200, timEx.PERIODIC, self._monitor)
    self.data_save = b''
示例5
def __init__(self):
    """
    Open UART 1 (tx=17, rx=16) at 9600 baud, 8 data bits, no parity, 1 stop bit,
    then reset the module.

    :raises module.Module: when ``self._reset()`` returns 1.
    """
    port = machine.UART(1, tx=17, rx=16)
    self.uart = port
    port.init(9600, bits=8, parity=None, stop=1)
    self.callback = None

    reset_status = self._reset()
    if reset_status == 1:
        raise module.Module('Module lorawan not connect')

    self._timer = None
    self.uartString = ''
    # Fixed 2 second pause before the constructor returns.
    time.sleep(2)
示例6
def __init__(self):
    """
    Open UART 1 on tx=17 / rx=16, start the periodic ``self._monitor`` timer,
    reset the bookkeeping attributes and finally call ``self.readUser()``.
    """
    port = machine.UART(1, tx=17, rx=16)
    self.uart = port
    # NOTE(review): 19600 is not a standard baud rate (19200 is) and bits=0 is
    # outside the usual 7/8/9 range - confirm both against the device and firmware.
    port.init(19600, bits=0, parity=None, stop=1)
    self._timer = timEx.addTimer(100, timEx.PERIODIC, self._monitor)

    self._times = self.access_add = self.user_id_add = 0
    self.cb = self.unknownCb = None
    self.state = ''

    time.sleep_ms(100)
    self.readUser()
示例7
def __init__(self, port):
    """
    Open UART 1 on the given pins, start the periodic ``self._monitor`` timer,
    reset the bookkeeping attributes and finally call ``self.readUser()``.

    :param port: Indexable pair; ``port[0]`` is used as TX and ``port[1]`` as RX.
    """
    tx_pin, rx_pin = port[0], port[1]
    link = machine.UART(1, tx=tx_pin, rx=rx_pin)
    self.uart = link
    # NOTE(review): 19600 is not a standard baud rate (19200 is) and bits=0 is
    # outside the usual 7/8/9 range - confirm both against the device and firmware.
    link.init(19600, bits=0, parity=None, stop=1)
    self._timer = timEx.addTimer(100, timEx.PERIODIC, self._monitor)

    self._times = self.access_add = self.user_id_add = 0
    self.cb = self.unknownCb = None
    self.state = ''

    time.sleep_ms(100)
    self.readUser()
示例8
def uart_close(self, **kwargs):
    """
    Switch the UART bus off.

    ``uart_open`` is called first with the same arguments, then ``uart.deinit()``
    is submitted through ``execute``.

    :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
        :meth:`platypush.plugins.esp.EspPlugin.execute`.
    """
    shutdown_snippet = 'uart.deinit()'
    self.uart_open(**kwargs)
    self.execute(shutdown_snippet, **kwargs)
示例9
def uart_read(self, size: Optional[int] = None, **kwargs) -> str:
    """
    Read from a UART interface.

    :param size: Number of bytes to read. ``None`` (the default) or ``0`` reads all the
        available characters.
    :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
        :meth:`platypush.plugins.esp.EspPlugin.execute`.
    :return: String representation of the read bytes, or base64-encoded representation if the
        data can't be decoded to a string.
    """
    self.uart_open(**kwargs)

    # Decide on this side whether a size argument is needed. The code passed to
    # execute is then a single expression (like in uart_readline) instead of a
    # multi-line snippet whose ``if`` body depends on correct indentation.
    if size:
        code = 'uart.read({size})'.format(size=size)
    else:
        code = 'uart.read()'

    response = self.execute(code, **kwargs).output
    try:
        return response.decode()
    except UnicodeDecodeError:
        return base64.encodebytes(response).decode()
示例10
def __init__(self, en, tx, rx, rst):
    """
    Store the four pins, switch ``en`` and ``rst`` to output mode and create
    UART 1 at 9600 baud on (tx, rx); the UART is de-initialised again right away.

    :param en: Pin object stored as ``self.en`` and set to ``Pin.OUT``.
    :param tx: Passed as the first element of the UART ``pins`` tuple.
    :param rx: Passed as the second element of the UART ``pins`` tuple.
    :param rst: Pin object stored as ``self.rst`` and set to ``Pin.OUT``.
    """
    self.en, self.tx, self.rx, self.rst = en, tx, rx, rst

    for control_pin in (self.en, self.rst):
        control_pin.mode(Pin.OUT)

    self.uart = UART(1, baudrate=9600, pins=(self.tx, self.rx))
    self.uart.deinit()
示例11
def read_gps_sample():
    """
    Attempts to read GPS and print the latest GPS values.

    Up to three attempts are made. Each attempt configures the UART ``u``, waits
    until data is available, reads it and extracts latitude and longitude with
    ``extract_gps`` / ``extract_latitude`` / ``extract_longitude``. A value of 9999
    for either coordinate is treated as a bad reading and triggers a retry.
    Any exception is reported on standard output instead of being propagated.
    """
    try:
        # Attempt to read GPS data up to 3 times.
        for _ in range(3):
            print("- Reading GPS data... ", end="")
            # Configure the UART to the GPS required parameters.
            u.init(9600, bits=8, parity=None, stop=1)
            try:
                time.sleep(1)
                # Ensures that there will only be a print if the UART
                # receives information from the GPS module.
                # NOTE: this waits indefinitely if the module never sends data.
                while not u.any():
                    pass
                # Read data from the GPS.
                gps_data = str(u.read(), 'utf8')
            finally:
                # Close the UART, also when waiting or reading failed.
                u.deinit()
            # Get latitude and longitude from the read GPS data
            # (the sentence is extracted once and shared by both lookups).
            gps_sentence = extract_gps(gps_data)
            lat = extract_latitude(gps_sentence)
            lon = extract_longitude(gps_sentence)
            # Print location.
            if lon != 9999 and lat != 9999:
                print("[OK]")
                print("- Latitude: %s" % lat)
                print("- Longitude: %s" % lon)
                print(32 * "-")
                break
            print("[ERROR]")
            print(" * Bad GPS signal. Retrying...")
    except Exception as exc:
        print("[ERROR]")
        # Use %-formatting: passing the message as a second print() argument
        # would output the literal "%s" followed by the text.
        print(" * There was a problem getting GPS data: %s" % str(exc))
示例12
def init_gnss():
    """
    Initialize the GNSS receiver.

    The enable pin is called with False before the UART is configured
    (8 data bits, no parity, 1 stop bit) and with True afterwards.

    :return: Tuple ``(uart, enable)`` with the configured UART and the enable pin.
    """
    log('Initializing GNSS...')

    power = Pin(GNSS_ENABLE_PIN, mode=Pin.OUT)
    power(False)

    port = UART(GNSS_UART_PORT)
    port.init(GNSS_UART_BAUD, bits=8, parity=None, stop=1)

    power(True)
    log('Done!')
    return port, power
示例13
def enable_serial(self):
    """Placeholder for attaching the terminal to UART 0; currently does nothing."""
    # The two lines below used to provide serial access. According to the
    # Pycom forum, recent firmwares already do this on their own, so they
    # are kept for reference only.
    #uart = machine.UART(0, 115200)
    #os.dupterm(uart)
    return None
示例14
def start(self):
    """
    Start Terminal on UART0 interface.

    The terminal is only attached when the ``interfaces.uart0.terminal`` setting
    is truthy (default: False); otherwise ``self.shutdown()`` is called.
    """
    # https://forum.pycom.io/topic/1224/disable-console-to-uart0-to-use-uart0-for-other-purposes
    wants_terminal = self.settings.get('interfaces.uart0.terminal', False)
    if not wants_terminal:
        self.shutdown()
        return

    # Imported lazily so the module is only needed when the terminal is enabled.
    from machine import UART
    self.uart = UART(0, 115200)
    os.dupterm(self.uart)
示例15
def uart_open(self, id=1, baudrate: Optional[int] = 9600, bits: Optional[int] = 8, parity: Optional[int] = None,
              stop: int = 1, tx_pin: Optional[int] = None, rx_pin: Optional[int] = None,
              timeout: Optional[float] = None, timeout_char: Optional[float] = None, **kwargs):
    """
    Open a connection to a UART port.

    Two code snippets are submitted through ``execute``: the first defines an ``args``
    dictionary with the port settings, the second imports ``machine`` and binds
    ``uart = machine.UART(id, baudrate, **args)``.

    :param id: Bus ID (default: 1).
    :param baudrate: Port baudrate (default: 9600).
    :param bits: Number of bits per character. It can be 7, 8 or 9.
    :param parity: Parity configuration. It can be None (no parity), 0 (even) or 1 (odd).
    :param stop: Number of stop bits. It can be 1 or 2.
    :param tx_pin: Specify the TX PIN to use. Omitted from the settings when None.
    :param rx_pin: Specify the RX PIN to use. Omitted from the settings when None.
    :param timeout: Specify the time to wait for the first character in seconds.
        Omitted from the settings when None.
    :param timeout_char: Specify the time to wait between characters in seconds.
        Omitted from the settings when None.
    :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`.
    """
    # NOTE(review): timeout / timeout_char are forwarded unchanged, while the
    # MicroPython machine.UART documentation specifies them in milliseconds -
    # confirm the unit promised by the docstring above.
    optional_settings = (
        ('tx', tx_pin),
        ('rx', rx_pin),
        ('timeout', timeout),
        ('timeout_char', timeout_char),
    )

    # The snippet is assembled line by line instead of through one big
    # str.format template: the dictionary's literal braces would otherwise have
    # to be escaped, and no indented ``if`` blocks are needed.
    lines = [
        'args = {',
        "    'bits': {},".format(bits),
        "    'parity': {},".format(parity),
        "    'stop': {},".format(stop),
        '}',
    ]
    # Only None means "not specified"; legitimate zero values (pin 0, timeout 0)
    # are forwarded.
    lines.extend(
        "args['{}'] = {}".format(name, value)
        for name, value in optional_settings
        if value is not None
    )
    self.execute('\n'.join(lines), **kwargs)

    code = '\n'.join((
        'import machine',
        'uart = machine.UART({id}, {baudrate}, **args)'.format(id=id, baudrate=baudrate),
    ))
    self.execute(code, **kwargs)
示例16
def __init__(self, uart_number, uart_tx, uart_rx, set_pin=None, reset_pin=None,
             interval_reading=0.1, active_mode=True, eco_mode=True,
             interval_publish=None, mqtt_topic=None, friendly_name: list = None,
             discover=True, expose_intervals=False, intervals_topic=None):
    """
    Set up the component: register one sensor type per entry of ``TYPES``, open the
    UART at 9600 baud and create the ``PMS5003`` sensor object.

    :param uart_number: esp32 has multiple uarts
    :param uart_tx: tx pin number
    :param uart_rx: rx pin number
    :param set_pin: optional pin number for set pin
    :param reset_pin: optional pin number for reset pin
    :param interval_reading: In passive mode controls the reading interval, defaults to 0.1 in active_mode.
    :param active_mode: passed to the sensor object; when it is False, ``self._saveVals`` is
        registered as a callback on the sensor
    :param eco_mode: passed to the sensor object
    :param interval_publish: publish interval, independent of interval_reading and active_mode
    :param mqtt_topic: passed to the base class constructor
    :param friendly_name: optional, list of friendly_names for all types. Has to provide a name for every type.
        A value that is not a list, or a list whose length is not 12, is ignored with a warning.
    :param discover: passed to the base class constructor
    :param expose_intervals: intervals can be changed through mqtt
    :param intervals_topic: passed to the base class constructor
    """
    super().__init__(COMPONENT_NAME, __version__, 0, discover, interval_publish,
                     interval_reading, mqtt_topic, _log, expose_intervals, intervals_topic)

    # Validate only when names were actually supplied. The previous test
    # ``type(friendly_name) is not None`` was always true, so the default
    # (None) produced a misleading "unsupported type" warning.
    if friendly_name is not None:
        if not isinstance(friendly_name, list):
            _log.warn("Friendly name got unsupported type {!s}, expect list".format(
                type(friendly_name)))
            friendly_name = None
        elif len(friendly_name) != 12:
            _log.warn("Length of friendly name is wrong, expected 12 got {!s}".format(
                len(friendly_name)))
            friendly_name = None

    # One sensor type per entry of TYPES; without friendly names the type
    # name itself is used.
    for ind, tp in enumerate(TYPES):
        self._addSensorType(tp, 0, 0, VALUE_TEMPLATE_JSON.format(tp), UNITS[ind],
                            friendly_name[ind] if friendly_name is not None else tp,
                            None, DISCOVERY_PM.format(UNITS[ind], tp))

    uart = machine.UART(uart_number, tx=uart_tx, rx=uart_rx, baudrate=9600)
    self._count = 0

    # create sensor object
    self.pms = sensorModule.PMS5003(self, uart, config.Lock(), set_pin, reset_pin,
                                    interval_reading, active_mode=active_mode,
                                    eco_mode=eco_mode)
    self._active_mode = active_mode
    gc.collect()
    if self._active_mode is False:
        # in passive mode using callback because reading intervals could drift apart
        # between sensor and ComponentSensor
        self.pms.registerCallback(self._saveVals)