from __future__ import annotations import asyncio import base64 import datetime import functools import gzip import hashlib import json import logging import os import secrets import shutil import subprocess import threading import time from collections.abc import Iterable from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, TypeVar import tornado import tornado.concurrent import tornado.gen import tornado.httpserver import tornado.httputil import tornado.ioloop import tornado.iostream import tornado.netutil import tornado.process import tornado.queues import tornado.web import tornado.websocket import yaml from tornado.log import access_log from yaml.nodes import Node from esphome import const, platformio_api, yaml_util from esphome.helpers import get_bool_env, mkdir_p from esphome.storage_json import StorageJSON, ext_storage_path, trash_storage_path from esphome.util import get_serial_ports, shlex_quote from esphome.yaml_util import FastestAvailableSafeLoader from .const import DASHBOARD_COMMAND from .core import DASHBOARD from .entries import EntryState, entry_state_to_bool from .util.file import write_file from .util.subprocess import async_run_system_command from .util.text import friendly_name_slugify if TYPE_CHECKING: from requests import Response _LOGGER = logging.getLogger(__name__) ENV_DEV = "ESPHOME_DASHBOARD_DEV" COOKIE_AUTHENTICATED_YES = b"yes" AUTH_COOKIE_NAME = "authenticated" settings = DASHBOARD.settings def template_args() -> dict[str, Any]: version = const.__version__ if "b" in version: docs_link = "https://beta.esphome.io/" elif "dev" in version: docs_link = "https://next.esphome.io/" else: docs_link = "https://www.esphome.io/" return { "version": version, "docs_link": docs_link, "get_static_file_url": get_static_file_url, "relative_url": settings.relative_url, "streamer_mode": settings.streamer_mode, "config_dir": settings.config_dir, } T = TypeVar("T", bound=Callable[..., Any]) def authenticated(func: T) -> T: @functools.wraps(func) def decorator(self, *args: Any, **kwargs: Any): if not is_authenticated(self): self.redirect("./login") return None return func(self, *args, **kwargs) return decorator def is_authenticated(handler: BaseHandler) -> bool: """Check if the request is authenticated.""" if settings.on_ha_addon: # Handle ingress - disable auth on ingress port # X-HA-Ingress is automatically stripped on the non-ingress server in nginx header = handler.request.headers.get("X-HA-Ingress", "NO") if str(header) == "YES": return True if settings.using_auth: return handler.get_secure_cookie(AUTH_COOKIE_NAME) == COOKIE_AUTHENTICATED_YES return True def bind_config(func): def decorator(self, *args, **kwargs): configuration = self.get_argument("configuration") kwargs = kwargs.copy() kwargs["configuration"] = configuration return func(self, *args, **kwargs) return decorator # pylint: disable=abstract-method class BaseHandler(tornado.web.RequestHandler): pass def websocket_class(cls): # pylint: disable=protected-access if not hasattr(cls, "_message_handlers"): cls._message_handlers = {} for _, method in cls.__dict__.items(): if hasattr(method, "_message_handler"): cls._message_handlers[method._message_handler] = method return cls def websocket_method(name): def wrap(fn): # pylint: disable=protected-access fn._message_handler = name return fn return wrap @websocket_class class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler): """Base class for ESPHome websocket commands.""" def __init__( self, application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any, ) -> None: """Initialize the websocket.""" super().__init__(application, request, **kwargs) self._proc = None self._queue = None self._is_closed = False # Windows doesn't support non-blocking pipes, # use Popen() with a reading thread instead self._use_popen = os.name == "nt" def open(self, *args: str, **kwargs: str) -> None: """Handle new WebSocket connection.""" # Ensure messages from the subprocess are sent immediately # to avoid a 200-500ms delay when nodelay is not set. self.set_nodelay(True) @authenticated async def on_message( # pylint: disable=invalid-overridden-method self, message: str ) -> None: # Since tornado 4.5, on_message is allowed to be a coroutine # Messages are always JSON, 500 when not json_message = json.loads(message) type_ = json_message["type"] # pylint: disable=no-member handlers = type(self)._message_handlers if type_ not in handlers: _LOGGER.warning("Requested unknown message type %s", type_) return await handlers[type_](self, json_message) @websocket_method("spawn") async def handle_spawn(self, json_message: dict[str, Any]) -> None: if self._proc is not None: # spawn can only be called once return command = await self.build_command(json_message) _LOGGER.info("Running command '%s'", " ".join(shlex_quote(x) for x in command)) if self._use_popen: self._queue = tornado.queues.Queue() # pylint: disable=consider-using-with self._proc = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) stdout_thread = threading.Thread(target=self._stdout_thread) stdout_thread.daemon = True stdout_thread.start() else: self._proc = tornado.process.Subprocess( command, stdout=tornado.process.Subprocess.STREAM, stderr=subprocess.STDOUT, stdin=tornado.process.Subprocess.STREAM, close_fds=False, ) self._proc.set_exit_callback(self._proc_on_exit) tornado.ioloop.IOLoop.current().spawn_callback(self._redirect_stdout) @property def is_process_active(self) -> bool: return self._proc is not None and self._proc.returncode is None @websocket_method("stdin") async def handle_stdin(self, json_message: dict[str, Any]) -> None: if not self.is_process_active: return text: str = json_message["data"] data = text.encode("utf-8", "replace") _LOGGER.debug("< stdin: %s", data) self._proc.stdin.write(data) @tornado.gen.coroutine def _redirect_stdout(self) -> None: reg = b"[\n\r]" while True: try: if self._use_popen: data: bytes = yield self._queue.get() if data is None: self._proc_on_exit(self._proc.poll()) break else: data: bytes = yield self._proc.stdout.read_until_regex(reg) except tornado.iostream.StreamClosedError: break text = data.decode("utf-8", "replace") _LOGGER.debug("> stdout: %s", text) self.write_message({"event": "line", "data": text}) def _stdout_thread(self) -> None: if not self._use_popen: return while True: data = self._proc.stdout.readline() if data: data = data.replace(b"\r", b"") self._queue.put_nowait(data) if self._proc.poll() is not None: break self._proc.wait(1.0) self._queue.put_nowait(None) def _proc_on_exit(self, returncode: int) -> None: if not self._is_closed: # Check if the proc was not forcibly closed _LOGGER.info("Process exited with return code %s", returncode) self.write_message({"event": "exit", "code": returncode}) def on_close(self) -> None: # Check if proc exists (if 'start' has been run) if self.is_process_active: _LOGGER.debug("Terminating process") if self._use_popen: self._proc.terminate() else: self._proc.proc.terminate() # Shutdown proc on WS close self._is_closed = True async def build_command(self, json_message: dict[str, Any]) -> list[str]: raise NotImplementedError class EsphomePortCommandWebSocket(EsphomeCommandWebSocket): """Base class for commands that require a port.""" async def build_device_command( self, args: list[str], json_message: dict[str, Any] ) -> list[str]: """Build the command to run.""" dashboard = DASHBOARD entries = dashboard.entries configuration = json_message["configuration"] config_file = settings.rel_path(configuration) port = json_message["port"] if ( port == "OTA" # pylint: disable=too-many-boolean-expressions and (entry := entries.get(config_file)) and entry.loaded_integrations and "api" in entry.loaded_integrations ): if (mdns := dashboard.mdns_status) and ( address := await mdns.async_resolve_host(entry.name) ): # Use the IP address if available but only # if the API is loaded and the device is online # since MQTT logging will not work otherwise port = address elif ( entry.address and ( address_list := await dashboard.dns_cache.async_resolve( entry.address, time.monotonic() ) ) and not isinstance(address_list, Exception) ): # If mdns is not available, try to use the DNS cache port = address_list[0] return [ *DASHBOARD_COMMAND, *args, config_file, "--device", port, ] class EsphomeLogsHandler(EsphomePortCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: """Build the command to run.""" return await self.build_device_command(["logs"], json_message) class EsphomeRenameHandler(EsphomeCommandWebSocket): old_name: str async def build_command(self, json_message: dict[str, Any]) -> list[str]: config_file = settings.rel_path(json_message["configuration"]) self.old_name = json_message["configuration"] return [ *DASHBOARD_COMMAND, "rename", config_file, json_message["newName"], ] def _proc_on_exit(self, returncode): super()._proc_on_exit(returncode) if returncode != 0: return # Remove the old ping result from the cache entries = DASHBOARD.entries if entry := entries.get(self.old_name): entries.async_set_state(entry, EntryState.UNKNOWN) class EsphomeUploadHandler(EsphomePortCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: """Build the command to run.""" return await self.build_device_command(["upload"], json_message) class EsphomeRunHandler(EsphomePortCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: """Build the command to run.""" return await self.build_device_command(["run"], json_message) class EsphomeCompileHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: config_file = settings.rel_path(json_message["configuration"]) command = [*DASHBOARD_COMMAND, "compile"] if json_message.get("only_generate", False): command.append("--only-generate") command.append(config_file) return command class EsphomeValidateHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: config_file = settings.rel_path(json_message["configuration"]) command = [*DASHBOARD_COMMAND, "config", config_file] if not settings.streamer_mode: command.append("--show-secrets") return command class EsphomeCleanMqttHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: config_file = settings.rel_path(json_message["configuration"]) return [*DASHBOARD_COMMAND, "clean-mqtt", config_file] class EsphomeCleanHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: config_file = settings.rel_path(json_message["configuration"]) return [*DASHBOARD_COMMAND, "clean", config_file] class EsphomeVscodeHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: return [*DASHBOARD_COMMAND, "-q", "vscode", "dummy"] class EsphomeAceEditorHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: return [*DASHBOARD_COMMAND, "-q", "vscode", "--ace", settings.config_dir] class EsphomeUpdateAllHandler(EsphomeCommandWebSocket): async def build_command(self, json_message: dict[str, Any]) -> list[str]: return [*DASHBOARD_COMMAND, "update-all", settings.config_dir] class SerialPortRequestHandler(BaseHandler): @authenticated async def get(self) -> None: ports = await asyncio.get_running_loop().run_in_executor(None, get_serial_ports) data = [] for port in ports: desc = port.description if port.path == "/dev/ttyAMA0": desc = "UART pins on GPIO header" split_desc = desc.split(" - ") if len(split_desc) == 2 and split_desc[0] == split_desc[1]: # Some serial ports repeat their values desc = split_desc[0] data.append({"port": port.path, "desc": desc}) data.append({"port": "OTA", "desc": "Over-The-Air"}) data.sort(key=lambda x: x["port"], reverse=True) self.set_header("content-type", "application/json") self.write(json.dumps(data)) class WizardRequestHandler(BaseHandler): @authenticated def post(self) -> None: from esphome import wizard kwargs = { k: v for k, v in json.loads(self.request.body.decode()).items() if k in ("name", "platform", "board", "ssid", "psk", "password") } if not kwargs["name"]: self.set_status(422) self.set_header("content-type", "application/json") self.write(json.dumps({"error": "Name is required"})) return kwargs["friendly_name"] = kwargs["name"] kwargs["name"] = friendly_name_slugify(kwargs["friendly_name"]) kwargs["ota_password"] = secrets.token_hex(16) noise_psk = secrets.token_bytes(32) kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode() filename = f"{kwargs['name']}.yaml" destination = settings.rel_path(filename) wizard.wizard_write(path=destination, **kwargs) self.set_status(200) self.set_header("content-type", "application/json") self.write(json.dumps({"configuration": filename})) self.finish() class ImportRequestHandler(BaseHandler): @authenticated def post(self) -> None: from esphome.components.dashboard_import import import_config dashboard = DASHBOARD args = json.loads(self.request.body.decode()) try: name = args["name"] friendly_name = args.get("friendly_name") encryption = args.get("encryption", False) imported_device = next( ( res for res in dashboard.import_result.values() if res.device_name == name ), None, ) if imported_device is not None: network = imported_device.network if friendly_name is None: friendly_name = imported_device.friendly_name else: network = const.CONF_WIFI import_config( settings.rel_path(f"{name}.yaml"), name, friendly_name, args["project_name"], args["package_import_url"], network, encryption, ) # Make sure the device gets marked online right away dashboard.ping_request.set() except FileExistsError: self.set_status(500) self.write("File already exists") return except ValueError as e: _LOGGER.error(e) self.set_status(422) self.write("Invalid package url") return self.set_status(200) self.set_header("content-type", "application/json") self.write(json.dumps({"configuration": f"{name}.yaml"})) self.finish() class DownloadListRequestHandler(BaseHandler): @authenticated @bind_config def get(self, configuration: str | None = None) -> None: storage_path = ext_storage_path(configuration) storage_json = StorageJSON.load(storage_path) if storage_json is None: self.send_error(404) return from esphome.components.esp32 import VARIANTS as ESP32_VARIANTS downloads = [] platform: str = storage_json.target_platform.lower() if platform == const.PLATFORM_RP2040: from esphome.components.rp2040 import get_download_types as rp2040_types downloads = rp2040_types(storage_json) elif platform == const.PLATFORM_ESP8266: from esphome.components.esp8266 import get_download_types as esp8266_types downloads = esp8266_types(storage_json) elif platform.upper() in ESP32_VARIANTS: from esphome.components.esp32 import get_download_types as esp32_types downloads = esp32_types(storage_json) elif platform in (const.PLATFORM_RTL87XX, const.PLATFORM_BK72XX): from esphome.components.libretiny import ( get_download_types as libretiny_types, ) downloads = libretiny_types(storage_json) else: raise ValueError(f"Unknown platform {platform}") self.set_status(200) self.set_header("content-type", "application/json") self.write(json.dumps(downloads)) self.finish() return class DownloadBinaryRequestHandler(BaseHandler): def _load_file(self, path: str, compressed: bool) -> bytes: """Load a file from disk and compress it if requested.""" with open(path, "rb") as f: data = f.read() if compressed: return gzip.compress(data, 9) return data @authenticated @bind_config async def get(self, configuration: str | None = None) -> None: """Download a binary file.""" loop = asyncio.get_running_loop() compressed = self.get_argument("compressed", "0") == "1" storage_path = ext_storage_path(configuration) storage_json = StorageJSON.load(storage_path) if storage_json is None: self.send_error(404) return # fallback to type=, but prioritize file= file_name = self.get_argument("type", None) file_name = self.get_argument("file", file_name) if file_name is None: self.send_error(400) return file_name = file_name.replace("..", "").lstrip("/") # get requested download name, or build it based on filename download_name = self.get_argument( "download", f"{storage_json.name}-{file_name}", ) path = os.path.dirname(storage_json.firmware_bin_path) path = os.path.join(path, file_name) if not Path(path).is_file(): args = ["esphome", "idedata", settings.rel_path(configuration)] rc, stdout, _ = await async_run_system_command(args) if rc != 0: self.send_error(404 if rc == 2 else 500) return idedata = platformio_api.IDEData(json.loads(stdout)) found = False for image in idedata.extra_flash_images: if image.path.endswith(file_name): path = image.path download_name = file_name found = True break if not found: self.send_error(404) return download_name = download_name + ".gz" if compressed else download_name self.set_header("Content-Type", "application/octet-stream") self.set_header( "Content-Disposition", f'attachment; filename="{download_name}"' ) self.set_header("Cache-Control", "no-cache") if not Path(path).is_file(): self.send_error(404) return data = await loop.run_in_executor(None, self._load_file, path, compressed) self.write(data) self.finish() class EsphomeVersionHandler(BaseHandler): @authenticated def get(self) -> None: self.set_header("Content-Type", "application/json") self.write(json.dumps({"version": const.__version__})) self.finish() class ListDevicesHandler(BaseHandler): @authenticated async def get(self) -> None: dashboard = DASHBOARD await dashboard.entries.async_request_update_entries() entries = dashboard.entries.async_all() self.set_header("content-type", "application/json") configured = {entry.name for entry in entries} self.write( json.dumps( { "configured": [entry.to_dict() for entry in entries], "importable": [ { "name": res.device_name, "friendly_name": res.friendly_name, "package_import_url": res.package_import_url, "project_name": res.project_name, "project_version": res.project_version, "network": res.network, } for res in dashboard.import_result.values() if res.device_name not in configured ], } ) ) class MainRequestHandler(BaseHandler): @authenticated def get(self) -> None: begin = bool(self.get_argument("begin", False)) if settings.using_password: # Simply accessing the xsrf_token sets the cookie for us self.xsrf_token # pylint: disable=pointless-statement else: self.clear_cookie("_xsrf") self.render( "index.template.html", begin=begin, **template_args(), login_enabled=settings.using_password, ) class PrometheusServiceDiscoveryHandler(BaseHandler): @authenticated async def get(self) -> None: dashboard = DASHBOARD await dashboard.entries.async_request_update_entries() entries = dashboard.entries.async_all() self.set_header("content-type", "application/json") sd = [] for entry in entries: if entry.web_port is None: continue labels = { "__meta_name": entry.name, "__meta_esp_platform": entry.target_platform, "__meta_esphome_version": entry.storage.esphome_version, } for integration in entry.storage.loaded_integrations: labels[f"__meta_integration_{integration}"] = "true" sd.append( { "targets": [ f"{entry.address}:{entry.web_port}", ], "labels": labels, } ) self.write(json.dumps(sd)) class BoardsRequestHandler(BaseHandler): @authenticated def get(self, platform: str) -> None: # filter all ESP32 variants by requested platform if platform.startswith("esp32"): from esphome.components.esp32.boards import BOARDS as ESP32_BOARDS boards = { k: v for k, v in ESP32_BOARDS.items() if v[const.KEY_VARIANT] == platform.upper() } elif platform == const.PLATFORM_ESP8266: from esphome.components.esp8266.boards import BOARDS as ESP8266_BOARDS boards = ESP8266_BOARDS elif platform == const.PLATFORM_RP2040: from esphome.components.rp2040.boards import BOARDS as RP2040_BOARDS boards = RP2040_BOARDS elif platform == const.PLATFORM_BK72XX: from esphome.components.bk72xx.boards import BOARDS as BK72XX_BOARDS boards = BK72XX_BOARDS elif platform == const.PLATFORM_RTL87XX: from esphome.components.rtl87xx.boards import BOARDS as RTL87XX_BOARDS boards = RTL87XX_BOARDS else: raise ValueError(f"Unknown platform {platform}") # map to a {board_name: board_title} dict platform_boards = {key: val[const.KEY_NAME] for key, val in boards.items()} # sort by board title boards_items = sorted(platform_boards.items(), key=lambda item: item[1]) output = [{"items": dict(boards_items)}] self.set_header("content-type", "application/json") self.write(json.dumps(output)) class PingRequestHandler(BaseHandler): @authenticated def get(self) -> None: dashboard = DASHBOARD dashboard.ping_request.set() if settings.status_use_mqtt: dashboard.mqtt_ping_request.set() self.set_header("content-type", "application/json") self.write( json.dumps( { entry.filename: entry_state_to_bool(entry.state) for entry in dashboard.entries.async_all() } ) ) class InfoRequestHandler(BaseHandler): @authenticated @bind_config async def get(self, configuration: str | None = None) -> None: yaml_path = settings.rel_path(configuration) dashboard = DASHBOARD entry = dashboard.entries.get(yaml_path) if not entry: self.set_status(404) return self.set_header("content-type", "application/json") self.write(entry.storage.to_json()) class EditRequestHandler(BaseHandler): @authenticated @bind_config async def get(self, configuration: str | None = None) -> None: """Get the content of a file.""" if not configuration.endswith((".yaml", ".yml")): self.send_error(404) return filename = settings.rel_path(configuration) if Path(filename).resolve().parent != settings.absolute_config_dir: self.send_error(404) return loop = asyncio.get_running_loop() content = await loop.run_in_executor( None, self._read_file, filename, configuration ) if content is not None: self.set_header("Content-Type", "application/yaml") self.write(content) def _read_file(self, filename: str, configuration: str) -> bytes | None: """Read a file and return the content as bytes.""" try: with open(file=filename, encoding="utf-8") as f: return f.read() except FileNotFoundError: if configuration in const.SECRETS_FILES: return "" self.set_status(404) return None def _write_file(self, filename: str, content: bytes) -> None: """Write a file with the given content.""" write_file(filename, content) @authenticated @bind_config async def post(self, configuration: str | None = None) -> None: """Write the content of a file.""" if not configuration.endswith((".yaml", ".yml")): self.send_error(404) return filename = settings.rel_path(configuration) if Path(filename).resolve().parent != settings.absolute_config_dir: self.send_error(404) return loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._write_file, filename, self.request.body) # Ensure the StorageJSON is updated as well DASHBOARD.entries.async_schedule_storage_json_update(filename) self.set_status(200) class DeleteRequestHandler(BaseHandler): @authenticated @bind_config def post(self, configuration: str | None = None) -> None: config_file = settings.rel_path(configuration) storage_path = ext_storage_path(configuration) trash_path = trash_storage_path() mkdir_p(trash_path) shutil.move(config_file, os.path.join(trash_path, configuration)) storage_json = StorageJSON.load(storage_path) if storage_json is not None: # Delete build folder (if exists) name = storage_json.name build_folder = os.path.join(settings.config_dir, name) if build_folder is not None: shutil.rmtree(build_folder, os.path.join(trash_path, name)) class UndoDeleteRequestHandler(BaseHandler): @authenticated @bind_config def post(self, configuration: str | None = None) -> None: config_file = settings.rel_path(configuration) trash_path = trash_storage_path() shutil.move(os.path.join(trash_path, configuration), config_file) class LoginHandler(BaseHandler): def get(self) -> None: if is_authenticated(self): self.redirect("./") else: self.render_login_page() def render_login_page(self, error: str | None = None) -> None: self.render( "login.template.html", error=error, ha_addon=settings.using_ha_addon_auth, has_username=bool(settings.username), **template_args(), ) def _make_supervisor_auth_request(self) -> Response: """Make a request to the supervisor auth endpoint.""" import requests headers = {"X-Supervisor-Token": os.getenv("SUPERVISOR_TOKEN")} data = { "username": self.get_argument("username", ""), "password": self.get_argument("password", ""), } return requests.post( "http://supervisor/auth", headers=headers, json=data, timeout=30 ) async def post_ha_addon_login(self) -> None: loop = asyncio.get_running_loop() try: req = await loop.run_in_executor(None, self._make_supervisor_auth_request) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Error during Hass.io auth request: %s", err) self.set_status(500) self.render_login_page(error="Internal server error") return if req.status_code == 200: self._set_authenticated() self.redirect("/") return self.set_status(401) self.render_login_page(error="Invalid username or password") def _set_authenticated(self) -> None: """Set the authenticated cookie.""" self.set_secure_cookie(AUTH_COOKIE_NAME, COOKIE_AUTHENTICATED_YES) def post_native_login(self) -> None: username = self.get_argument("username", "") password = self.get_argument("password", "") if settings.check_password(username, password): self._set_authenticated() self.redirect("./") return error_str = ( "Invalid username or password" if settings.username else "Invalid password" ) self.set_status(401) self.render_login_page(error=error_str) async def post(self): if settings.using_ha_addon_auth: await self.post_ha_addon_login() else: self.post_native_login() class LogoutHandler(BaseHandler): @authenticated def get(self) -> None: self.clear_cookie(AUTH_COOKIE_NAME) self.redirect("./login") class SecretKeysRequestHandler(BaseHandler): @authenticated def get(self) -> None: filename = None for secret_filename in const.SECRETS_FILES: relative_filename = settings.rel_path(secret_filename) if os.path.isfile(relative_filename): filename = relative_filename break if filename is None: self.send_error(404) return secret_keys = list(yaml_util.load_yaml(filename, clear_secrets=False)) self.set_header("content-type", "application/json") self.write(json.dumps(secret_keys)) class SafeLoaderIgnoreUnknown(FastestAvailableSafeLoader): def ignore_unknown(self, node: Node) -> str: return f"{node.tag} {node.value}" def construct_yaml_binary(self, node: Node) -> str: return super().construct_yaml_binary(node).decode("ascii") SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown) SafeLoaderIgnoreUnknown.add_constructor( "tag:yaml.org,2002:binary", SafeLoaderIgnoreUnknown.construct_yaml_binary ) class JsonConfigRequestHandler(BaseHandler): @authenticated @bind_config async def get(self, configuration: str | None = None) -> None: filename = settings.rel_path(configuration) if not os.path.isfile(filename): self.send_error(404) return args = ["esphome", "config", filename, "--show-secrets"] rc, stdout, _ = await async_run_system_command(args) if rc != 0: self.send_error(422) return data = yaml.load(stdout, Loader=SafeLoaderIgnoreUnknown) self.set_header("content-type", "application/json") self.write(json.dumps(data)) self.finish() def get_base_frontend_path() -> str: if ENV_DEV not in os.environ: import esphome_dashboard return esphome_dashboard.where() static_path = os.environ[ENV_DEV] if not static_path.endswith("/"): static_path += "/" # This path can be relative, so resolve against the root or else templates don't work return os.path.abspath(os.path.join(os.getcwd(), static_path, "esphome_dashboard")) def get_static_path(*args: Iterable[str]) -> str: return os.path.join(get_base_frontend_path(), "static", *args) @functools.cache def get_static_file_url(name: str) -> str: base = f"./static/{name}" if ENV_DEV in os.environ: return base # Module imports can't deduplicate if stuff added to url if name == "js/esphome/index.js": import esphome_dashboard return base.replace("index.js", esphome_dashboard.entrypoint()) path = get_static_path(name) with open(path, "rb") as f_handle: hash_ = hashlib.md5(f_handle.read()).hexdigest()[:8] return f"{base}?hash={hash_}" def make_app(debug=get_bool_env(ENV_DEV)) -> tornado.web.Application: def log_function(handler: tornado.web.RequestHandler) -> None: if handler.get_status() < 400: log_method = access_log.info if isinstance(handler, SerialPortRequestHandler) and not debug: return if isinstance(handler, PingRequestHandler) and not debug: return elif handler.get_status() < 500: log_method = access_log.warning else: log_method = access_log.error request_time = 1000.0 * handler.request.request_time() # pylint: disable=protected-access log_method( "%d %s %.2fms", handler.get_status(), handler._request_summary(), request_time, ) class StaticFileHandler(tornado.web.StaticFileHandler): def get_cache_time( self, path: str, modified: datetime.datetime | None, mime_type: str ) -> int: """Override to customize cache control behavior.""" if debug: return 0 # Assets that are hashed have ?hash= in the URL, all javascript # filenames hashed so we can cache them for a long time if "hash" in self.request.arguments or "/javascript" in mime_type: return self.CACHE_MAX_AGE return super().get_cache_time(path, modified, mime_type) app_settings = { "debug": debug, "cookie_secret": settings.cookie_secret, "log_function": log_function, "websocket_ping_interval": 30.0, "template_path": get_base_frontend_path(), "xsrf_cookies": settings.using_password, } rel = settings.relative_url return tornado.web.Application( [ (f"{rel}", MainRequestHandler), (f"{rel}login", LoginHandler), (f"{rel}logout", LogoutHandler), (f"{rel}logs", EsphomeLogsHandler), (f"{rel}upload", EsphomeUploadHandler), (f"{rel}run", EsphomeRunHandler), (f"{rel}compile", EsphomeCompileHandler), (f"{rel}validate", EsphomeValidateHandler), (f"{rel}clean-mqtt", EsphomeCleanMqttHandler), (f"{rel}clean", EsphomeCleanHandler), (f"{rel}vscode", EsphomeVscodeHandler), (f"{rel}ace", EsphomeAceEditorHandler), (f"{rel}update-all", EsphomeUpdateAllHandler), (f"{rel}info", InfoRequestHandler), (f"{rel}edit", EditRequestHandler), (f"{rel}downloads", DownloadListRequestHandler), (f"{rel}download.bin", DownloadBinaryRequestHandler), (f"{rel}serial-ports", SerialPortRequestHandler), (f"{rel}ping", PingRequestHandler), (f"{rel}delete", DeleteRequestHandler), (f"{rel}undo-delete", UndoDeleteRequestHandler), (f"{rel}wizard", WizardRequestHandler), (f"{rel}static/(.*)", StaticFileHandler, {"path": get_static_path()}), (f"{rel}devices", ListDevicesHandler), (f"{rel}import", ImportRequestHandler), (f"{rel}secret_keys", SecretKeysRequestHandler), (f"{rel}json-config", JsonConfigRequestHandler), (f"{rel}rename", EsphomeRenameHandler), (f"{rel}prometheus-sd", PrometheusServiceDiscoveryHandler), (f"{rel}boards/([a-z0-9]+)", BoardsRequestHandler), (f"{rel}version", EsphomeVersionHandler), ], **app_settings, ) def start_web_server( app: tornado.web.Application, socket: str | None, address: str | None, port: int | None, config_dir: str, ) -> None: """Start the web server listener.""" if socket is None: _LOGGER.info( "Starting dashboard web server on http://%s:%s and configuration dir %s...", address, port, config_dir, ) app.listen(port, address) return _LOGGER.info( "Starting dashboard web server on unix socket %s and configuration dir %s...", socket, config_dir, ) server = tornado.httpserver.HTTPServer(app) socket = tornado.netutil.bind_unix_socket(socket, mode=0o666) server.add_socket(socket)