Python源码示例:machine.I2C

示例1
def __init__(self,
                 pin_id_led = ON_BOARD_LED_PIN_NO,
                 on_board_led_high_is_on = ON_BOARD_LED_HIGH_IS_ON,
                 pin_id_reset = PIN_ID_FOR_LORA_RESET,
                 blink_on_start = (2, 0.5, 0.5),
                 oled_width = OLED_WIDTH, oled_height = OLED_HEIGHT,
                 scl_pin_id = PIN_ID_SCL, sda_pin_id = PIN_ID_SDA,
                 freq = OLED_I2C_FREQ):

        controller_esp.Controller.__init__(self,
                                           pin_id_led,
                                           on_board_led_high_is_on,
                                           pin_id_reset,
                                           blink_on_start)

        self.reset_pin(self.prepare_pin(self.PIN_ID_FOR_OLED_RESET))

        i2c = machine.I2C(scl = machine.Pin(scl_pin_id, machine.Pin.OUT),
                          sda = machine.Pin(sda_pin_id),
                          freq = freq)
        display_ssd1306_i2c.Display.__init__(self, i2c,
                                             width = oled_width, height = oled_height)
        self.show_text('Hello !') 
示例2
def __init__(
        self, i2c, address=0x68,
        accel_fs=ACCEL_FS_SEL_2G, gyro_fs=GYRO_FS_SEL_250DPS,
        accel_sf=SF_M_S2, gyro_sf=SF_RAD_S,
        gyro_offset=(0, 0, 0)
    ):
        self.i2c = i2c
        self.address = address

        # 0x70 = standalone MPU6500, 0x71 = MPU6250 SIP
        if self.whoami not in [0x71, 0x70]:
            raise RuntimeError("MPU6500 not found in I2C bus.")

        self._accel_so = self._accel_fs(accel_fs)
        self._gyro_so = self._gyro_fs(gyro_fs)
        self._accel_sf = accel_sf
        self._gyro_sf = gyro_sf
        self._gyro_offset = gyro_offset 
示例3
def get(port):
    global bus_0, bus_1, bus_other
    if port == PORTA or port == M_BUS:
        if bus_0 == None:
            bus_0 = I2C(id=0, sda=port[0], scl=port[1])
        return bus_0
    elif port == PORTC:
        if bus_1 == None:
            bus_1 = I2C(id=1, sda=port[0], scl=port[1])
        return bus_1
    else:
        if bus_1 == None:
            if bus_other == None:
                bus_other = I2C(id=1, sda=port[0], scl=port[1])
                return bus_other
            else:
                return bus_other
        else:
            raise OSError('I2C bus not support 3') 
示例4
def i2c_read(self, payload):
        """
        Establish an i2c object if not already establed and
        read from the device.
        :param payload:
        :return:
        """
        if not self.i2c:
            self.i2c = I2C(scl=Pin(5), sda=Pin(4), freq=100000)
        try:
            data = self.i2c.readfrom_mem(payload['addr'], payload['register'], payload['number_of_bytes'])
        except TypeError:
            print('read')
            raise
        try:
            data = list(data)
        except TypeError:
            print(payload, data)
            raise
        payload = {'report': 'i2c_data', 'value': data}
        self.send_payload_to_gateway(payload) 
示例5
def i2c_read(self, address: int, size: int, **kwargs) -> str:
        """
        Read data from the I2C bus.

        :param address: I2C address.
        :param size: Number of bytes to read.
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: String representation of the read bytes, or base64-encoded representation if the
            data can't be decoded to a string.
        """
        self.i2c_open(**kwargs)
        code = 'i2c.readfrom({address}, {size})'.format(address=address, size=size)
        response = self.execute(code, **kwargs).output

        try:
            return response.decode()
        except UnicodeDecodeError:
            return base64.encodebytes(response).decode() 
示例6
def i2c_write(self, address: int, data: str, binary: bool = False, **kwargs):
        """
        Write data to the I2C bus.

        :param address: I2C address.
        :param data: Data to be sent.
        :param binary: By default data will be treated as a string. Set binary to True if it should
            instead be treated as a base64-encoded binary string to be decoded before being sent.
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: String representation of the read bytes, or base64-encoded representation if the
            data can't be decoded to a string.
        """
        if binary:
            data = base64.decodebytes(data.encode())
        else:
            data = data.encode()

        data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"'
        self.i2c_open(**kwargs)
        code = 'i2c.writeto({address}, {data})'.format(address=address, data=data)
        self.execute(code, **kwargs) 
