Python源码示例:machine.UART

示例1
def uart_readline(self, **kwargs) -> str:
        """
        Read a line (any character until newline is found) from a UART interface.

        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: String representation of the read bytes, or base64-encoded representation if the
            data can't be decoded to a string.
        """
        self.uart_open(**kwargs)
        response = self.execute('uart.readline()', **kwargs).output

        try:
            return response.decode()
        except UnicodeDecodeError:
            return base64.encodebytes(response).decode() 
示例2
def uart_write(self, data: str, binary: bool = False, **kwargs):
        """
        Write data to the UART bus.

        :param data: Data to be written.
        :param binary: By default data will be treated as a string. Set binary to True if it should
            instead be treated as a base64-encoded binary string to be decoded before being sent.
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        """
        if binary:
            data = base64.decodebytes(data.encode())
        else:
            data = data.encode()

        data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"'
        self.uart_open(**kwargs)

        code = 'uart.write({data})'.format(data=data)
        self.execute(code, **kwargs) 
示例3
def __init__(self):
        self.uart = machine.UART(2, baudrate=115200, tx=17, rx=-2) 
示例4
def __init__(self):
        self.uart = machine.UART(1, tx=17, rx=16)
        self.uart.init(9600, bits=8, parity=None, stop=1)
        self._timer = timEx.addTimer(200, timEx.PERIODIC, self._monitor)
        self.data_save = b'' 
示例5
def __init__(self):
        self.uart = machine.UART(1, tx=17, rx=16)
        self.uart.init(9600, bits=8, parity=None, stop=1)
        self.callback = None
        if self._reset() == 1:
            raise module.Module('Module lorawan not connect')
        self._timer = None
        self.uartString = ''
        time.sleep(2) 
示例6
def __init__(self):

        self.uart = machine.UART(1, tx=17, rx=16)
        self.uart.init(19600, bits=0, parity=None, stop=1)
        self._timer = timEx.addTimer(100, timEx.PERIODIC, self._monitor)
        self._times = 0
        self.cb = None
        self.unknownCb = None
        self.access_add = 0
        self.user_id_add = 0
        
        self.state = ''
        time.sleep_ms(100)
        self.readUser() 
示例7
def __init__(self, port):
        self.uart = machine.UART(1, tx=port[0], rx=port[1])
        self.uart.init(19600, bits=0, parity=None, stop=1)
        self._timer = timEx.addTimer(100, timEx.PERIODIC, self._monitor)
        self._times = 0
        self.cb = None
        self.unknownCb = None
        self.access_add = 0
        self.user_id_add = 0
        
        self.state = ''
        time.sleep_ms(100)
        self.readUser() 
示例8
def uart_close(self, **kwargs):
        """
        Turn off the UART bus.

        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        """
        self.uart_open(**kwargs)
        self.execute('uart.deinit()', **kwargs) 
示例9
def uart_read(self, size: Optional[int] = None, **kwargs) -> str:
        """
        Read from a UART interface.

        :param size: Number of bytes to read (default: read all available characters).
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.uart_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: String representation of the read bytes, or base64-encoded representation if the
            data can't be decoded to a string.
        """
        self.uart_open(**kwargs)

        code = '''
args = []
if {size}:
    args.append({size})

uart.read(*args)
'''.format(size=size)

        response = self.execute(code, **kwargs).output
        try:
            return response.decode()
        except UnicodeDecodeError:
            return base64.encodebytes(response).decode() 
示例10
def __init__(self, en, tx, rx, rst):
        self.en = en
        self.tx = tx
        self.rx = rx
        self.rst = rst
        self.en.mode(Pin.OUT)
        self.rst.mode(Pin.OUT)

        self.uart = UART(1, baudrate=9600, pins=(self.tx, self.rx))
        self.uart.deinit() 
示例11
def read_gps_sample():
    """
    Attempts to read GPS and print the latest GPS values.
    """

    try:
        # Attempt to read GPS data up to 3 times.
        for i in range(3):
            print("- Reading GPS data... ",  end="")
            # Configure the UART to the GPS required parameters.
            u.init(9600, bits=8, parity=None, stop=1)
            time.sleep(1)
            # Ensures that there will only be a print if the UART
            # receives information from the GPS module.
            while not u.any():
                if u.any():
                    break
            # Read data from the GPS.
            gps_data = str(u.read(), 'utf8')
            # Close the UART.
            u.deinit()
            # Get latitude and longitude from the read GPS data.
            lat = extract_latitude(extract_gps(gps_data))
            lon = extract_longitude(extract_gps(gps_data))
            # Print location.
            if lon != 9999 and lat != 9999:
                print("[OK]")
                print("- Latitude: %s" % lat)
                print("- Longitude: %s" % lon)
                print(32 * "-")
                break
            else:
                print("[ERROR]")
                print("   * Bad GPS signal. Retrying...")

    except Exception as E:
        print("[ERROR]")
        print("   * There was a problem getting GPS data: %s", str(E)) 
示例12
def init_gnss():
    """Initialize the GNSS receiver"""

    log('Initializing GNSS...')

    enable = Pin(GNSS_ENABLE_PIN, mode=Pin.OUT)
    enable(False)
    uart = UART(GNSS_UART_PORT)
    uart.init(GNSS_UART_BAUD, bits=8, parity=None, stop=1)
    enable(True)

    log('Done!')

    return (uart, enable) 
