From 5f9080dac97f5783c34be5e88b1b8d0cac290e26 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 4 Aug 2025 15:57:29 -1000 Subject: [PATCH] fix --device OTA --- esphome/__main__.py | 126 ++++++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 52 deletions(-) diff --git a/esphome/__main__.py b/esphome/__main__.py index 6bb50864b1..61dee02e49 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -107,30 +107,50 @@ def choose_prompt(options, purpose: str = None): def choose_upload_log_host( - default, check_default, show_ota, show_mqtt, show_api, purpose: str = None -): + default: list[str] | str | None, + check_default: str | None, + show_ota: bool, + show_mqtt: bool, + show_api: bool, + purpose: str | None = None, +) -> list[str]: + # Convert to list for uniform handling + defaults = [default] if isinstance(default, str) else default or [] + + # If devices specified, resolve them + if defaults: + resolved: list[str] = [] + for device in defaults: + if device == "SERIAL": + options = [ + (f"{port.path} ({port.description})", port.path) + for port in get_serial_ports() + ] + resolved.append(choose_prompt(options, purpose=purpose)) + elif device == "OTA": + if (show_ota and "ota" in CORE.config) or ( + show_api and "api" in CORE.config + ): + resolved.append(CORE.address) + elif show_mqtt and has_mqtt_logging(): + resolved.append("MQTT") + else: + resolved.append(device) + return resolved + + # No devices specified, show interactive chooser options = [ (f"{port.path} ({port.description})", port.path) for port in get_serial_ports() ] - if default == "SERIAL": - return choose_prompt(options, purpose=purpose) if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config): options.append((f"Over The Air ({CORE.address})", CORE.address)) - if default == "OTA": - return CORE.address - if ( - show_mqtt - and (mqtt_config := CORE.config.get(CONF_MQTT)) - and mqtt_logging_enabled(mqtt_config) - ): + if show_mqtt and has_mqtt_logging(): + mqtt_config = CORE.config[CONF_MQTT] options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT")) - if default == "OTA": - return "MQTT" - if default is not None: - return default + if check_default is not None and check_default in [opt[1] for opt in options]: - return check_default - return choose_prompt(options, purpose=purpose) + return [check_default] + return [choose_prompt(options, purpose=purpose)] def mqtt_logging_enabled(mqtt_config): @@ -142,6 +162,13 @@ def mqtt_logging_enabled(mqtt_config): return log_topic.get(CONF_LEVEL, None) != "NONE" +def has_mqtt_logging() -> bool: + """Check if MQTT logging is available.""" + return (mqtt_config := CORE.config.get(CONF_MQTT)) and mqtt_logging_enabled( + mqtt_config + ) + + def get_port_type(port: str) -> str: if port.startswith("/") or port.startswith("COM"): return "SERIAL" @@ -507,19 +534,18 @@ def command_compile(args: ArgsProtocol, config: ConfigType) -> int | None: def command_upload(args: ArgsProtocol, config: ConfigType) -> int | None: - # No devices specified, use the interactive chooser - devices: list[str] = args.device or [ - choose_upload_log_host( - default=None, - check_default=None, - show_ota=True, - show_mqtt=False, - show_api=False, - purpose="uploading", - ) - ] + # Get devices, resolving special identifiers like OTA + devices = choose_upload_log_host( + default=args.device, + check_default=None, + show_ota=True, + show_mqtt=False, + show_api=False, + purpose="uploading", + ) # Try each device until one succeeds + exit_code = 1 for device in devices: _LOGGER.info("Uploading to %s", device) exit_code = upload_program(config, args, device) @@ -542,17 +568,15 @@ def command_discover(args: ArgsProtocol, config: ConfigType) -> int | None: def command_logs(args: ArgsProtocol, config: ConfigType) -> int | None: - # No devices specified, use the interactive chooser - devices = args.device or [ - choose_upload_log_host( - default=None, - check_default=None, - show_ota=False, - show_mqtt=True, - show_api=True, - purpose="logging", - ) - ] + # Get devices, resolving special identifiers like OTA + devices = choose_upload_log_host( + default=args.device, + check_default=None, + show_ota=False, + show_mqtt=True, + show_api=True, + purpose="logging", + ) return show_logs(config, args, devices) @@ -573,17 +597,15 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None: program_path = idedata.raw["prog_path"] return run_external_process(program_path) - # No devices specified, use the interactive chooser - devices = args.device or [ - choose_upload_log_host( - default=None, - check_default=None, - show_ota=True, - show_mqtt=False, - show_api=True, - purpose="uploading", - ) - ] + # Get devices, resolving special identifiers like OTA + devices = choose_upload_log_host( + default=args.device, + check_default=None, + show_ota=True, + show_mqtt=False, + show_api=True, + purpose="uploading", + ) # Try each device for upload until one succeeds successful_device: str | None = None @@ -604,7 +626,7 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None: return 0 # For logs, prefer the device we successfully uploaded to - port = choose_upload_log_host( + devices = choose_upload_log_host( default=successful_device, check_default=successful_device, show_ota=False, @@ -612,7 +634,7 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None: show_api=True, purpose="logging", ) - return show_logs(config, args, [port]) + return show_logs(config, args, devices) def command_clean_mqtt(args: ArgsProtocol, config: ConfigType) -> int | None: