在 python 中从各种 USB 设备读取和存储各种数据
read and stock various data from various usb devices in python
我是 python 的初学者,我正在尝试从通过 USB 集线器连接到计算机的多个传感器(湿度、温度、压力传感器......)读取数据。我的主要目标是每五分钟记录一次这些传感器的不同值,然后将其存储起来进行分析。
我有我的传感器(来自 Hygrosens Instruments)的所有数据表和手册,我知道它们是如何工作的以及它们发送什么样的数据。但我不知道如何阅读它们。下面是我尝试过的,使用 pyserial.
import serial #import the serial library
from time import sleep #import the sleep command from the time library
import binascii
output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt
ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished
count = 0
while 1:
read_byte = ser.read(size=1)
while 1:
read_byte = ser.read(size=8) #read a byte
read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal
trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int
trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte
if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'
read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)
read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before
trad_hexa = int(read_byte_hexa , 16)
trad_scdcrchar = int('63' , 16)
print(trad_hexa, end='/')# this just show me if it gets in the condition
print(trad_scdcrchar)
if (trad_hexa == trad_scdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_thirdcrchar = int('72' , 16)
print(trad_hexa, end='///')
print(trad_thirdcrchar)
if (trad_hexa == trad_thirdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_fourthcrchar = int('3e' , 16)
print(trad_hexa, end='////')
print(trad_fourthcrchar)
if (trad_hexa == trad_fourthcrchar ):
print ('end of the line')
import time
def parse_info_line(line):
# implement to your own liking
logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]
serialno = line[7:19]
return physical_probe
def parse_value_line(line):
channel, crc = [line[ind:ind+2] for ind in (1,7)]
encoded_temp = line[3:7]
return twos_comp(int(encoded_temp, 16), 16)/100.
def twos_comp(val, bits):
"""compute the 2's compliment of int value `val`"""
if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bits) # compute negative value
return val # return positive value as is
def listen_on_serial(ser):
ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor
while True:
line = ser.readline()
try:
first_char = line[0]
except IndexError: # got no data from sensor
break
else:
if first_char == '@': # begins a new sensor record
in_record = True
elif first_char == '$':
in_record = False
elif first_char == 'I':
parse_info_line(line)
elif first_char == 'V':
print(parse_value_line(line))
else:
print("Unexpected character at the start of the line:\
{}".format(line))
time.sleep(2)
>>> import serial
>>> import io
>>>
>>> ser = serial.serial_for_url('loop://', timeout=1)
>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\
', line_buffering=True)
>>> serio.write(u'A1A0\
' # simulation of starting to listen halfway between 2 records
... '$\
' # marks the end of the previous record
... '@\
' # marks the start of a new sensor record
... 'I0101010000000000001B\
' # info about a sensor's probe
... 'V0109470D\
' # data matching that probe
... 'I0202010000000000002B\
' # other probe, same sensor
... 'V021BB55C\
') # data corresponding with 2nd probe
73L
>>>
>>> listen_on_serial(serio)
23.75
70.93
>>>
所以现在我想找到数据行的结尾,因为我需要的测量信息在以"V"开头的行中,如果我的传感器的数据表,它说一行结束by ,所以我想一次读取一个字节并查找''。所以我想这样做:
import serial #import the serial library
from time import sleep #import the sleep command from the time library
import binascii
output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt
ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished
count = 0
while 1:
read_byte = ser.read(size=1)
while 1:
read_byte = ser.read(size=8) #read a byte
read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal
trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int
trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte
if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'
read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)
read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before
trad_hexa = int(read_byte_hexa , 16)
trad_scdcrchar = int('63' , 16)
print(trad_hexa, end='/')# this just show me if it gets in the condition
print(trad_scdcrchar)
if (trad_hexa == trad_scdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_thirdcrchar = int('72' , 16)
print(trad_hexa, end='///')
print(trad_thirdcrchar)
if (trad_hexa == trad_thirdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_fourthcrchar = int('3e' , 16)
print(trad_hexa, end='////')
print(trad_fourthcrchar)
if (trad_hexa == trad_fourthcrchar ):
print ('end of the line')
import time
def parse_info_line(line):
# implement to your own liking
logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]
serialno = line[7:19]
return physical_probe
def parse_value_line(line):
channel, crc = [line[ind:ind+2] for ind in (1,7)]
encoded_temp = line[3:7]
return twos_comp(int(encoded_temp, 16), 16)/100.
def twos_comp(val, bits):
"""compute the 2's compliment of int value `val`"""
if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bits) # compute negative value
return val # return positive value as is
def listen_on_serial(ser):
ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor
while True:
line = ser.readline()
try:
first_char = line[0]
except IndexError: # got no data from sensor
break
else:
if first_char == '@': # begins a new sensor record
in_record = True
elif first_char == '$':
in_record = False
elif first_char == 'I':
parse_info_line(line)
elif first_char == 'V':
print(parse_value_line(line))
else:
print("Unexpected character at the start of the line:\
{}".format(line))
time.sleep(2)
>>> import serial
>>> import io
>>>
>>> ser = serial.serial_for_url('loop://', timeout=1)
>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\
', line_buffering=True)
>>> serio.write(u'A1A0\
' # simulation of starting to listen halfway between 2 records
... '$\
' # marks the end of the previous record
... '@\
' # marks the start of a new sensor record
... 'I0101010000000000001B\
' # info about a sensor's probe
... 'V0109470D\
' # data matching that probe
... 'I0202010000000000002B\
' # other probe, same sensor
... 'V021BB55C\
') # data corresponding with 2nd probe
73L
>>>
>>> listen_on_serial(serio)
23.75
70.93
>>>
但我不确定它是否有效,我的意思是我认为它没有时间读取第二个字节,我正在读取的第二个字节,它不完全是第二个字节。所以这就是我想使用缓冲区的原因,但我真的不明白我该怎么做。我要去寻找它,但如果有人知道一种更简单的方法来做我想做的事,我已经准备好尝试了!
谢谢
您似乎认为该传感器的通信协议的行尾字符是 4 个不同的字符:<、c、r 和 >。然而,这里所指的是回车,通常用 <cr> 表示,在许多编程语言中只用 \
表示(尽管它看起来像 2 个字符,但它只表示一个字符)。
由于协议是结构化的,因此您可以通过逐行读取传感器的数据来大大简化您的代码。以下内容可帮助您入门:
import serial #import the serial library
from time import sleep #import the sleep command from the time library
import binascii
output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt
ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished
count = 0
while 1:
read_byte = ser.read(size=1)
while 1:
read_byte = ser.read(size=8) #read a byte
read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal
trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int
trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte
if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'
read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)
read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before
trad_hexa = int(read_byte_hexa , 16)
trad_scdcrchar = int('63' , 16)
print(trad_hexa, end='/')# this just show me if it gets in the condition
print(trad_scdcrchar)
if (trad_hexa == trad_scdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_thirdcrchar = int('72' , 16)
print(trad_hexa, end='///')
print(trad_thirdcrchar)
if (trad_hexa == trad_thirdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_fourthcrchar = int('3e' , 16)
print(trad_hexa, end='////')
print(trad_fourthcrchar)
if (trad_hexa == trad_fourthcrchar ):
print ('end of the line')
import time
def parse_info_line(line):
# implement to your own liking
logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]
serialno = line[7:19]
return physical_probe
def parse_value_line(line):
channel, crc = [line[ind:ind+2] for ind in (1,7)]
encoded_temp = line[3:7]
return twos_comp(int(encoded_temp, 16), 16)/100.
def twos_comp(val, bits):
"""compute the 2's compliment of int value `val`"""
if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bits) # compute negative value
return val # return positive value as is
def listen_on_serial(ser):
ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor
while True:
line = ser.readline()
try:
first_char = line[0]
except IndexError: # got no data from sensor
break
else:
if first_char == '@': # begins a new sensor record
in_record = True
elif first_char == '$':
in_record = False
elif first_char == 'I':
parse_info_line(line)
elif first_char == 'V':
print(parse_value_line(line))
else:
print("Unexpected character at the start of the line:\
{}".format(line))
time.sleep(2)
>>> import serial
>>> import io
>>>
>>> ser = serial.serial_for_url('loop://', timeout=1)
>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\
', line_buffering=True)
>>> serio.write(u'A1A0\
' # simulation of starting to listen halfway between 2 records
... '$\
' # marks the end of the previous record
... '@\
' # marks the start of a new sensor record
... 'I0101010000000000001B\
' # info about a sensor's probe
... 'V0109470D\
' # data matching that probe
... 'I0202010000000000002B\
' # other probe, same sensor
... 'V021BB55C\
') # data corresponding with 2nd probe
73L
>>>
>>> listen_on_serial(serio)
23.75
70.93
>>>
twos_comp 函数是由 travc 编写的,如果您有足够的声誉并且打算使用他的代码(即使您不这样做,它仍然是一个不错的选择),我们鼓励您支持他的答案回答,我刚刚投了赞成票)。 listen_on_serial 也可以改进(许多 Python 程序员会识别开关结构并使用字典而不是 if... elif... elif... 来实现它),但这只是为了让您入门。
作为测试,以下代码摘录模拟传感器发送一些数据(以行分隔,使用回车作为行尾标记),这些数据是我从您链接到的 pdf 中复制的 (FAQ_terminalfenster_E. pdf).
import serial #import the serial library
from time import sleep #import the sleep command from the time library
import binascii
output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt
ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished
count = 0
while 1:
read_byte = ser.read(size=1)
while 1:
read_byte = ser.read(size=8) #read a byte
read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal
trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int
trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte
if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'
read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)
read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before
trad_hexa = int(read_byte_hexa , 16)
trad_scdcrchar = int('63' , 16)
print(trad_hexa, end='/')# this just show me if it gets in the condition
print(trad_scdcrchar)
if (trad_hexa == trad_scdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_thirdcrchar = int('72' , 16)
print(trad_hexa, end='///')
print(trad_thirdcrchar)
if (trad_hexa == trad_thirdcrchar ):
read_byte = ser.read(size=1) #read the next byte
read_byte_hexa =binascii.hexlify(read_byte)
trad_hexa = int(read_byte_hexa , 16)
trad_fourthcrchar = int('3e' , 16)
print(trad_hexa, end='////')
print(trad_fourthcrchar)
if (trad_hexa == trad_fourthcrchar ):
print ('end of the line')
import time
def parse_info_line(line):
# implement to your own liking
logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]
serialno = line[7:19]
return physical_probe
def parse_value_line(line):
channel, crc = [line[ind:ind+2] for ind in (1,7)]
encoded_temp = line[3:7]
return twos_comp(int(encoded_temp, 16), 16)/100.
def twos_comp(val, bits):
"""compute the 2's compliment of int value `val`"""
if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bits) # compute negative value
return val # return positive value as is
def listen_on_serial(ser):
ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor
while True:
line = ser.readline()
try:
first_char = line[0]
except IndexError: # got no data from sensor
break
else:
if first_char == '@': # begins a new sensor record
in_record = True
elif first_char == '$':
in_record = False
elif first_char == 'I':
parse_info_line(line)
elif first_char == 'V':
print(parse_value_line(line))
else:
print("Unexpected character at the start of the line:\
{}".format(line))
time.sleep(2)
>>> import serial
>>> import io
>>>
>>> ser = serial.serial_for_url('loop://', timeout=1)
>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\
', line_buffering=True)
>>> serio.write(u'A1A0\
' # simulation of starting to listen halfway between 2 records
... '$\
' # marks the end of the previous record
... '@\
' # marks the start of a new sensor record
... 'I0101010000000000001B\
' # info about a sensor's probe
... 'V0109470D\
' # data matching that probe
... 'I0202010000000000002B\
' # other probe, same sensor
... 'V021BB55C\
') # data corresponding with 2nd probe
73L
>>>
>>> listen_on_serial(serio)
23.75
70.93
>>>
请注意,当行尾字符不是 \
(换行符)时,pyserial 文档建议使用 TextIOWrapper,这里也已回答。