在 python 中从各种 USB 设备读取和存储各种数据

read and stock various data from various usb devices in python

我是 python 的初学者,我正在尝试从通过 USB 集线器连接到计算机的多个传感器(湿度、温度、压力传感器......)读取数据。我的主要目标是每五分钟记录一次这些传感器的不同值,然后将其存储起来进行分析。

我有我的传感器(来自 Hygrosens Instruments)的所有数据表和手册,我知道它们是如何工作的以及它们发送什么样的数据。但我不知道如何阅读它们。下面是我尝试过的,使用 pyserial.

import serial #import the serial library

from time import sleep #import the sleep command from the time library

import binascii



output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt



ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished

count = 0

while 1:

  read_byte = ser.read(size=1)

while 1:

  read_byte = ser.read(size=8) #read a byte

  read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal



  trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int

  trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte  

  if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'  

    read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)

    read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before

    trad_hexa = int(read_byte_hexa , 16)

    trad_scdcrchar = int('63' , 16)

    print(trad_hexa, end='/')# this just show me if it gets in the condition

    print(trad_scdcrchar)  

    if (trad_hexa == trad_scdcrchar ):  

      read_byte = ser.read(size=1) #read the next byte 

      read_byte_hexa =binascii.hexlify(read_byte)

      trad_hexa = int(read_byte_hexa , 16)

      trad_thirdcrchar = int('72' , 16)

      print(trad_hexa, end='///')

      print(trad_thirdcrchar)  

      if (trad_hexa == trad_thirdcrchar ):  

        read_byte = ser.read(size=1) #read the next byte 

        read_byte_hexa =binascii.hexlify(read_byte)

        trad_hexa = int(read_byte_hexa , 16)

        trad_fourthcrchar = int('3e' , 16)

        print(trad_hexa, end='////')

        print(trad_fourthcrchar)  

        if (trad_hexa == trad_fourthcrchar ):  

          print ('end of the line')

import time



def parse_info_line(line):

  # implement to your own liking

  logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]

  serialno = line[7:19]

  return physical_probe



def parse_value_line(line):

  channel, crc = [line[ind:ind+2] for ind in (1,7)]

  encoded_temp = line[3:7]

  return twos_comp(int(encoded_temp, 16), 16)/100.



def twos_comp(val, bits):

 """compute the 2's compliment of int value `val`"""

  if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255

    val = val - (1 << bits)    # compute negative value

  return val             # return positive value as is



def listen_on_serial(ser):

  ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor

  while True:

    line = ser.readline()

    try:

      first_char = line[0]

    except IndexError: # got no data from sensor

      break

    else:

      if first_char == '@': # begins a new sensor record

        in_record = True

      elif first_char == '$':

        in_record = False

      elif first_char == 'I':

        parse_info_line(line)

      elif first_char == 'V':

        print(parse_value_line(line))

      else:

        print("Unexpected character at the start of the line:\

{}".format(line))

      time.sleep(2)

>>> import serial

>>> import io

>>> 

>>> ser = serial.serial_for_url('loop://', timeout=1)

>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\

', line_buffering=True)

>>> serio.write(u'A1A0\

' # simulation of starting to listen halfway between 2 records

...   '$\

'       # marks the end of the previous record

...   '@\

'       # marks the start of a new sensor record

...   'I0101010000000000001B\

' # info about a sensor's probe

...   'V0109470D\

'       # data matching that probe

...   'I0202010000000000002B\

' # other probe, same sensor

...   'V021BB55C\

')       # data corresponding with 2nd probe

73L

>>> 

>>> listen_on_serial(serio)

23.75

70.93

>>>

所以现在我想找到数据行的结尾,因为我需要的测量信息在以"V"开头的行中,如果我的传感器的数据表,它说一行结束by ,所以我想一次读取一个字节并查找''。所以我想这样做:

import serial #import the serial library

from time import sleep #import the sleep command from the time library

import binascii



output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt



ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished

count = 0

while 1:

  read_byte = ser.read(size=1)

while 1:

  read_byte = ser.read(size=8) #read a byte

  read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal



  trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int

  trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte  

  if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'  

    read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)

    read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before

    trad_hexa = int(read_byte_hexa , 16)

    trad_scdcrchar = int('63' , 16)

    print(trad_hexa, end='/')# this just show me if it gets in the condition

    print(trad_scdcrchar)  

    if (trad_hexa == trad_scdcrchar ):  

      read_byte = ser.read(size=1) #read the next byte 

      read_byte_hexa =binascii.hexlify(read_byte)

      trad_hexa = int(read_byte_hexa , 16)

      trad_thirdcrchar = int('72' , 16)

      print(trad_hexa, end='///')

      print(trad_thirdcrchar)  

      if (trad_hexa == trad_thirdcrchar ):  

        read_byte = ser.read(size=1) #read the next byte 

        read_byte_hexa =binascii.hexlify(read_byte)

        trad_hexa = int(read_byte_hexa , 16)

        trad_fourthcrchar = int('3e' , 16)

        print(trad_hexa, end='////')

        print(trad_fourthcrchar)  

        if (trad_hexa == trad_fourthcrchar ):  

          print ('end of the line')

import time



def parse_info_line(line):

  # implement to your own liking

  logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]

  serialno = line[7:19]

  return physical_probe



