mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-25 21:23:53 +01:00 
			
		
		
		
	Try to fix serial ports listing (#1155)
This commit is contained in:
		| @@ -12,24 +12,12 @@ from esphome.const import CONF_BAUD_RATE, CONF_BROKER, CONF_LOGGER, CONF_OTA, \ | |||||||
|     CONF_PASSWORD, CONF_PORT, CONF_ESPHOME, CONF_PLATFORMIO_OPTIONS |     CONF_PASSWORD, CONF_PORT, CONF_ESPHOME, CONF_PLATFORMIO_OPTIONS | ||||||
| from esphome.core import CORE, EsphomeError, coroutine, coroutine_with_priority | from esphome.core import CORE, EsphomeError, coroutine, coroutine_with_priority | ||||||
| from esphome.helpers import color, indent | from esphome.helpers import color, indent | ||||||
| from esphome.util import run_external_command, run_external_process, safe_print, list_yaml_files | from esphome.util import run_external_command, run_external_process, safe_print, list_yaml_files, \ | ||||||
|  |     get_serial_ports | ||||||
|  |  | ||||||
| _LOGGER = logging.getLogger(__name__) | _LOGGER = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_serial_ports(): |  | ||||||
|     # from https://github.com/pyserial/pyserial/blob/master/serial/tools/list_ports.py |  | ||||||
|     from serial.tools.list_ports import comports |  | ||||||
|     result = [] |  | ||||||
|     for port, desc, info in comports(include_links=True): |  | ||||||
|         if not port: |  | ||||||
|             continue |  | ||||||
|         if "VID:PID" in info: |  | ||||||
|             result.append((port, desc)) |  | ||||||
|     result.sort(key=lambda x: x[0]) |  | ||||||
|     return result |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def choose_prompt(options): | def choose_prompt(options): | ||||||
|     if not options: |     if not options: | ||||||
|         raise EsphomeError("Found no valid options for upload/logging, please make sure relevant " |         raise EsphomeError("Found no valid options for upload/logging, please make sure relevant " | ||||||
| @@ -60,8 +48,8 @@ def choose_prompt(options): | |||||||
|  |  | ||||||
| def choose_upload_log_host(default, check_default, show_ota, show_mqtt, show_api): | def choose_upload_log_host(default, check_default, show_ota, show_mqtt, show_api): | ||||||
|     options = [] |     options = [] | ||||||
|     for res, desc in get_serial_ports(): |     for port in get_serial_ports(): | ||||||
|         options.append((f"{res} ({desc})", res)) |         options.append((f"{port.path} ({port.description})", port.path)) | ||||||
|     if (show_ota and 'ota' in CORE.config) or (show_api and 'api' in CORE.config): |     if (show_ota and 'ota' in CORE.config) or (show_api and 'api' in CORE.config): | ||||||
|         options.append((f"Over The Air ({CORE.address})", CORE.address)) |         options.append((f"Over The Air ({CORE.address})", CORE.address)) | ||||||
|         if default == 'OTA': |         if default == 'OTA': | ||||||
|   | |||||||
| @@ -26,11 +26,10 @@ import tornado.web | |||||||
| import tornado.websocket | import tornado.websocket | ||||||
|  |  | ||||||
| from esphome import const, util | from esphome import const, util | ||||||
| from esphome.__main__ import get_serial_ports |  | ||||||
| from esphome.helpers import mkdir_p, get_bool_env, run_system_command | from esphome.helpers import mkdir_p, get_bool_env, run_system_command | ||||||
| from esphome.storage_json import EsphomeStorageJSON, StorageJSON, \ | from esphome.storage_json import EsphomeStorageJSON, StorageJSON, \ | ||||||
|     esphome_storage_path, ext_storage_path, trash_storage_path |     esphome_storage_path, ext_storage_path, trash_storage_path | ||||||
| from esphome.util import shlex_quote | from esphome.util import shlex_quote, get_serial_ports | ||||||
|  |  | ||||||
| # pylint: disable=unused-import, wrong-import-order | # pylint: disable=unused-import, wrong-import-order | ||||||
| from typing import Optional  # noqa | from typing import Optional  # noqa | ||||||
| @@ -313,14 +312,15 @@ class SerialPortRequestHandler(BaseHandler): | |||||||
|     def get(self): |     def get(self): | ||||||
|         ports = get_serial_ports() |         ports = get_serial_ports() | ||||||
|         data = [] |         data = [] | ||||||
|         for port, desc in ports: |         for port in ports: | ||||||
|             if port == '/dev/ttyAMA0': |             desc = port.description | ||||||
|  |             if port.path == '/dev/ttyAMA0': | ||||||
|                 desc = 'UART pins on GPIO header' |                 desc = 'UART pins on GPIO header' | ||||||
|             split_desc = desc.split(' - ') |             split_desc = desc.split(' - ') | ||||||
|             if len(split_desc) == 2 and split_desc[0] == split_desc[1]: |             if len(split_desc) == 2 and split_desc[0] == split_desc[1]: | ||||||
|                 # Some serial ports repeat their values |                 # Some serial ports repeat their values | ||||||
|                 desc = split_desc[0] |                 desc = split_desc[0] | ||||||
|             data.append({'port': port, 'desc': desc}) |             data.append({'port': port.path, 'desc': desc}) | ||||||
|         data.append({'port': 'OTA', 'desc': 'Over-The-Air'}) |         data.append({'port': 'OTA', 'desc': 'Over-The-Air'}) | ||||||
|         data.sort(key=lambda x: x['port'], reverse=True) |         data.sort(key=lambda x: x['port'], reverse=True) | ||||||
|         self.write(json.dumps(data)) |         self.write(json.dumps(data)) | ||||||
|   | |||||||
| @@ -1,4 +1,4 @@ | |||||||
| from typing import Union | from typing import Union, List | ||||||
|  |  | ||||||
| import collections | import collections | ||||||
| import io | import io | ||||||
| @@ -7,6 +7,7 @@ import os | |||||||
| import re | import re | ||||||
| import subprocess | import subprocess | ||||||
| import sys | import sys | ||||||
|  | from pathlib import Path | ||||||
|  |  | ||||||
| from esphome import const | from esphome import const | ||||||
|  |  | ||||||
| @@ -256,3 +257,34 @@ def filter_yaml_files(files): | |||||||
|     files = [f for f in files if os.path.basename(f) != 'secrets.yaml'] |     files = [f for f in files if os.path.basename(f) != 'secrets.yaml'] | ||||||
|     files = [f for f in files if not os.path.basename(f).startswith('.')] |     files = [f for f in files if not os.path.basename(f).startswith('.')] | ||||||
|     return files |     return files | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class SerialPort: | ||||||
|  |     def __init__(self, path: str, description: str): | ||||||
|  |         self.path = path | ||||||
|  |         self.description = description | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # from https://github.com/pyserial/pyserial/blob/master/serial/tools/list_ports.py | ||||||
|  | def get_serial_ports() -> List[SerialPort]: | ||||||
|  |     from serial.tools.list_ports import comports | ||||||
|  |     result = [] | ||||||
|  |     for port, desc, info in comports(include_links=True): | ||||||
|  |         if not port: | ||||||
|  |             continue | ||||||
|  |         if "VID:PID" in info: | ||||||
|  |             result.append(SerialPort(path=port, description=desc)) | ||||||
|  |     # Also add objects in /dev/serial/by-id/ | ||||||
|  |     # ref: https://github.com/esphome/issues/issues/1346 | ||||||
|  |  | ||||||
|  |     by_id_path = Path('/dev/serial/by-id') | ||||||
|  |     if sys.platform.lower().startswith('linux') and by_id_path.exists(): | ||||||
|  |         from serial.tools.list_ports_linux import SysFS | ||||||
|  |  | ||||||
|  |         for path in by_id_path.glob('*'): | ||||||
|  |             device = SysFS(path) | ||||||
|  |             if device.subsystem == 'platform': | ||||||
|  |                 result.append(SerialPort(path=str(path), description=info[1])) | ||||||
|  |  | ||||||
|  |     result.sort(key=lambda x: x.path) | ||||||
|  |     return result | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user