示例13
def enable_serial(self):
        """ """
        # Disable these two lines if you don't want serial access.
        # The Pycom forum tells us that this is already incorporated into
        # more recent firmwares, so this is probably a thing of the past.
        #uart = machine.UART(0, 115200)
        #os.dupterm(uart)
        pass 
示例14
def start(self):
        """Start Terminal on UART0 interface."""
        # Conditionally enable terminal on UART0. Default: False.
        # https://forum.pycom.io/topic/1224/disable-console-to-uart0-to-use-uart0-for-other-purposes
        uart0_enabled = self.settings.get('interfaces.uart0.terminal', False)
        if uart0_enabled:
            from machine import UART
            self.uart = UART(0, 115200)
            #self.uart = UART(0)
            os.dupterm(self.uart)
        else:
            self.shutdown() 
示例15
def uart_open(self, id=1, baudrate: Optional[int] = 9600, bits: Optional[int] = 8, parity: Optional[int] = None,
                  stop: int = 1, tx_pin: Optional[int] = None, rx_pin: Optional[int] = None,
                  timeout: Optional[float] = None, timeout_char: Optional[float] = None, **kwargs):
        """
        Open a connection to a UART port.

        :param id: Bus ID (default: 1).
        :param baudrate: Port baudrate (default: 9600).
        :param bits: Number of bits per character. It can be 7, 8 or 9.
        :param parity: Parity configuration. It can be None (no parity), 0 (even) or 1 (odd).
        :param stop: Number of stop bits. It can be 1 or 2.
        :param tx_pin: Specify the TX PIN to use.
        :param rx_pin: Specify the RX PIN to use.
        :param timeout: Specify the time to wait for the first character in seconds.
        :param timeout_char: Specify the time to wait between characters in seconds.
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`.
        """
        code = '''
args = {
    'bits': {bits},
    'parity': {parity},
    'stop': {stop},
}

if {tx_pin}:
    args['tx'] = {tx_pin}
if {rx_pin}:
    args['rx'] = {rx_pin}
if {timeout}:
    args['timeout'] = {timeout}
if {timeout_char}:
    args['timeout_char'] = {timeout_char}
'''.format(bits=bits, parity=parity, stop=stop, tx_pin=tx_pin, rx_pin=rx_pin,
           timeout=timeout, timeout_char=timeout_char)

        self.execute(code, **kwargs)

        code = '''
import machine
uart = machine.UART({id}, {baudrate}, **args)
'''.format(id=id, baudrate=baudrate)

        self.execute(code, **kwargs) 
示例16
def __init__(self, uart_number, uart_tx, uart_rx, set_pin=None, reset_pin=None,
                 interval_reading=0.1, active_mode=True, eco_mode=True,
                 interval_publish=None, mqtt_topic=None, friendly_name: list = None,
                 discover=True, expose_intervals=False, intervals_topic=None):
        """
        :param uart_number: esp32 has multiple uarts
        :param uart_tx: tx pin number
        :param uart_rx: rx pin number
        :param set_pin: optional pin number for set pin
        :param reset_pin: optional pin number for reset pin
        :param interval_reading: In passive mode controls the reading interval, defaults to 0.1 in active_mode.
        :param active_mode:
        :param eco_mode:
        :param interval_publish: publish interval, independent of interval_reading and active_mode
        :param mqtt_topic:
        :param friendly_name: optional, list of friendly_names for all types. Has to provide a name for every type.
        :param discover:
        :param expose_intervals: intervals can be changed through mqtt
        :param intervals_topic:
        """
        super().__init__(COMPONENT_NAME, __version__, 0, discover, interval_publish,
                         interval_reading, mqtt_topic, _log, expose_intervals, intervals_topic)
        if type(friendly_name) is not None:
            if type(friendly_name) == list:
                if len(friendly_name) != 12:
                    _log.warn("Length of friendly name is wrong, expected 12 got {!s}".format(
                        len(friendly_name)))
                    friendly_name = None
            else:
                _log.warn("Friendly name got unsupported type {!s}, expect list".format(
                    type(friendly_name)))
                friendly_name = None
        for tp in TYPES:
            ind = TYPES.index(tp)
            self._addSensorType(tp, 0, 0, VALUE_TEMPLATE_JSON.format(tp), UNITS[ind],
                                friendly_name[ind] if friendly_name is not None else tp,
                                None, DISCOVERY_PM.format(UNITS[ind], tp))
        uart = machine.UART(uart_number, tx=uart_tx, rx=uart_rx, baudrate=9600)
        self._count = 0

        ##############################
        # create sensor object
        self.pms = sensorModule.PMS5003(self, uart, config.Lock(), set_pin, reset_pin,
                                        interval_reading, active_mode=active_mode,
                                        eco_mode=eco_mode)
        self._active_mode = active_mode
        gc.collect()
        if self._active_mode is False:
            # in passive mode using callback because reading intervals could drift apart
            # between sensor and ComponentSensor
            self.pms.registerCallback(self._saveVals)