def parse_value_line(line):

  channel, crc = [line[ind:ind+2] for ind in (1,7)]

  encoded_temp = line[3:7]

  return twos_comp(int(encoded_temp, 16), 16)/100.



def twos_comp(val, bits):

 """compute the 2's compliment of int value `val`"""

  if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255

    val = val - (1 << bits)    # compute negative value

  return val             # return positive value as is



def listen_on_serial(ser):

  ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor

  while True:

    line = ser.readline()

    try:

      first_char = line[0]

    except IndexError: # got no data from sensor

      break

    else:

      if first_char == '@': # begins a new sensor record

        in_record = True

      elif first_char == '$':

        in_record = False

      elif first_char == 'I':

        parse_info_line(line)

      elif first_char == 'V':

        print(parse_value_line(line))

      else:

        print("Unexpected character at the start of the line:\

{}".format(line))

      time.sleep(2)

>>> import serial

>>> import io

>>> 

>>> ser = serial.serial_for_url('loop://', timeout=1)

>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\

', line_buffering=True)

>>> serio.write(u'A1A0\

' # simulation of starting to listen halfway between 2 records

...   '$\

'       # marks the end of the previous record

...   '@\

'       # marks the start of a new sensor record

...   'I0101010000000000001B\

' # info about a sensor's probe

...   'V0109470D\

'       # data matching that probe

...   'I0202010000000000002B\

' # other probe, same sensor

...   'V021BB55C\

')       # data corresponding with 2nd probe

73L

>>> 

>>> listen_on_serial(serio)

23.75

70.93

>>>

但我不确定它是否有效,我的意思是我认为它没有时间读取第二个字节,我正在读取的第二个字节,它不完全是第二个字节。所以这就是我想使用缓冲区的原因,但我真的不明白我该怎么做。我要去寻找它,但如果有人知道一种更简单的方法来做我想做的事,我已经准备好尝试了!

谢谢


您似乎认为该传感器的通信协议的行尾字符是 4 个不同的字符:<cr>。然而,这里所指的是回车,通常用 <cr> 表示,在许多编程语言中只用 \

表示(尽管它看起来像 2 个字符,但它只表示一个字符)。

由于协议是结构化的,因此您可以通过逐行读取传感器的数据来大大简化您的代码。以下内容可帮助您入门:

import serial #import the serial library

from time import sleep #import the sleep command from the time library

import binascii



output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt



ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished

count = 0

while 1:

  read_byte = ser.read(size=1)

while 1:

  read_byte = ser.read(size=8) #read a byte

  read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal



  trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int

  trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte  

  if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'  

    read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)

    read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before

    trad_hexa = int(read_byte_hexa , 16)

    trad_scdcrchar = int('63' , 16)

    print(trad_hexa, end='/')# this just show me if it gets in the condition

    print(trad_scdcrchar)  

    if (trad_hexa == trad_scdcrchar ):  

      read_byte = ser.read(size=1) #read the next byte 

      read_byte_hexa =binascii.hexlify(read_byte)

      trad_hexa = int(read_byte_hexa , 16)

      trad_thirdcrchar = int('72' , 16)

      print(trad_hexa, end='///')

      print(trad_thirdcrchar)  

      if (trad_hexa == trad_thirdcrchar ):  

        read_byte = ser.read(size=1) #read the next byte 

        read_byte_hexa =binascii.hexlify(read_byte)

        trad_hexa = int(read_byte_hexa , 16)

        trad_fourthcrchar = int('3e' , 16)

        print(trad_hexa, end='////')

        print(trad_fourthcrchar)  

        if (trad_hexa == trad_fourthcrchar ):  

          print ('end of the line')

import time



def parse_info_line(line):

  # implement to your own liking

  logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]

  serialno = line[7:19]

  return physical_probe



def parse_value_line(line):

  channel, crc = [line[ind:ind+2] for ind in (1,7)]

  encoded_temp = line[3:7]

  return twos_comp(int(encoded_temp, 16), 16)/100.



def twos_comp(val, bits):

 """compute the 2's compliment of int value `val`"""

  if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255

    val = val - (1 << bits)    # compute negative value

  return val             # return positive value as is