示例7
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100):
        """
        Send a command to the sensor and read (optionally) the response
        The responsed data is validated by CRC
        """
        try:
            self.i2c.start(); 
            self.i2c.writeto(self.i2c_addr, cmd_request); 
            if not response_size:
                self.i2c.stop(); 	
                return
            time.sleep_ms(read_delay_ms)
            data = self.i2c.readfrom(self.i2c_addr, response_size) 
            self.i2c.stop(); 
            for i in range(response_size//3):
                if not self._check_crc(data[i*3:(i+1)*3]): # pos 2 and 5 are CRC
                    raise SHT30Error(SHT30Error.CRC_ERROR)
            if data == bytearray(response_size):
                raise SHT30Error(SHT30Error.DATA_ERROR)
            return data
        except OSError as ex:
            if 'I2C' in ex.args[0]:
                raise SHT30Error(SHT30Error.BUS_ERROR)
            raise ex 
示例8
def __init__(self, sda=None, scl=None, disable_interrupts=False):

        # create i2c object
        self._timeout = 10
        self.disable_interrupts = False
        self._mpu_i2c = I2C(sda, scl)
        self.chip_id = int(unp('>h', self._read(1, 0x75, self.mpu_addr))[0])

        # now apply user setting for interrupts
        self.disable_interrupts = disable_interrupts

        # wake it up
        self.wake()
        self.accel_range(1)
        self._ar = self.accel_range()
        self.gyro_range(0)
        self._gr = self.gyro_range()

    # read from device 
示例9
def __init__(self, type, scl, sda):
        self.type = type

        if self.type == 0:
            i2c_bus = I2C(scl=Pin(scl), sda=Pin(sda), freq=100000)
            self.sensor = BMP180.BMP180(i2c_bus)
            self.sensor.oversample_sett = 2
            self.sensor.baseline = 101325

        elif self.type == 1:
             pass #TODO

        else:
            log.error("Unknown sensor type '{}'. Cannot instantiate it.".format(self.type))

    # @timed_function 
示例10
def __init__(self, address=0x20, gpioScl=5, gpioSda=4):
        """Initialize MCP230xx at specified I2C address and bus number.  If bus
        is not specified it will default to the appropriate platform detected bus.
        """
        self.address = address
        self.i2c = I2C(scl=Pin(gpioScl),sda=Pin(gpioSda))
        # Assume starting in ICON.BANK = 0 mode (sequential access).
        # Compute how many bytes are needed to store count of GPIO.
        self.gpio_bytes = self.NUM_GPIO//8
        # Buffer register values so they can be changed without reading.
        self.iodir = bytearray(self.gpio_bytes)  # Default direction to all inputs.
        self.gppu = bytearray(self.gpio_bytes)  # Default to pullups disabled.
        self.gpio = bytearray(self.gpio_bytes)
        # Write current direction and pullup buffer state.
        self.write_iodir()
        self.write_gppu() 
示例11
def __init__(self, i2c, btm_servo_idx, top_servo_idx):
        '''
        PCA9685与ESP32之间通过I2C相连接
        @gpio_scl : I2C的SCL管脚GPIO编号
        @gpio_sda : I2C的SDA管脚的GPIO编号
        @btm_servo_idx: 云台下方舵机在PCA9685上面的编号
        @top_servo_idx: 云台上方电机在PCA9695上面的编号
        '''
        

        self.servos = Servos(i2c, address=0x40) # 实例化一个舵机控制板(servo control board)
        self.btm_servo_idx = btm_servo_idx # 底部舵机的编号
        self.top_servo_idx = top_servo_idx # 云台上方舵机的编号
        self.btm_min_angle = 0 # 底部舵机最小旋转角度
        self.btm_max_angle = 180 # 底部舵机最大旋转角度
        self.btm_init_angle = 100 # 底部舵机的初始角度
        self.top_min_angle = 0 # 顶部舵机的最小旋转角度
        self.top_max_angle = 180 # 顶部舵机的最大旋转角度
        self.top_init_angle = 100 # 顶部舵机的初始角度

        # 初始化云台角度
        self.init_cloud_platform() 
示例12
def __init__(self, 
                 width = 128, height = 64, 
                 scl_pin_id = 5, sda_pin_id = 4,
                 freq = 400000): 
                 
        self.width = width
        self.height = height
        self.i2c = machine.I2C(scl = machine.Pin(scl_pin_id, machine.Pin.OUT),
                               sda = machine.Pin(sda_pin_id), 
                               freq = freq) 
        self.display = ssd1306.SSD1306_I2C(width, height, self.i2c)
        self.show = self.display.show 
示例13
def setup(use_spi=False, soft=True):
    if use_spi:
        # Pyb   SSD
        # 3v3   Vin
        # Gnd   Gnd
        # X1    DC
        # X2    CS
        # X3    Rst
        # X6    CLK
        # X8    DATA
        pdc = machine.Pin('X1', machine.Pin.OUT_PP)
        pcs = machine.Pin('X2', machine.Pin.OUT_PP)
        prst = machine.Pin('X3', machine.Pin.OUT_PP)
        if soft:
            spi = machine.SPI(sck=machine.Pin('X6'), mosi=machine.Pin('X8'), miso=machine.Pin('X7'))
        else:
            spi = machine.SPI(1)
        ssd = SSD1306_SPI(WIDTH, HEIGHT, spi, pdc, prst, pcs)
    else:  # I2C
        # Pyb   SSD
        # 3v3   Vin
        # Gnd   Gnd
        # Y9    CLK
        # Y10   DATA
        if soft:
            pscl = machine.Pin('Y9', machine.Pin.OPEN_DRAIN)
            psda = machine.Pin('Y10', machine.Pin.OPEN_DRAIN)
            i2c = machine.I2C(scl=pscl, sda=psda)
        else:
            i2c = machine.I2C(2)
        ssd = SSD1306_I2C(WIDTH, HEIGHT, i2c)
    return ssd 
示例14
def __init__(
        self, i2c, address=0x0c,
        mode=MODE_CONTINOUS_MEASURE_1, output=OUTPUT_16_BIT,
        offset=(0, 0, 0), scale=(1, 1, 1)
    ):
        self.i2c = i2c
        self.address = address
        self._offset = offset
        self._scale = scale

        if 0x48 != self.whoami:
            raise RuntimeError("AK8963 not found in I2C bus.")

        # Sensitivity adjustement values
        self._register_char(_CNTL1, _MODE_FUSE_ROM_ACCESS)
        asax = self._register_char(_ASAX)
        asay = self._register_char(_ASAY)
        asaz = self._register_char(_ASAZ)
        self._register_char(_CNTL1, _MODE_POWER_DOWN)

        # Should wait atleast 100us before next mode
        self._adjustement = (
            (0.5 * (asax - 128)) / 128 + 1,
            (0.5 * (asay - 128)) / 128 + 1,
            (0.5 * (asaz - 128)) / 128 + 1
        )

        # Power on
        self._register_char(_CNTL1, (mode | output))

        if output is OUTPUT_16_BIT:
            self._so = _SO_16BIT
        else:
            self._so = _SO_14BIT 
示例15
def __init__(self, i2c=None, addr=0x5c):
        if i2c == None:
            from machine import I2C, Pin
            self.i2c = I2C(sda=21, scl=22)
        else:
            self.i2c = i2c
        self.addr = addr
        self.buf = bytearray(5) 
示例16
def __init__(self, side_str, device_addr=None, transposition=(0, 1, 2), scaling=(1, 1, 1)):

        self._accel = Vector3d(transposition, scaling, self._accel_callback)
        self._gyro = Vector3d(transposition, scaling, self._gyro_callback)
        self.buf1 = bytearray(1)                # Pre-allocated buffers for reads: allows reads to
        self.buf2 = bytearray(2)                # be done in interrupt handlers
        self.buf3 = bytearray(3)
        self.buf6 = bytearray(6)

        sleep_ms(200)                           # Ensure PSU and device have settled
        if isinstance(side_str, str):           # Non-pyb targets may use other than X or Y
            self._mpu_i2c = I2C(side_str)
        elif hasattr(side_str, 'readfrom'):     # Soft or hard I2C instance. See issue #3097
            self._mpu_i2c = side_str
        else:
            raise ValueError("Invalid I2C instance")

        if device_addr is None:
            devices = set(self._mpu_i2c.scan())
            mpus = devices.intersection(set(self._mpu_addr))
            number_of_mpus = len(mpus)
            if number_of_mpus == 0:
                raise MPUException("No MPU's detected")
            elif number_of_mpus == 1:
                self.mpu_addr = mpus.pop()
            else:
                raise ValueError("Two MPU's detected: must specify a device address")
        else:
            if device_addr not in (0, 1):
                raise ValueError('Device address must be 0 or 1')
            self.mpu_addr = self._mpu_addr[device_addr]

        self.chip_id                     # Test communication by reading chip_id: throws exception on error
        # Can communicate with chip. Set it up.
        self.wake()                             # wake it up
        self.passthrough = True                 # Enable mag access from main I2C bus
        self.accel_range = 0                    # default to highest sensitivity
        self.gyro_range = 0                     # Likewise for gyro

    # read from device 
示例17
def _read(self, buf, memaddr, addr):        # addr = I2C device address, memaddr = memory location within the I2C device
        '''
        Read bytes to pre-allocated buffer Caller traps OSError.
        '''
        self._mpu_i2c.readfrom_mem_into(addr, memaddr, buf)

    # write to device 
示例18
def __init__(self, sensor_id='mcp9808',
                 address=MCP9808_I2CADDR_DEFAULT, scl_pinno=5, sda_pinno=4,
                 i2c=None):
        """Initialize MCP9808 device on the specified I2C address and bus number.
        Address defaults to 0x18 and bus number defaults to the appropriate bus
        for the hardware.
        """
        self.sensor_id = sensor_id
        if i2c is None:
            self.i2c = I2C(scl=Pin(scl_pinno, Pin.IN),sda=Pin(sda_pinno, Pin.IN))
        else:
            self.i2c = i2c
        self.address=address
        assert self.begin(), "Invalid values read from I2C bus for MCP9808" 
示例19
def __init__(self, scl_pinno=5, sda_pinno=4):
        self.i2c = I2C(scl=Pin(scl_pinno, Pin.IN),
                       sda=Pin(sda_pinno, Pin.IN)) 
示例20
def __init__(self, scl, sda):
        """Initiate the HUT21D

        Args:
            scl (int): Pin id where the sdl pin is connected to
            sda (int): Pin id where the sda pin is connected to
        """
        self.i2c = I2C(scl=Pin(scl, Pin.OPEN_DRAIN, Pin.PULL_UP), sda=Pin(sda, Pin.OPEN_DRAIN, Pin.PULL_UP), freq=100000) 
示例21
def __init__(self):
        # initialize ugfx
        ugfx.init()
        ugfx.clear(ugfx.WHITE)
        
        # Buttons
        ugfx.input_init()
        self.init_buttons()

        # Container
        width = ugfx.width()
        height = ugfx.height()
        ind_height = 46
        container_height = height - ind_height

        self.indicator = ugfx.Container(0, 0, width, ind_height, style=styles.ibm_st)
        self.container = ugfx.Container(0, ind_height, width, container_height, style=styles.ibm_st)

        self.graph_basepos = container_height - 5
        
        # Sensor
        SCL = Pin(26) # SCL
        SDA  = Pin(25) # SDA
        self.sensor = MPU6050(I2C(scl=SCL, sda=SDA))

        # Buzzer
        self.buzzer = Buzzer() 
示例22
def i2c_write(self, payload):
        """
        Establish an i2c object if not already establed and
        write to the device.
        :param payload:
        :return:
        """
        if not self.i2c:
            self.i2c = I2C(scl=Pin(5), sda=Pin(4), freq=100000)
        self.i2c.writeto(payload['addr'], bytearray(payload['data'])) 
示例23
def i2c_read(self,payload):
  if not self.i2c:
   self.i2c=I2C(scl=Pin(5),sda=Pin(4),freq=100000)
  try:
   data=self.i2c.readfrom_mem(payload['addr'],payload['register'],payload['number_of_bytes'])
  except TypeError:
   print('read')
   raise
  try:
   data=list(data)
  except TypeError:
   print(payload,data)
   raise
  payload={'report':'i2c_data','value':data}
  self.send_payload_to_gateway(payload) 
示例24
def i2c_write(self,payload):
  if not self.i2c:
   self.i2c=I2C(scl=Pin(5),sda=Pin(4),freq=100000)
  self.i2c.writeto(payload['addr'],bytearray(payload['data'])) 
示例25
def __init__(self, verbose):
        self.verbose = verbose
        self.cl = None  # Client instance for server comms.
        # Instantiate a Pyboard Channel
        i2c = I2C(scl=Pin(0), sda=Pin(2))  # software I2C
        syn = Pin(5)
        ack = Pin(4)
        self.chan = asi2c.Responder(i2c, syn, ack)  # Channel to Pyboard
        self.sreader = asyncio.StreamReader(self.chan)
        self.swriter = asyncio.StreamWriter(self.chan, {}) 
示例26
def i2c_open(self, scl: Optional[int] = None, sda: Optional[int] = None, id: int = -1, baudrate: int = 400000, **kwargs):
        """
        Open a connection to an I2C (or "two-wire") port.

        :param scl: PIN number for the SCL (serial clock) line.
        :param sda: PIN number for the SDA (serial data) line.
        :param id: The default value of -1 selects a software implementation of I2C which can work (in most cases)
            with arbitrary pins for SCL and SDA. If id is -1 then scl and sda must be specified. Other allowed
            values for id depend on the particular port/board, and specifying scl and sda may or may not be required
            or allowed in this case.
        :param baudrate: Port frequency/clock rate (default: 400 kHz).
        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`.
        """
        code = '''
import machine

args = {}
if {scl}:
    args['scl'] = machine.Pin(''' + str(scl) + ''')
if {sda}:
    args['sda'] = machine.Pin(''' + str(sda) + ''')
'''

        self.execute(code, **kwargs)

        code = '''
i2c = machine.I2C(id={id}, freq={baudrate}, **args)
'''.format(id=id, baudrate=baudrate)

        self.execute(code, **kwargs) 
示例27
def i2c_close(self, **kwargs):
        """
        Turn off an I2C bus.

        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        """
        self.i2c_open(**kwargs)
        self.execute('i2c.deinit()', **kwargs) 
示例28
def i2c_scan(self, **kwargs) -> List[int]:
        """
        Scan for device addresses on the I2C bus.

        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: List of 7-bit addresses.
        """
        self.i2c_open(**kwargs)
        return self.execute('i2c.scan', **kwargs).output 
示例29
def i2c_stop(self, **kwargs):
        """
        Generate a STOP condition on an I2C bus.

        :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
            :meth:`platypush.plugins.esp.EspPlugin.execute`.
        :return: String representation of the read bytes, or base64-encoded representation if the
            data can't be decoded to a string.
        """
        self.i2c_open(**kwargs)
        self.execute('i2c.stop()', **kwargs) 
示例30
def __init__(self, is_debug=False):
        '''
        Car构造器函数
        '''
        # 电池ADC采样
        self.battery_adc = BatteryVoltage(
            gpio_dict['BATTERY_ADC'],
            is_debug=False)
        
        # 用户按键
        self.user_button = Button(
            0, 
            callback=lambda pin: self.stop_trigger(pin))
        
        try:
            # 创建一个I2C对象
            i2c = I2C(
                scl=Pin(gpio_dict['I2C_SCL']),
                sda=Pin(gpio_dict['I2C_SDA']),
                freq=car_property['I2C_FREQUENCY'])
            # 创建舵机云台对象
            self.cloud_platform = CloudPlatform(i2c)
        except:
            print('[ERROR]: pca9885舵机驱动模块初始化失败')
            print('[Hint]: 请检查接线')
        
        self.speed_percent = 70 # 小车默认速度

        # 左侧电机
        self.left_motor = Motor(0)
        self.left_motor.stop() # 左侧电机停止

        # 右侧电机
        self.right_motor = Motor(1)
        self.right_motor.stop() # 右侧电机停止

        # 小车停止标志位
        self.stop_flag = False
        self.is_debug = is_debug # 是否开始调试模式