1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-07 05:42:20 +01:00

[wizard] extend the wizard dashboard API to allow upload and empty config options (#10203)

This commit is contained in:
Maxim Raznatovski
2025-09-04 04:02:49 +02:00
committed by GitHub
parent 23c6650902
commit c03d978b46
3 changed files with 164 additions and 33 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio import asyncio
import base64 import base64
import binascii
from collections.abc import Callable, Iterable from collections.abc import Callable, Iterable
import datetime import datetime
import functools import functools
@@ -490,7 +491,17 @@ class WizardRequestHandler(BaseHandler):
kwargs = { kwargs = {
k: v k: v
for k, v in json.loads(self.request.body.decode()).items() for k, v in json.loads(self.request.body.decode()).items()
if k in ("name", "platform", "board", "ssid", "psk", "password") if k
in (
"type",
"name",
"platform",
"board",
"ssid",
"psk",
"password",
"file_content",
)
} }
if not kwargs["name"]: if not kwargs["name"]:
self.set_status(422) self.set_status(422)
@@ -498,19 +509,65 @@ class WizardRequestHandler(BaseHandler):
self.write(json.dumps({"error": "Name is required"})) self.write(json.dumps({"error": "Name is required"}))
return return
if "type" not in kwargs:
# Default to basic wizard type for backwards compatibility
kwargs["type"] = "basic"
kwargs["friendly_name"] = kwargs["name"] kwargs["friendly_name"] = kwargs["name"]
kwargs["name"] = friendly_name_slugify(kwargs["friendly_name"]) kwargs["name"] = friendly_name_slugify(kwargs["friendly_name"])
if kwargs["type"] == "basic":
kwargs["ota_password"] = secrets.token_hex(16) kwargs["ota_password"] = secrets.token_hex(16)
noise_psk = secrets.token_bytes(32) noise_psk = secrets.token_bytes(32)
kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode() kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode()
elif kwargs["type"] == "upload":
try:
kwargs["file_text"] = base64.b64decode(kwargs["file_content"]).decode(
"utf-8"
)
except (binascii.Error, UnicodeDecodeError):
self.set_status(422)
self.set_header("content-type", "application/json")
self.write(
json.dumps({"error": "The uploaded file is not correctly encoded."})
)
return
elif kwargs["type"] != "empty":
self.set_status(422)
self.set_header("content-type", "application/json")
self.write(
json.dumps(
{"error": f"Invalid wizard type specified: {kwargs['type']}"}
)
)
return
filename = f"{kwargs['name']}.yaml" filename = f"{kwargs['name']}.yaml"
destination = settings.rel_path(filename) destination = settings.rel_path(filename)
wizard.wizard_write(path=destination, **kwargs)
self.set_status(200) # Check if destination file already exists
self.set_header("content-type", "application/json") if os.path.exists(destination):
self.write(json.dumps({"configuration": filename})) self.set_status(409) # Conflict status code
self.finish() self.set_header("content-type", "application/json")
self.write(
json.dumps({"error": f"Configuration file '{filename}' already exists"})
)
self.finish()
return
success = wizard.wizard_write(path=destination, **kwargs)
if success:
self.set_status(200)
self.set_header("content-type", "application/json")
self.write(json.dumps({"configuration": filename}))
self.finish()
else:
self.set_status(500)
self.set_header("content-type", "application/json")
self.write(
json.dumps(
{"error": "Failed to write configuration, see logs for details"}
)
)
self.finish()
class ImportRequestHandler(BaseHandler): class ImportRequestHandler(BaseHandler):

View File

@@ -189,32 +189,45 @@ def wizard_write(path, **kwargs):
from esphome.components.rtl87xx import boards as rtl87xx_boards from esphome.components.rtl87xx import boards as rtl87xx_boards
name = kwargs["name"] name = kwargs["name"]
board = kwargs["board"] if kwargs["type"] == "empty":
file_text = ""
# Will be updated later after editing the file
hardware = "UNKNOWN"
elif kwargs["type"] == "upload":
file_text = kwargs["file_text"]
hardware = "UNKNOWN"
else: # "basic"
board = kwargs["board"]
for key in ("ssid", "psk", "password", "ota_password"): for key in ("ssid", "psk", "password", "ota_password"):
if key in kwargs: if key in kwargs:
kwargs[key] = sanitize_double_quotes(kwargs[key]) kwargs[key] = sanitize_double_quotes(kwargs[key])
if "platform" not in kwargs:
if board in esp8266_boards.BOARDS:
platform = "ESP8266"
elif board in esp32_boards.BOARDS:
platform = "ESP32"
elif board in rp2040_boards.BOARDS:
platform = "RP2040"
elif board in bk72xx_boards.BOARDS:
platform = "BK72XX"
elif board in ln882x_boards.BOARDS:
platform = "LN882X"
elif board in rtl87xx_boards.BOARDS:
platform = "RTL87XX"
else:
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
return False
kwargs["platform"] = platform
hardware = kwargs["platform"]
file_text = wizard_file(**kwargs)
if "platform" not in kwargs: # Check if file already exists to prevent overwriting
if board in esp8266_boards.BOARDS: if os.path.exists(path) and os.path.isfile(path):
platform = "ESP8266" safe_print(color(AnsiFore.RED, f'The file "{path}" already exists.'))
elif board in esp32_boards.BOARDS: return False
platform = "ESP32"
elif board in rp2040_boards.BOARDS:
platform = "RP2040"
elif board in bk72xx_boards.BOARDS:
platform = "BK72XX"
elif board in ln882x_boards.BOARDS:
platform = "LN882X"
elif board in rtl87xx_boards.BOARDS:
platform = "RTL87XX"
else:
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
return False
kwargs["platform"] = platform
hardware = kwargs["platform"]
write_file(path, wizard_file(**kwargs)) write_file(path, file_text)
storage = StorageJSON.from_wizard(name, name, f"{name}.local", hardware) storage = StorageJSON.from_wizard(name, name, f"{name}.local", hardware)
storage_path = ext_storage_path(os.path.basename(path)) storage_path = ext_storage_path(os.path.basename(path))
storage.save(storage_path) storage.save(storage_path)

View File

@@ -17,6 +17,7 @@ import esphome.wizard as wz
@pytest.fixture @pytest.fixture
def default_config(): def default_config():
return { return {
"type": "basic",
"name": "test-name", "name": "test-name",
"platform": "ESP8266", "platform": "ESP8266",
"board": "esp01_1m", "board": "esp01_1m",
@@ -125,6 +126,47 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
assert "esp8266:" in generated_config assert "esp8266:" in generated_config
def test_wizard_empty_config(tmp_path, monkeypatch):
"""
The wizard should be able to create an empty configuration
"""
# Given
empty_config = {
"type": "empty",
"name": "test-empty",
}
monkeypatch.setattr(wz, "write_file", MagicMock())
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
# When
wz.wizard_write(tmp_path, **empty_config)
# Then
generated_config = wz.write_file.call_args.args[1]
assert generated_config == ""
def test_wizard_upload_config(tmp_path, monkeypatch):
"""
The wizard should be able to import an base64 encoded configuration
"""
# Given
empty_config = {
"type": "upload",
"name": "test-upload",
"file_text": "# imported file 📁\n\n",
}
monkeypatch.setattr(wz, "write_file", MagicMock())
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
# When
wz.wizard_write(tmp_path, **empty_config)
# Then
generated_config = wz.write_file.call_args.args[1]
assert generated_config == "# imported file 📁\n\n"
def test_wizard_write_defaults_platform_from_board_esp8266( def test_wizard_write_defaults_platform_from_board_esp8266(
default_config, tmp_path, monkeypatch default_config, tmp_path, monkeypatch
): ):
@@ -471,3 +513,22 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
# Then # Then
assert retval == 0 assert retval == 0
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
"""
The wizard_write function should not overwrite existing config files and return False
"""
# Given
config_file = tmpdir.join("test.yaml")
original_content = "# Original config content\n"
config_file.write(original_content)
monkeypatch.setattr(CORE, "config_path", str(tmpdir))
# When
result = wz.wizard_write(str(config_file), **default_config)
# Then
assert result is False # Should return False when file exists
assert config_file.read() == original_content