def listen_on_serial(ser):

  ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor

  while True:

    line = ser.readline()

    try:

      first_char = line[0]

    except IndexError: # got no data from sensor

      break

    else:

      if first_char == '@': # begins a new sensor record

        in_record = True

      elif first_char == '$':

        in_record = False

      elif first_char == 'I':

        parse_info_line(line)

      elif first_char == 'V':

        print(parse_value_line(line))

      else:

        print("Unexpected character at the start of the line:\

{}".format(line))

      time.sleep(2)

>>> import serial

>>> import io

>>> 

>>> ser = serial.serial_for_url('loop://', timeout=1)

>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\

', line_buffering=True)

>>> serio.write(u'A1A0\

' # simulation of starting to listen halfway between 2 records

...   '$\

'       # marks the end of the previous record

...   '@\

'       # marks the start of a new sensor record

...   'I0101010000000000001B\

' # info about a sensor's probe

...   'V0109470D\

'       # data matching that probe

...   'I0202010000000000002B\

' # other probe, same sensor

...   'V021BB55C\

')       # data corresponding with 2nd probe

73L

>>> 

>>> listen_on_serial(serio)

23.75

70.93

>>>

twos_comp 函数是由 travc 编写的,如果您有足够的声誉并且打算使用他的代码(即使您不这样做,它仍然是一个不错的选择),我们鼓励您支持他的答案回答,我刚刚投了赞成票)。 listen_on_serial 也可以改进(许多 Python 程序员会识别开关结构并使用字典而不是 if... elif... elif... 来实现它),但这只是为了让您入门。

作为测试,以下代码摘录模拟传感器发送一些数据(以行分隔,使用回车作为行尾标记),这些数据是我从您链接到的 pdf 中复制的 (FAQ_terminalfenster_E. pdf).

import serial #import the serial library

from time import sleep #import the sleep command from the time library

import binascii



output_file = open('hygro.txt', 'w') #create a file and allow you to write in it only. The name of this file is hygro.txt



ser = serial.Serial("/dev/tty.usbserial-A400DUTI", 9600) #load into a variable 'ser' the information about the usb you are listening. /dev/tty.usbserial.... is the port after plugging in the hygrometer, 9600 is for bauds, it can be diminished

count = 0

while 1:

  read_byte = ser.read(size=1)

while 1:

  read_byte = ser.read(size=8) #read a byte

  read_byte_hexa =binascii.hexlify(read_byte) #convert the byte into hexadecimal



  trad_hexa = int(read_byte_hexa , 16) #convert the hexadecimal into an int in purpose to compare it with another int

  trad_firstcrchar = int('3c' , 16) #convert the hexadecimal of the '<' into a int to compare it with the first byte  

  if (trad_hexa == trad_firstcrchar ): #compare the first byte with the '<'  

    read_byte = ser.read(size=1) #read the next byte (I am not sure if that really works)

    read_byte_hexa =binascii.hexlify(read_byte)# from now I am doing the same thing as before

    trad_hexa = int(read_byte_hexa , 16)

    trad_scdcrchar = int('63' , 16)

    print(trad_hexa, end='/')# this just show me if it gets in the condition

    print(trad_scdcrchar)  

    if (trad_hexa == trad_scdcrchar ):  

      read_byte = ser.read(size=1) #read the next byte 

      read_byte_hexa =binascii.hexlify(read_byte)

      trad_hexa = int(read_byte_hexa , 16)

      trad_thirdcrchar = int('72' , 16)

      print(trad_hexa, end='///')

      print(trad_thirdcrchar)  

      if (trad_hexa == trad_thirdcrchar ):  

        read_byte = ser.read(size=1) #read the next byte 

        read_byte_hexa =binascii.hexlify(read_byte)

        trad_hexa = int(read_byte_hexa , 16)

        trad_fourthcrchar = int('3e' , 16)

        print(trad_hexa, end='////')

        print(trad_fourthcrchar)  

        if (trad_hexa == trad_fourthcrchar ):  

          print ('end of the line')

import time



def parse_info_line(line):

  # implement to your own liking

  logical_channel, physical_probe, hardware_id, crc = [line[index:index+2] for index in (1, 3, 5, 19)]

  serialno = line[7:19]

  return physical_probe



def parse_value_line(line):

  channel, crc = [line[ind:ind+2] for ind in (1,7)]

  encoded_temp = line[3:7]

  return twos_comp(int(encoded_temp, 16), 16)/100.



def twos_comp(val, bits):

 """compute the 2's compliment of int value `val`"""

  if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255

    val = val - (1 << bits)    # compute negative value

  return val             # return positive value as is



def listen_on_serial(ser):

  ser.readline() # do nothing with the first line: you have no idea when you start listening to the data broadcast from the sensor

  while True:

    line = ser.readline()

    try:

      first_char = line[0]

    except IndexError: # got no data from sensor

      break

    else:

      if first_char == '@': # begins a new sensor record

        in_record = True

      elif first_char == '$':

        in_record = False

      elif first_char == 'I':

        parse_info_line(line)

      elif first_char == 'V':

        print(parse_value_line(line))

      else:

        print("Unexpected character at the start of the line:\

{}".format(line))

      time.sleep(2)

>>> import serial

>>> import io

>>> 

>>> ser = serial.serial_for_url('loop://', timeout=1)

>>> serio = io.TextIOWrapper(io.BufferedRWPair(ser, ser), newline='\

', line_buffering=True)

>>> serio.write(u'A1A0\

' # simulation of starting to listen halfway between 2 records

...   '$\

'       # marks the end of the previous record

...   '@\

'       # marks the start of a new sensor record

...   'I0101010000000000001B\

' # info about a sensor's probe

...   'V0109470D\

'       # data matching that probe

...   'I0202010000000000002B\

' # other probe, same sensor

...   'V021BB55C\

')       # data corresponding with 2nd probe

73L

>>> 

>>> listen_on_serial(serio)

23.75

70.93

>>>

请注意,当行尾字符不是 \

(换行符)时,pyserial 文档建议使用 TextIOWrapper,这里也已回答。


相关推荐

  • Spring部署设置openshift

    Springdeploymentsettingsopenshift我有一个问题让我抓狂了三天。我根据OpenShift帐户上的教程部署了spring-eap6-quickstart代码。我已配置调试选项,并且已将Eclipse工作区与OpehShift服务器同步-服务器上的一切工作正常,但在Eclipse中出现无法消除的错误。我有这个错误:cvc-complex-type.2.4.a:Invali…
    2025-04-161
  • 检查Java中正则表达式中模式的第n次出现

    CheckfornthoccurrenceofpatterninregularexpressioninJava本问题已经有最佳答案,请猛点这里访问。我想使用Java正则表达式检查输入字符串中特定模式的第n次出现。你能建议怎么做吗?这应该可以工作:MatchResultfindNthOccurance(intn,Patternp,CharSequencesrc){Matcherm=p.matcher…
    2025-04-161
  • 如何让 JTable 停留在已编辑的单元格上

    HowtohaveJTablestayingontheeditedcell如果有人编辑JTable的单元格内容并按Enter,则内容会被修改并且表格选择会移动到下一行。是否可以禁止JTable在单元格编辑后转到下一行?原因是我的程序使用ListSelectionListener在单元格选择上同步了其他一些小部件,并且我不想在编辑当前单元格后选择下一行。Enter的默认绑定是名为selectNext…
    2025-04-161
  • Weblogic 12c 部署

    Weblogic12cdeploy我正在尝试将我的应用程序从Tomcat迁移到Weblogic12.2.1.3.0。我能够毫无错误地部署应用程序,但我遇到了与持久性提供程序相关的运行时错误。这是堆栈跟踪:javax.validation.ValidationException:CalltoTraversableResolver.isReachable()threwanexceptionatorg.…
    2025-04-161
  • Resteasy Content-Type 默认值

    ResteasyContent-Typedefaults我正在使用Resteasy编写一个可以返回JSON和XML的应用程序,但可以选择默认为XML。这是我的方法:@GET@Path("/content")@Produces({MediaType.APPLICATION_XML,MediaType.APPLICATION_JSON})publicStringcontentListRequestXm…
    2025-04-161
  • 代码不会停止运行,在 Java 中

    thecodedoesn'tstoprunning,inJava我正在用Java解决项目Euler中的问题10,即"Thesumoftheprimesbelow10is2+3+5+7=17.Findthesumofalltheprimesbelowtwomillion."我的代码是packageprojecteuler_1;importjava.math.BigInteger;importjava…
    2025-04-161
  • Out of memory java heap space

    Outofmemoryjavaheapspace我正在尝试将大量文件从服务器发送到多个客户端。当我尝试发送大小为700mb的文件时,它显示了"OutOfMemoryjavaheapspace"错误。我正在使用Netbeans7.1.2版本。我还在属性中尝试了VMoption。但仍然发生同样的错误。我认为阅读整个文件存在一些问题。下面的代码最多可用于300mb。请给我一些建议。提前致谢publicc…
    2025-04-161
  • Log4j 记录到共享日志文件

    Log4jLoggingtoaSharedLogFile有没有办法将log4j日志记录事件写入也被其他应用程序写入的日志文件。其他应用程序可以是非Java应用程序。有什么缺点?锁定问题?格式化?Log4j有一个SocketAppender,它将向服务发送事件,您可以自己实现或使用与Log4j捆绑的简单实现。它还支持syslogd和Windows事件日志,这对于尝试将日志输出与来自非Java应用程序…
    2025-04-161