Python源码示例:machine.I2C
示例1
def __init__(self,
pin_id_led = ON_BOARD_LED_PIN_NO,
on_board_led_high_is_on = ON_BOARD_LED_HIGH_IS_ON,
pin_id_reset = PIN_ID_FOR_LORA_RESET,
blink_on_start = (2, 0.5, 0.5),
oled_width = OLED_WIDTH, oled_height = OLED_HEIGHT,
scl_pin_id = PIN_ID_SCL, sda_pin_id = PIN_ID_SDA,
freq = OLED_I2C_FREQ):
controller_esp.Controller.__init__(self,
pin_id_led,
on_board_led_high_is_on,
pin_id_reset,
blink_on_start)
self.reset_pin(self.prepare_pin(self.PIN_ID_FOR_OLED_RESET))
i2c = machine.I2C(scl = machine.Pin(scl_pin_id, machine.Pin.OUT),
sda = machine.Pin(sda_pin_id),
freq = freq)
display_ssd1306_i2c.Display.__init__(self, i2c,
width = oled_width, height = oled_height)
self.show_text('Hello !')
示例2
def __init__(
self, i2c, address=0x68,
accel_fs=ACCEL_FS_SEL_2G, gyro_fs=GYRO_FS_SEL_250DPS,
accel_sf=SF_M_S2, gyro_sf=SF_RAD_S,
gyro_offset=(0, 0, 0)
):
self.i2c = i2c
self.address = address
# 0x70 = standalone MPU6500, 0x71 = MPU6250 SIP
if self.whoami not in [0x71, 0x70]:
raise RuntimeError("MPU6500 not found in I2C bus.")
self._accel_so = self._accel_fs(accel_fs)
self._gyro_so = self._gyro_fs(gyro_fs)
self._accel_sf = accel_sf
self._gyro_sf = gyro_sf
self._gyro_offset = gyro_offset
示例3
def get(port):
global bus_0, bus_1, bus_other
if port == PORTA or port == M_BUS:
if bus_0 == None:
bus_0 = I2C(id=0, sda=port[0], scl=port[1])
return bus_0
elif port == PORTC:
if bus_1 == None:
bus_1 = I2C(id=1, sda=port[0], scl=port[1])
return bus_1
else:
if bus_1 == None:
if bus_other == None:
bus_other = I2C(id=1, sda=port[0], scl=port[1])
return bus_other
else:
return bus_other
else:
raise OSError('I2C bus not support 3')
示例4
def i2c_read(self, payload):
"""
Establish an i2c object if not already establed and
read from the device.
:param payload:
:return:
"""
if not self.i2c:
self.i2c = I2C(scl=Pin(5), sda=Pin(4), freq=100000)
try:
data = self.i2c.readfrom_mem(payload['addr'], payload['register'], payload['number_of_bytes'])
except TypeError:
print('read')
raise
try:
data = list(data)
except TypeError:
print(payload, data)
raise
payload = {'report': 'i2c_data', 'value': data}
self.send_payload_to_gateway(payload)
示例5
def i2c_read(self, address: int, size: int, **kwargs) -> str:
"""
Read data from the I2C bus.
:param address: I2C address.
:param size: Number of bytes to read.
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
:meth:`platypush.plugins.esp.EspPlugin.execute`.
:return: String representation of the read bytes, or base64-encoded representation if the
data can't be decoded to a string.
"""
self.i2c_open(**kwargs)
code = 'i2c.readfrom({address}, {size})'.format(address=address, size=size)
response = self.execute(code, **kwargs).output
try:
return response.decode()
except UnicodeDecodeError:
return base64.encodebytes(response).decode()
示例6
def i2c_write(self, address: int, data: str, binary: bool = False, **kwargs):
"""
Write data to the I2C bus.
:param address: I2C address.
:param data: Data to be sent.
:param binary: By default data will be treated as a string. Set binary to True if it should
instead be treated as a base64-encoded binary string to be decoded before being sent.
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
:meth:`platypush.plugins.esp.EspPlugin.execute`.
:return: String representation of the read bytes, or base64-encoded representation if the
data can't be decoded to a string.
"""
if binary:
data = base64.decodebytes(data.encode())
else:
data = data.encode()
data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"'
self.i2c_open(**kwargs)
code = 'i2c.writeto({address}, {data})'.format(address=address, data=data)
self.execute(code, **kwargs)
示例7
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100):
"""
Send a command to the sensor and read (optionally) the response
The responsed data is validated by CRC
"""
try:
self.i2c.start();
self.i2c.writeto(self.i2c_addr, cmd_request);
if not response_size:
self.i2c.stop();
return
time.sleep_ms(read_delay_ms)
data = self.i2c.readfrom(self.i2c_addr, response_size)
self.i2c.stop();
for i in range(response_size//3):
if not self._check_crc(data[i*3:(i+1)*3]): # pos 2 and 5 are CRC
raise SHT30Error(SHT30Error.CRC_ERROR)
if data == bytearray(response_size):
raise SHT30Error(SHT30Error.DATA_ERROR)
return data
except OSError as ex:
if 'I2C' in ex.args[0]:
raise SHT30Error(SHT30Error.BUS_ERROR)
raise ex
示例8
def __init__(self, sda=None, scl=None, disable_interrupts=False):
# create i2c object
self._timeout = 10
self.disable_interrupts = False
self._mpu_i2c = I2C(sda, scl)
self.chip_id = int(unp('>h', self._read(1, 0x75, self.mpu_addr))[0])
# now apply user setting for interrupts
self.disable_interrupts = disable_interrupts
# wake it up
self.wake()
self.accel_range(1)
self._ar = self.accel_range()
self.gyro_range(0)
self._gr = self.gyro_range()
# read from device
示例9
def __init__(self, type, scl, sda):
self.type = type
if self.type == 0:
i2c_bus = I2C(scl=Pin(scl), sda=Pin(sda), freq=100000)
self.sensor = BMP180.BMP180(i2c_bus)
self.sensor.oversample_sett = 2
self.sensor.baseline = 101325
elif self.type == 1:
pass #TODO
else:
log.error("Unknown sensor type '{}'. Cannot instantiate it.".format(self.type))
# @timed_function
示例10
def __init__(self, address=0x20, gpioScl=5, gpioSda=4):
"""Initialize MCP230xx at specified I2C address and bus number. If bus
is not specified it will default to the appropriate platform detected bus.
"""
self.address = address
self.i2c = I2C(scl=Pin(gpioScl),sda=Pin(gpioSda))
# Assume starting in ICON.BANK = 0 mode (sequential access).
# Compute how many bytes are needed to store count of GPIO.
self.gpio_bytes = self.NUM_GPIO//8
# Buffer register values so they can be changed without reading.
self.iodir = bytearray(self.gpio_bytes) # Default direction to all inputs.
self.gppu = bytearray(self.gpio_bytes) # Default to pullups disabled.
self.gpio = bytearray(self.gpio_bytes)
# Write current direction and pullup buffer state.
self.write_iodir()
self.write_gppu()
示例11
def __init__(self, i2c, btm_servo_idx, top_servo_idx):
'''
PCA9685与ESP32之间通过I2C相连接
@gpio_scl : I2C的SCL管脚GPIO编号
@gpio_sda : I2C的SDA管脚的GPIO编号
@btm_servo_idx: 云台下方舵机在PCA9685上面的编号
@top_servo_idx: 云台上方电机在PCA9695上面的编号
'''
self.servos = Servos(i2c, address=0x40) # 实例化一个舵机控制板(servo control board)
self.btm_servo_idx = btm_servo_idx # 底部舵机的编号
self.top_servo_idx = top_servo_idx # 云台上方舵机的编号
self.btm_min_angle = 0 # 底部舵机最小旋转角度
self.btm_max_angle = 180 # 底部舵机最大旋转角度
self.btm_init_angle = 100 # 底部舵机的初始角度
self.top_min_angle = 0 # 顶部舵机的最小旋转角度
self.top_max_angle = 180 # 顶部舵机的最大旋转角度
self.top_init_angle = 100 # 顶部舵机的初始角度
# 初始化云台角度
self.init_cloud_platform()
示例12
def __init__(self,
width = 128, height = 64,
scl_pin_id = 5, sda_pin_id = 4,
freq = 400000):
self.width = width
self.height = height
self.i2c = machine.I2C(scl = machine.Pin(scl_pin_id, machine.Pin.OUT),
sda = machine.Pin(sda_pin_id),
freq = freq)
self.display = ssd1306.SSD1306_I2C(width, height, self.i2c)
self.show = self.display.show
示例13
def setup(use_spi=False, soft=True):
if use_spi:
# Pyb SSD
# 3v3 Vin
# Gnd Gnd
# X1 DC
# X2 CS
# X3 Rst
# X6 CLK
# X8 DATA
pdc = machine.Pin('X1', machine.Pin.OUT_PP)
pcs = machine.Pin('X2', machine.Pin.OUT_PP)
prst = machine.Pin('X3', machine.Pin.OUT_PP)
if soft:
spi = machine.SPI(sck=machine.Pin('X6'), mosi=machine.Pin('X8'), miso=machine.Pin('X7'))
else:
spi = machine.SPI(1)
ssd = SSD1306_SPI(WIDTH, HEIGHT, spi, pdc, prst, pcs)
else: # I2C
# Pyb SSD
# 3v3 Vin
# Gnd Gnd
# Y9 CLK
# Y10 DATA
if soft:
pscl = machine.Pin('Y9', machine.Pin.OPEN_DRAIN)
psda = machine.Pin('Y10', machine.Pin.OPEN_DRAIN)
i2c = machine.I2C(scl=pscl, sda=psda)
else:
i2c = machine.I2C(2)
ssd = SSD1306_I2C(WIDTH, HEIGHT, i2c)
return ssd
示例14
def __init__(
self, i2c, address=0x0c,
mode=MODE_CONTINOUS_MEASURE_1, output=OUTPUT_16_BIT,
offset=(0, 0, 0), scale=(1, 1, 1)
):
self.i2c = i2c
self.address = address
self._offset = offset
self._scale = scale
if 0x48 != self.whoami:
raise RuntimeError("AK8963 not found in I2C bus.")
# Sensitivity adjustement values
self._register_char(_CNTL1, _MODE_FUSE_ROM_ACCESS)
asax = self._register_char(_ASAX)
asay = self._register_char(_ASAY)
asaz = self._register_char(_ASAZ)
self._register_char(_CNTL1, _MODE_POWER_DOWN)
# Should wait atleast 100us before next mode
self._adjustement = (
(0.5 * (asax - 128)) / 128 + 1,
(0.5 * (asay - 128)) / 128 + 1,
(0.5 * (asaz - 128)) / 128 + 1
)
# Power on
self._register_char(_CNTL1, (mode | output))
if output is OUTPUT_16_BIT:
self._so = _SO_16BIT
else:
self._so = _SO_14BIT
示例15
def __init__(self, i2c=None, addr=0x5c):
if i2c == None:
from machine import I2C, Pin
self.i2c = I2C(sda=21, scl=22)
else:
self.i2c = i2c
self.addr = addr
self.buf = bytearray(5)
示例16
def __init__(self, side_str, device_addr=None, transposition=(0, 1, 2), scaling=(1, 1, 1)):
self._accel = Vector3d(transposition, scaling, self._accel_callback)
self._gyro = Vector3d(transposition, scaling, self._gyro_callback)
self.buf1 = bytearray(1) # Pre-allocated buffers for reads: allows reads to
self.buf2 = bytearray(2) # be done in interrupt handlers
self.buf3 = bytearray(3)
self.buf6 = bytearray(6)
sleep_ms(200) # Ensure PSU and device have settled
if isinstance(side_str, str): # Non-pyb targets may use other than X or Y
self._mpu_i2c = I2C(side_str)
elif hasattr(side_str, 'readfrom'): # Soft or hard I2C instance. See issue #3097
self._mpu_i2c = side_str
else:
raise ValueError("Invalid I2C instance")
if device_addr is None:
devices = set(self._mpu_i2c.scan())
mpus = devices.intersection(set(self._mpu_addr))
number_of_mpus = len(mpus)
if number_of_mpus == 0:
raise MPUException("No MPU's detected")
elif number_of_mpus == 1:
self.mpu_addr = mpus.pop()
else:
raise ValueError("Two MPU's detected: must specify a device address")
else:
if device_addr not in (0, 1):
raise ValueError('Device address must be 0 or 1')
self.mpu_addr = self._mpu_addr[device_addr]
self.chip_id # Test communication by reading chip_id: throws exception on error
# Can communicate with chip. Set it up.
self.wake() # wake it up
self.passthrough = True # Enable mag access from main I2C bus
self.accel_range = 0 # default to highest sensitivity
self.gyro_range = 0 # Likewise for gyro
# read from device
示例17
def _read(self, buf, memaddr, addr): # addr = I2C device address, memaddr = memory location within the I2C device
'''
Read bytes to pre-allocated buffer Caller traps OSError.
'''
self._mpu_i2c.readfrom_mem_into(addr, memaddr, buf)
# write to device
示例18
def __init__(self, sensor_id='mcp9808',
address=MCP9808_I2CADDR_DEFAULT, scl_pinno=5, sda_pinno=4,
i2c=None):
"""Initialize MCP9808 device on the specified I2C address and bus number.
Address defaults to 0x18 and bus number defaults to the appropriate bus
for the hardware.
"""
self.sensor_id = sensor_id
if i2c is None:
self.i2c = I2C(scl=Pin(scl_pinno, Pin.IN),sda=Pin(sda_pinno, Pin.IN))
else:
self.i2c = i2c
self.address=address
assert self.begin(), "Invalid values read from I2C bus for MCP9808"
示例19
def __init__(self, scl_pinno=5, sda_pinno=4):
self.i2c = I2C(scl=Pin(scl_pinno, Pin.IN),
sda=Pin(sda_pinno, Pin.IN))
示例20
def __init__(self, scl, sda):
"""Initiate the HUT21D
Args:
scl (int): Pin id where the sdl pin is connected to
sda (int): Pin id where the sda pin is connected to
"""
self.i2c = I2C(scl=Pin(scl, Pin.OPEN_DRAIN, Pin.PULL_UP), sda=Pin(sda, Pin.OPEN_DRAIN, Pin.PULL_UP), freq=100000)
示例21
def __init__(self):
# initialize ugfx
ugfx.init()
ugfx.clear(ugfx.WHITE)
# Buttons
ugfx.input_init()
self.init_buttons()
# Container
width = ugfx.width()
height = ugfx.height()
ind_height = 46
container_height = height - ind_height
self.indicator = ugfx.Container(0, 0, width, ind_height, style=styles.ibm_st)
self.container = ugfx.Container(0, ind_height, width, container_height, style=styles.ibm_st)
self.graph_basepos = container_height - 5
# Sensor
SCL = Pin(26) # SCL
SDA = Pin(25) # SDA
self.sensor = MPU6050(I2C(scl=SCL, sda=SDA))
# Buzzer
self.buzzer = Buzzer()
示例22
def i2c_write(self, payload):
"""
Establish an i2c object if not already establed and
write to the device.
:param payload:
:return:
"""
if not self.i2c:
self.i2c = I2C(scl=Pin(5), sda=Pin(4), freq=100000)
self.i2c.writeto(payload['addr'], bytearray(payload['data']))
示例23
def i2c_read(self,payload):
if not self.i2c:
self.i2c=I2C(scl=Pin(5),sda=Pin(4),freq=100000)
try:
data=self.i2c.readfrom_mem(payload['addr'],payload['register'],payload['number_of_bytes'])
except TypeError:
print('read')
raise
try:
data=list(data)
except TypeError:
print(payload,data)
raise
payload={'report':'i2c_data','value':data}
self.send_payload_to_gateway(payload)
示例24
def i2c_write(self,payload):
if not self.i2c:
self.i2c=I2C(scl=Pin(5),sda=Pin(4),freq=100000)
self.i2c.writeto(payload['addr'],bytearray(payload['data']))
示例25
def __init__(self, verbose):
self.verbose = verbose
self.cl = None # Client instance for server comms.
# Instantiate a Pyboard Channel
i2c = I2C(scl=Pin(0), sda=Pin(2)) # software I2C
syn = Pin(5)
ack = Pin(4)
self.chan = asi2c.Responder(i2c, syn, ack) # Channel to Pyboard
self.sreader = asyncio.StreamReader(self.chan)
self.swriter = asyncio.StreamWriter(self.chan, {})
示例26
def i2c_open(self, scl: Optional[int] = None, sda: Optional[int] = None, id: int = -1, baudrate: int = 400000, **kwargs):
"""
Open a connection to an I2C (or "two-wire") port.
:param scl: PIN number for the SCL (serial clock) line.
:param sda: PIN number for the SDA (serial data) line.
:param id: The default value of -1 selects a software implementation of I2C which can work (in most cases)
with arbitrary pins for SCL and SDA. If id is -1 then scl and sda must be specified. Other allowed
values for id depend on the particular port/board, and specifying scl and sda may or may not be required
or allowed in this case.
:param baudrate: Port frequency/clock rate (default: 400 kHz).
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`.
"""
code = '''
import machine
args = {}
if {scl}:
args['scl'] = machine.Pin(''' + str(scl) + ''')
if {sda}:
args['sda'] = machine.Pin(''' + str(sda) + ''')
'''
self.execute(code, **kwargs)
code = '''
i2c = machine.I2C(id={id}, freq={baudrate}, **args)
'''.format(id=id, baudrate=baudrate)
self.execute(code, **kwargs)
示例27
def i2c_close(self, **kwargs):
"""
Turn off an I2C bus.
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
:meth:`platypush.plugins.esp.EspPlugin.execute`.
"""
self.i2c_open(**kwargs)
self.execute('i2c.deinit()', **kwargs)
示例28
def i2c_scan(self, **kwargs) -> List[int]:
"""
Scan for device addresses on the I2C bus.
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
:meth:`platypush.plugins.esp.EspPlugin.execute`.
:return: List of 7-bit addresses.
"""
self.i2c_open(**kwargs)
return self.execute('i2c.scan', **kwargs).output
示例29
def i2c_stop(self, **kwargs):
"""
Generate a STOP condition on an I2C bus.
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.i2c_open` and
:meth:`platypush.plugins.esp.EspPlugin.execute`.
:return: String representation of the read bytes, or base64-encoded representation if the
data can't be decoded to a string.
"""
self.i2c_open(**kwargs)
self.execute('i2c.stop()', **kwargs)
示例30
def __init__(self, is_debug=False):
'''
Car构造器函数
'''
# 电池ADC采样
self.battery_adc = BatteryVoltage(
gpio_dict['BATTERY_ADC'],
is_debug=False)
# 用户按键
self.user_button = Button(
0,
callback=lambda pin: self.stop_trigger(pin))
try:
# 创建一个I2C对象
i2c = I2C(
scl=Pin(gpio_dict['I2C_SCL']),
sda=Pin(gpio_dict['I2C_SDA']),
freq=car_property['I2C_FREQUENCY'])
# 创建舵机云台对象
self.cloud_platform = CloudPlatform(i2c)
except:
print('[ERROR]: pca9885舵机驱动模块初始化失败')
print('[Hint]: 请检查接线')
self.speed_percent = 70 # 小车默认速度
# 左侧电机
self.left_motor = Motor(0)
self.left_motor.stop() # 左侧电机停止
# 右侧电机
self.right_motor = Motor(1)
self.right_motor.stop() # 右侧电机停止
# 小车停止标志位
self.stop_flag = False
self.is_debug = is_debug # 是否开始调试模式