mirror of
https://github.com/esphome/esphome.git
synced 2025-01-19 20:34:06 +00:00
207 lines
6.4 KiB
Python
207 lines
6.4 KiB
Python
from dataclasses import dataclass
|
|
import importlib
|
|
import importlib.abc
|
|
import importlib.resources
|
|
import importlib.util
|
|
import logging
|
|
from pathlib import Path
|
|
import sys
|
|
from types import ModuleType
|
|
from typing import Any, Callable, ContextManager, Optional
|
|
|
|
from esphome.const import SOURCE_FILE_EXTENSIONS
|
|
from esphome.core import CORE
|
|
import esphome.core.config
|
|
from esphome.types import ConfigType
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True, order=True)
|
|
class FileResource:
|
|
package: str
|
|
resource: str
|
|
|
|
def path(self) -> ContextManager[Path]:
|
|
return importlib.resources.as_file(
|
|
importlib.resources.files(self.package) / self.resource
|
|
)
|
|
|
|
|
|
class ComponentManifest:
|
|
def __init__(self, module: ModuleType):
|
|
self.module = module
|
|
|
|
@property
|
|
def package(self) -> str:
|
|
"""Return the package name the module is contained in.
|
|
|
|
Examples:
|
|
- esphome/components/gpio/__init__.py -> esphome.components.gpio
|
|
- esphome/components/gpio/switch/__init__.py -> esphome.components.gpio.switch
|
|
- esphome/components/a4988/stepper.py -> esphome.components.a4988
|
|
"""
|
|
return self.module.__package__
|
|
|
|
@property
|
|
def is_platform(self) -> bool:
|
|
return len(self.module.__name__.split(".")) == 4
|
|
|
|
@property
|
|
def is_platform_component(self) -> bool:
|
|
return getattr(self.module, "IS_PLATFORM_COMPONENT", False)
|
|
|
|
@property
|
|
def config_schema(self) -> Optional[Any]:
|
|
return getattr(self.module, "CONFIG_SCHEMA", None)
|
|
|
|
@property
|
|
def multi_conf(self) -> bool:
|
|
return getattr(self.module, "MULTI_CONF", False)
|
|
|
|
@property
|
|
def multi_conf_no_default(self) -> bool:
|
|
return getattr(self.module, "MULTI_CONF_NO_DEFAULT", False)
|
|
|
|
@property
|
|
def to_code(self) -> Optional[Callable[[Any], None]]:
|
|
return getattr(self.module, "to_code", None)
|
|
|
|
@property
|
|
def dependencies(self) -> list[str]:
|
|
return getattr(self.module, "DEPENDENCIES", [])
|
|
|
|
@property
|
|
def conflicts_with(self) -> list[str]:
|
|
return getattr(self.module, "CONFLICTS_WITH", [])
|
|
|
|
@property
|
|
def auto_load(self) -> list[str]:
|
|
al = getattr(self.module, "AUTO_LOAD", [])
|
|
if callable(al):
|
|
return al()
|
|
return al
|
|
|
|
@property
|
|
def codeowners(self) -> list[str]:
|
|
return getattr(self.module, "CODEOWNERS", [])
|
|
|
|
@property
|
|
def final_validate_schema(self) -> Optional[Callable[[ConfigType], None]]:
|
|
"""Components can declare a `FINAL_VALIDATE_SCHEMA` cv.Schema that gets called
|
|
after the main validation. In that function checks across components can be made.
|
|
|
|
Note that the function can't mutate the configuration - no changes are saved
|
|
"""
|
|
return getattr(self.module, "FINAL_VALIDATE_SCHEMA", None)
|
|
|
|
@property
|
|
def resources(self) -> list[FileResource]:
|
|
"""Return a list of all file resources defined in the package of this component.
|
|
|
|
This will return all cpp source files that are located in the same folder as the
|
|
loaded .py file (does not look through subdirectories)
|
|
"""
|
|
ret = []
|
|
|
|
for resource in (
|
|
r.name
|
|
for r in importlib.resources.files(self.package).iterdir()
|
|
if r.is_file()
|
|
):
|
|
if Path(resource).suffix not in SOURCE_FILE_EXTENSIONS:
|
|
continue
|
|
if not importlib.resources.files(self.package).joinpath(resource).is_file():
|
|
# Not a resource = this is a directory (yeah this is confusing)
|
|
continue
|
|
ret.append(FileResource(self.package, resource))
|
|
return ret
|
|
|
|
|
|
class ComponentMetaFinder(importlib.abc.MetaPathFinder):
|
|
def __init__(
|
|
self, components_path: Path, allowed_components: Optional[list[str]] = None
|
|
) -> None:
|
|
self._allowed_components = allowed_components
|
|
self._finders = []
|
|
for hook in sys.path_hooks:
|
|
try:
|
|
finder = hook(str(components_path))
|
|
except ImportError:
|
|
continue
|
|
self._finders.append(finder)
|
|
|
|
def find_spec(self, fullname: str, path: Optional[list[str]], target=None):
|
|
if not fullname.startswith("esphome.components."):
|
|
return None
|
|
parts = fullname.split(".")
|
|
if len(parts) != 3:
|
|
# only handle direct components, not platforms
|
|
# platforms are handled automatically when parent is imported
|
|
return None
|
|
component = parts[2]
|
|
if (
|
|
self._allowed_components is not None
|
|
and component not in self._allowed_components
|
|
):
|
|
return None
|
|
|
|
for finder in self._finders:
|
|
spec = finder.find_spec(fullname, target=target)
|
|
if spec is not None:
|
|
return spec
|
|
return None
|
|
|
|
|
|
def clear_component_meta_finders():
|
|
sys.meta_path = [x for x in sys.meta_path if not isinstance(x, ComponentMetaFinder)]
|
|
|
|
|
|
def install_meta_finder(
|
|
components_path: Path, allowed_components: Optional[list[str]] = None
|
|
):
|
|
sys.meta_path.insert(0, ComponentMetaFinder(components_path, allowed_components))
|
|
|
|
|
|
def install_custom_components_meta_finder():
|
|
custom_components_dir = (Path(CORE.config_dir) / "custom_components").resolve()
|
|
install_meta_finder(custom_components_dir)
|
|
|
|
|
|
def _lookup_module(domain):
|
|
if domain in _COMPONENT_CACHE:
|
|
return _COMPONENT_CACHE[domain]
|
|
|
|
try:
|
|
module = importlib.import_module(f"esphome.components.{domain}")
|
|
except ImportError as e:
|
|
if "No module named" in str(e):
|
|
_LOGGER.error(
|
|
"Unable to import component %s: %s", domain, str(e), exc_info=False
|
|
)
|
|
else:
|
|
_LOGGER.error("Unable to import component %s:", domain, exc_info=True)
|
|
return None
|
|
except Exception: # pylint: disable=broad-except
|
|
_LOGGER.error("Unable to load component %s:", domain, exc_info=True)
|
|
return None
|
|
|
|
manif = ComponentManifest(module)
|
|
_COMPONENT_CACHE[domain] = manif
|
|
return manif
|
|
|
|
|
|
def get_component(domain):
|
|
assert "." not in domain
|
|
return _lookup_module(domain)
|
|
|
|
|
|
def get_platform(domain, platform):
|
|
full = f"{platform}.{domain}"
|
|
return _lookup_module(full)
|
|
|
|
|
|
_COMPONENT_CACHE = {}
|
|
CORE_COMPONENTS_PATH = (Path(__file__).parent / "components").resolve()
|
|
_COMPONENT_CACHE["esphome"] = ComponentManifest(esphome.core.config)
|