1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-15 00:03:51 +01:00

dashboard: Add support for firing events (#5775)

* dashboard: fire events when entry is updated or state changes

* dashboard: fire events when entry is updated or state changes

* dashboard: fire events when entry is updated or state changes

* tweaks

* fixes

* remove typing_extensions

* rename for asyncio

* rename for asyncio

* rename for asyncio

* preen

* lint

* lint

* move dict converter

* lint
This commit is contained in:
J. Nick Koston
2023-11-17 18:33:10 -06:00
committed by GitHub
parent 288af1f4d2
commit 3c243e663f
8 changed files with 251 additions and 78 deletions

View File

@@ -3,24 +3,78 @@ from __future__ import annotations
import asyncio
import logging
import os
from typing import TYPE_CHECKING, Any
from esphome import const, util
from esphome.storage_json import StorageJSON, ext_storage_path
from .const import (
EVENT_ENTRY_ADDED,
EVENT_ENTRY_REMOVED,
EVENT_ENTRY_STATE_CHANGED,
EVENT_ENTRY_UPDATED,
)
from .enum import StrEnum
if TYPE_CHECKING:
from .core import ESPHomeDashboard
_LOGGER = logging.getLogger(__name__)
DashboardCacheKeyType = tuple[int, int, float, int]
# Currently EntryState is a simple
# online/offline/unknown enum, but in the future
# it may be expanded to include more states
class EntryState(StrEnum):
ONLINE = "online"
OFFLINE = "offline"
UNKNOWN = "unknown"
_BOOL_TO_ENTRY_STATE = {
True: EntryState.ONLINE,
False: EntryState.OFFLINE,
None: EntryState.UNKNOWN,
}
_ENTRY_STATE_TO_BOOL = {
EntryState.ONLINE: True,
EntryState.OFFLINE: False,
EntryState.UNKNOWN: None,
}
def bool_to_entry_state(value: bool) -> EntryState:
"""Convert a bool to an entry state."""
return _BOOL_TO_ENTRY_STATE[value]
def entry_state_to_bool(value: EntryState) -> bool | None:
"""Convert an entry state to a bool."""
return _ENTRY_STATE_TO_BOOL[value]
class DashboardEntries:
"""Represents all dashboard entries."""
__slots__ = ("_loop", "_config_dir", "_entries", "_loaded_entries", "_update_lock")
__slots__ = (
"_dashboard",
"_loop",
"_config_dir",
"_entries",
"_entry_states",
"_loaded_entries",
"_update_lock",
)
def __init__(self, config_dir: str) -> None:
def __init__(self, dashboard: ESPHomeDashboard) -> None:
"""Initialize the DashboardEntries."""
self._dashboard = dashboard
self._loop = asyncio.get_running_loop()
self._config_dir = config_dir
self._config_dir = dashboard.settings.config_dir
# Entries are stored as
# {
# "path/to/file.yaml": DashboardEntry,
@@ -46,6 +100,25 @@ class DashboardEntries:
"""Return all entries."""
return list(self._entries.values())
def set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
asyncio.run_coroutine_threadsafe(
self._async_set_state(entry, state), self._loop
).result()
async def _async_set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
self.async_set_state(entry, state)
def async_set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
if entry.state == state:
return
entry.state = state
self._dashboard.bus.async_fire(
EVENT_ENTRY_STATE_CHANGED, {"entry": entry, "state": state}
)
async def async_request_update_entries(self) -> None:
"""Request an update of the dashboard entries from disk.
@@ -81,16 +154,17 @@ class DashboardEntries:
path_to_cache_key = await self._loop.run_in_executor(
None, self._get_path_to_cache_key
)
entries = self._entries
added: dict[DashboardEntry, DashboardCacheKeyType] = {}
updated: dict[DashboardEntry, DashboardCacheKeyType] = {}
removed: set[DashboardEntry] = {
entry
for filename, entry in self._entries.items()
for filename, entry in entries.items()
if filename not in path_to_cache_key
}
entries = self._entries
for path, cache_key in path_to_cache_key.items():
if entry := self._entries.get(path):
if entry := entries.get(path):
if entry.cache_key != cache_key:
updated[entry] = cache_key
else:
@@ -102,17 +176,17 @@ class DashboardEntries:
None, self._load_entries, {**added, **updated}
)
bus = self._dashboard.bus
for entry in added:
_LOGGER.debug("Added dashboard entry %s", entry.path)
entries[entry.path] = entry
bus.async_fire(EVENT_ENTRY_ADDED, {"entry": entry})
if entry in removed:
_LOGGER.debug("Removed dashboard entry %s", entry.path)
entries.pop(entry.path)
for entry in removed:
del entries[entry.path]
bus.async_fire(EVENT_ENTRY_REMOVED, {"entry": entry})
for entry in updated:
_LOGGER.debug("Updated dashboard entry %s", entry.path)
# In the future we can fire events when entries are added/removed/updated
bus.async_fire(EVENT_ENTRY_UPDATED, {"entry": entry})
def _get_path_to_cache_key(self) -> dict[str, DashboardCacheKeyType]:
"""Return a dict of path to cache key."""
@@ -152,29 +226,64 @@ class DashboardEntry:
This class is thread-safe and read-only.
"""
__slots__ = ("path", "filename", "_storage_path", "cache_key", "storage")
__slots__ = (
"path",
"filename",
"_storage_path",
"cache_key",
"storage",
"state",
"_to_dict",
)
def __init__(self, path: str, cache_key: DashboardCacheKeyType) -> None:
"""Initialize the DashboardEntry."""
self.path = path
self.filename = os.path.basename(path)
self.filename: str = os.path.basename(path)
self._storage_path = ext_storage_path(self.filename)
self.cache_key = cache_key
self.storage: StorageJSON | None = None
self.state = EntryState.UNKNOWN
self._to_dict: dict[str, Any] | None = None
def __repr__(self):
"""Return the representation of this entry."""
return (
f"DashboardEntry({self.path} "
f"DashboardEntry(path={self.path} "
f"address={self.address} "
f"web_port={self.web_port} "
f"name={self.name} "
f"no_mdns={self.no_mdns})"
f"no_mdns={self.no_mdns} "
f"state={self.state} "
")"
)
def to_dict(self) -> dict[str, Any]:
"""Return a dict representation of this entry.
The dict includes the loaded configuration but not
the current state of the entry.
"""
if self._to_dict is None:
self._to_dict = {
"name": self.name,
"friendly_name": self.friendly_name,
"configuration": self.filename,
"loaded_integrations": self.loaded_integrations,
"deployed_version": self.update_old,
"current_version": self.update_new,
"path": self.path,
"comment": self.comment,
"address": self.address,
"web_port": self.web_port,
"target_platform": self.target_platform,
}
return self._to_dict
def load_from_disk(self, cache_key: DashboardCacheKeyType | None = None) -> None:
"""Load this entry from disk."""
self.storage = StorageJSON.load(self._storage_path)
self._to_dict = None
#
# Currently StorageJSON.load() will return None if the file does not exist
#