1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-22 11:43:51 +01:00
This commit is contained in:
J. Nick Koston
2025-07-31 19:39:08 -10:00
parent 1161bfcc93
commit 5fac039a06

View File

@@ -44,6 +44,7 @@ from esphome.const import (
from esphome.core import CORE, EsphomeError, coroutine from esphome.core import CORE, EsphomeError, coroutine
from esphome.helpers import get_bool_env, indent, is_ip_address from esphome.helpers import get_bool_env, indent, is_ip_address
from esphome.log import AnsiFore, color, setup_log from esphome.log import AnsiFore, color, setup_log
from esphome.types import ConfigType
from esphome.util import ( from esphome.util import (
get_serial_ports, get_serial_ports,
list_yaml_files, list_yaml_files,
@@ -123,7 +124,7 @@ def mqtt_logging_enabled(mqtt_config):
return log_topic.get(CONF_LEVEL, None) != "NONE" return log_topic.get(CONF_LEVEL, None) != "NONE"
def get_port_type(port): def get_port_type(port: str) -> str:
if port.startswith("/") or port.startswith("COM"): if port.startswith("/") or port.startswith("COM"):
return "SERIAL" return "SERIAL"
if port == "MQTT": if port == "MQTT":
@@ -131,7 +132,7 @@ def get_port_type(port):
return "NETWORK" return "NETWORK"
def run_miniterm(config, port, args): def run_miniterm(config: ConfigType, port: str, args) -> int:
from aioesphomeapi import LogParser from aioesphomeapi import LogParser
import serial import serial
@@ -249,7 +250,7 @@ def compile_program(args, config):
return 0 if idedata is not None else 1 return 0 if idedata is not None else 1
def upload_using_esptool(config, port, file, speed): def upload_using_esptool(config: ConfigType, port: str, file: str, speed: int):
from esphome import platformio_api from esphome import platformio_api
first_baudrate = speed or config[CONF_ESPHOME][CONF_PLATFORMIO_OPTIONS].get( first_baudrate = speed or config[CONF_ESPHOME][CONF_PLATFORMIO_OPTIONS].get(
@@ -314,7 +315,7 @@ def upload_using_esptool(config, port, file, speed):
return run_esptool(115200) return run_esptool(115200)
def upload_using_platformio(config, port): def upload_using_platformio(config: ConfigType, port: str):
from esphome import platformio_api from esphome import platformio_api
upload_args = ["-t", "upload", "-t", "nobuild"] upload_args = ["-t", "upload", "-t", "nobuild"]
@@ -323,7 +324,7 @@ def upload_using_platformio(config, port):
return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args) return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args)
def check_permissions(port): def check_permissions(port: str):
if os.name == "posix" and get_port_type(port) == "SERIAL": if os.name == "posix" and get_port_type(port) == "SERIAL":
# Check if we can open selected serial port # Check if we can open selected serial port
if not os.access(port, os.F_OK): if not os.access(port, os.F_OK):
@@ -341,7 +342,7 @@ def check_permissions(port):
) )
def upload_program(config, args, host: str): def upload_program(config: ConfigType, args, host: str):
try: try:
module = importlib.import_module("esphome.components." + CORE.target_platform) module = importlib.import_module("esphome.components." + CORE.target_platform)
if getattr(module, "upload_program")(config, args, host): if getattr(module, "upload_program")(config, args, host):
@@ -402,7 +403,7 @@ def upload_program(config, args, host: str):
return espota2.run_ota(host, remote_port, password, CORE.firmware_bin) return espota2.run_ota(host, remote_port, password, CORE.firmware_bin)
def show_logs(config, args, devices: list[str]): def show_logs(config: ConfigType, args, devices: list[str]) -> int | None:
if "logger" not in config: if "logger" not in config:
raise EsphomeError("Logger is not configured!") raise EsphomeError("Logger is not configured!")
@@ -522,21 +523,18 @@ def command_discover(args, config):
raise EsphomeError("No discover method configured (mqtt)") raise EsphomeError("No discover method configured (mqtt)")
def command_logs(args, config) -> int: def command_logs(args, config) -> int | None:
devices = args.device or [] # No devices specified, use the interactive chooser
if not devices: devices = args.device or [
# No devices specified, use the interactive chooser choose_upload_log_host(
devices = [ default=None,
choose_upload_log_host( check_default=None,
default=None, show_ota=False,
check_default=None, show_mqtt=True,
show_ota=False, show_api=True,
show_mqtt=True, purpose="logging",
show_api=True, )
purpose="logging", ]
)
]
return show_logs(config, args, devices) return show_logs(config, args, devices)
@@ -557,22 +555,20 @@ def command_run(args, config):
program_path = idedata.raw["prog_path"] program_path = idedata.raw["prog_path"]
return run_external_process(program_path) return run_external_process(program_path)
devices = args.device or [] # No devices specified, use the interactive chooser
if not devices: devices = args.device or [
# No devices specified, use the interactive chooser choose_upload_log_host(
devices = [ default=None,
choose_upload_log_host( check_default=None,
default=None, show_ota=True,
check_default=None, show_mqtt=False,
show_ota=True, show_api=True,
show_mqtt=False, purpose="uploading",
show_api=True, )
purpose="uploading", ]
)
]
# Try each device for upload until one succeeds # Try each device for upload until one succeeds
successful_device = None successful_device: str | None = None
for device in devices: for device in devices:
_LOGGER.info("Uploading to %s", device) _LOGGER.info("Uploading to %s", device)
exit_code = upload_program(config, args, device) exit_code = upload_program(config, args, device)