mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-25 21:23:53 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			349 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			349 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| ESPHome's coroutine system.
 | |
| 
 | |
| The Problem: When running the code generation, components can depend on variables being registered.
 | |
| For example, an i2c-based sensor would need the i2c bus component to first be declared before the
 | |
| codegen can emit code using that variable (or otherwise the C++ won't compile).
 | |
| 
 | |
| ESPHome's codegen system solves this by using coroutine-like methods. When a component depends on
 | |
| a variable, it waits for it to be registered using `await cg.get_variable()`. If the variable
 | |
| hasn't been registered yet, control will be yielded back to another component until the variable
 | |
| is registered. This leads to a topological sort, solving the dependency problem.
 | |
| 
 | |
| Importantly, ESPHome only uses the coroutine *syntax*, no actual asyncio event loop is running in
 | |
| the background. This is so that we can ensure the order of execution is constant for the same
 | |
| YAML configuration, thus main.cpp only has to be recompiled if the configuration actually changes.
 | |
| 
 | |
| There are two syntaxes for ESPHome coroutines ("old style" vs "new style" coroutines).
 | |
| 
 | |
| "new style" - This is very much like coroutines you might be used to:
 | |
| 
 | |
| ```py
 | |
| async def my_coroutine(config):
 | |
|     var = await cg.get_variable(config[CONF_ID])
 | |
|     await some_other_coroutine(xyz)
 | |
|     return var
 | |
| ```
 | |
| 
 | |
| new style coroutines are `async def` methods that use `await` to await the result of another coroutine,
 | |
| and can return values using a `return` statement.
 | |
| 
 | |
| "old style" - This was a hack for when ESPHome still had to run on python 2, but is still compatible
 | |
| 
 | |
| ```py
 | |
| @coroutine
 | |
| def my_coroutine(config):
 | |
|     var = yield cg.get_variable(config[CONF_ID])
 | |
|     yield some_other_coroutine(xyz)
 | |
|     yield var
 | |
| ```
 | |
| 
 | |
| Here everything is combined in `yield` expressions. You await other coroutines using `yield` and
 | |
| the last `yield` expression defines what is returned.
 | |
| """
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| from collections.abc import Awaitable, Callable, Generator, Iterator
 | |
| import enum
 | |
| import functools
 | |
| import heapq
 | |
| import inspect
 | |
| import logging
 | |
| import types
 | |
| from typing import Any
 | |
| 
 | |
| _LOGGER = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
class CoroPriority(enum.IntEnum):
    """Named scheduling stages used while generating code.

    A coroutine with a larger value is scheduled ahead of one with a smaller
    value. Members are plain integers, so they can be passed anywhere a
    numeric priority is accepted.
    """

    # Platform bring-up; has to happen before everything else.
    # e.g. esp32, esp8266, rp2040
    PLATFORM = 1000

    # Base networking infrastructure.
    # e.g. network (201)
    NETWORK = 201

    # Transport layer sitting on top of the network.
    # e.g. async_tcp (200)
    NETWORK_TRANSPORT = 200

    # Core of the system.
    # e.g. esphome core and most entity base components (cover, update,
    # datetime, valve, alarm_control_panel, lock, event, binary_sensor, button,
    # climate, fan, light, media_player, number, select, sensor, switch,
    # text_sensor, text), plus microphone, speaker, audio_dac, touchscreen,
    # stepper
    CORE = 100

    # Diagnostics / debugging facilities.
    # e.g. logger (90)
    DIAGNOSTICS = 90

    # Status reporting and monitoring.
    # e.g. status_led (80)
    STATUS = 80

    # Shared web server plumbing.
    # e.g. web_server_base (65)
    WEB_SERVER_BASE = 65

    # Portal-style network services.
    # e.g. captive_portal (64)
    CAPTIVE_PORTAL = 64

    # Communication protocols and services.
    # e.g. wifi (60), ethernet (60)
    COMMUNICATION = 60

    # Discovery and management services on the network.
    # e.g. mdns (55)
    NETWORK_SERVICES = 55

    # Over-the-air update services.
    # e.g. ota_updates (54)
    OTA_UPDATES = 54

    # OTA served through the web server.
    # e.g. web_server_ota (52)
    WEB_SERVER_OTA = 52

    # Application-level services.
    # e.g. safe_mode (50)
    APPLICATION = 50

    # Web / UI services.
    # e.g. web_server (40)
    WEB = 40

    # Automations and user-defined logic.
    # e.g. esphome core automations (30)
    AUTOMATION = 30

    # Buses and peripherals.
    # e.g. i2c (1)
    BUS = 1

    # Default stage: anything without an explicit priority runs at 0.
    COMPONENT = 0

    # Things that rely on other components already being registered.
    # e.g. globals (-100)
    LATE = -100

    # Platform-specific fixes and workarounds.
    # e.g. add_arduino_global_workaround (-999), esp8266 pin states (-999)
    WORKAROUNDS = -999

    # Last-step setup that needs every component registered.
    # e.g. add_includes, _add_platformio_options, _add_platform_defines
    # (all -1000), esp32_ble_tracker feature defines (-1000)
    FINAL = -1000
 | |
| 
 | |
| 
 | |
def coroutine(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Turn ``func`` into an ESPHome coroutine function.

    :param func: a plain function, an old-style generator function, or an
                 ``async def`` function.
    :return: ``func`` itself when it is already an ESPHome coroutine or an
             ``async def`` function; otherwise a generator-based wrapper that
             has been passed through ``types.coroutine`` and tagged with
             ``_esphome_coroutine = True``.
    :raises ValueError: if ``func`` is an async generator function.
    """
    # Already tagged by an earlier call: hand it back instead of stacking wrappers.
    if getattr(func, "_esphome_coroutine", False):
        return func

    # ESPHome has no real use for async generators, and while code is being ported
    # to async/await they usually mean a `yield` was left behind - so reject them.
    if inspect.isasyncgenfunction(func):
        raise ValueError(
            f"Async generator functions are not allowed. "
            f"Please check whether you've replaced all yields with awaits/returns. "
            f"See {func} in {func.__module__}"
        )

    # Native `async def` functions can be used as they are.
    if inspect.iscoroutinefunction(func):
        return func

    if inspect.isgeneratorfunction(func):
        # Old-style coroutine: drive it through the flattening helper.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return (yield from _flatten_generator(func(*args, **kwargs)))

    else:
        # Ordinary function: run it right away, then yield once so that the
        # wrapper is itself a generator function, and finally return the result.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)
            yield
            return outcome

    # Mark the generator function so that new-style coroutines may `await` it.
    wrapper = types.coroutine(wrapper)
    # pylint: disable=protected-access
    wrapper._esphome_coroutine = True
    return wrapper
 | |
| 
 | |
| 
 | |
def coroutine_with_priority(priority: float | CoroPriority):
    """Build a decorator that makes an ESPHome coroutine with a scheduling priority.

    :param priority: scheduling priority; coroutines with a higher priority run
                     first. Either a float or a CoroPriority member.
    :return: a decorator that converts its argument with ``coroutine`` and
             stores ``float(priority)`` on the result as ``priority``.
    """

    def attach_priority(func):
        wrapped = coroutine(func)
        # Stored as a plain float, whichever form the caller passed in.
        wrapped.priority = float(priority)
        return wrapped

    return attach_priority
 | |
| 
 | |
| 
 | |
| def _flatten_generator(gen: Generator[Any, Any, Any]):
 | |
|     to_send = None
 | |
|     while True:
 | |
|         try:
 | |
|             # Run until next yield expression
 | |
|             val = gen.send(to_send)
 | |
|         except StopIteration as e:
 | |
|             # return statement or end of function
 | |
| 
 | |
|             # From py3.3, return with a value is allowed in generators,
 | |
|             # and return value is transported in the value field of the exception.
 | |
|             # If we find a value in the exception, use that as the return value,
 | |
|             # otherwise use the value from the last yield statement ("old style")
 | |
|             ret = to_send if e.value is None else e.value
 | |
|             return ret
 | |
| 
 | |
|         if isinstance(val, Awaitable):
 | |
|             # yielded object that is awaitable (like `yield some_new_style_method()`)
 | |
|             # yield from __await__() like actual coroutines would.
 | |
|             to_send = yield from val.__await__()
 | |
|         elif inspect.isgenerator(val):
 | |
|             # Old style, like `yield cg.get_variable()`
 | |
|             to_send = yield from _flatten_generator(val)
 | |
|         else:
 | |
|             # Could be the last expression from this generator, record this as the return value
 | |
|             to_send = val
 | |
|             # perform a yield so that expressions like `while some_condition(): yield None`
 | |
|             # do not run without yielding control back to the top
 | |
|             yield
 | |
| 
 | |
| 
 | |
class FakeAwaitable:
    """Wrap a generator so that it can be used with ``await``.

    Used by the internals of `cg.get_variable`, where @coroutine is not
    suitable: native coroutines await objects coming from types.coroutine()
    directly, without handing control back to the top (probably as a
    performance optimisation).

    Wrapping the generator in a FakeAwaitable instead does hand control back
    to the top (the reason for this is unknown).
    """

    def __init__(self, gen: Generator[Any, Any, Any]) -> None:
        self._gen = gen

    def __await__(self):
        # Delegate to the wrapped generator and pass its return value through.
        return (yield from self._gen)
 | |
| 
 | |
| 
 | |
| @functools.total_ordering
 | |
| class _Task:
 | |
|     def __init__(
 | |
|         self,
 | |
|         priority: float,
 | |
|         id_number: int,
 | |
|         iterator: Iterator[None],
 | |
|         original_function: Any,
 | |
|     ):
 | |
|         self.priority = priority
 | |
|         self.id_number = id_number
 | |
|         self.iterator = iterator
 | |
|         self.original_function = original_function
 | |
| 
 | |
|     def with_priority(self, priority: float) -> _Task:
 | |
|         return _Task(priority, self.id_number, self.iterator, self.original_function)
 | |
| 
 | |
|     @property
 | |
|     def _cmp_tuple(self) -> tuple[float, int]:
 | |
|         return (-self.priority, self.id_number)
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return self._cmp_tuple == other._cmp_tuple
 | |
| 
 | |
|     def __ne__(self, other):
 | |
|         return not (self == other)
 | |
| 
 | |
|     def __lt__(self, other):
 | |
|         return self._cmp_tuple < other._cmp_tuple
 | |
| 
 | |
| 
 | |
class FakeEventLoop:
    """Minimal stand-in for an asyncio event loop that runs queued coroutine jobs in order."""

    def __init__(self):
        # Heap of tasks, maintained through heapq and ordered by _Task comparison.
        self._pending_tasks: list[_Task] = []
        # Number of jobs added so far; used as each task's id_number.
        self._task_counter = 0

    def add_job(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution.

        The priority is read from the ``priority`` attribute of the coroutine
        function when it has one and defaults to 0.0 otherwise.

        :raises ValueError: if ``func`` is a coroutine object rather than a function.
        """
        if inspect.iscoroutine(func):
            raise ValueError("Can only add coroutine functions, not coroutine objects")

        if inspect.iscoroutinefunction(func):
            # Native coroutine: step through it via its __await__ iterator.
            wrapped = func
            stepper = wrapped(*args, **kwargs).__await__()
        else:
            wrapped = coroutine(func)
            stepper = wrapped(*args, **kwargs)

        job = _Task(
            getattr(wrapped, "priority", 0.0), self._task_counter, stepper, func
        )
        self._task_counter += 1
        heapq.heappush(self._pending_tasks, job)

    def flush_tasks(self):
        """Keep stepping tasks until none are left.

        :raises RuntimeError: if a deadlock is detected.
        """
        steps = 0
        while self._pending_tasks:
            steps += 1
            if steps > 1000000:
                # A deadlock / circular dependency is assumed once tasks have been
                # stepped this many times. The big tests/test1.yaml only reaches a
                # fraction of this, so the limit shouldn't be hit in normal use.
                raise RuntimeError(
                    "Circular dependency detected! "
                    "Please run with -v option to see what functions failed to "
                    "complete."
                )

            current: _Task = heapq.heappop(self._pending_tasks)
            origin = current.original_function
            _LOGGER.debug(
                "Running %s in %s (num %s)",
                origin.__qualname__,
                origin.__module__,
                current.id_number,
            )

            try:
                next(current.iterator)
            except StopIteration:
                _LOGGER.debug(" -> finished")
                continue

            # Not finished yet: re-queue with a slightly lower priority so that,
            # if this task is blocked on a dependency, other tasks get the chance
            # to resolve it. A less naive approach could improve on this.
            heapq.heappush(
                self._pending_tasks, current.with_priority(current.priority - 1)
            )
 |