mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 07:03:55 +00:00 
			
		
		
		
	Merge branch 'platformio_cache_tests' into platformio_cache_tests_api
This commit is contained in:
		
							
								
								
									
										19
									
								
								.github/workflows/ci-memory-impact-comment.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										19
									
								
								.github/workflows/ci-memory-impact-comment.yml
									
									
									
									
										vendored
									
									
								
							| @@ -28,20 +28,23 @@ jobs: | ||||
|         run: | | ||||
|           # Get PR details by searching for PR with matching head SHA | ||||
|           # The workflow_run.pull_requests field is often empty for forks | ||||
|           # Use paginate to handle repos with many open PRs | ||||
|           head_sha="${{ github.event.workflow_run.head_sha }}" | ||||
|           pr_data=$(gh api "/repos/${{ github.repository }}/commits/$head_sha/pulls" \ | ||||
|             --jq '.[0] | {number: .number, base_ref: .base.ref}') | ||||
|           if [ -z "$pr_data" ] || [ "$pr_data" == "null" ]; then | ||||
|           pr_data=$(gh api --paginate "/repos/${{ github.repository }}/pulls" \ | ||||
|             --jq ".[] | select(.head.sha == \"$head_sha\") | {number: .number, base_ref: .base.ref}" \ | ||||
|             | head -n 1) | ||||
|  | ||||
|           if [ -z "$pr_data" ]; then | ||||
|             echo "No PR found for SHA $head_sha, skipping" | ||||
|             echo "skip=true" >> $GITHUB_OUTPUT | ||||
|             echo "skip=true" >> "$GITHUB_OUTPUT" | ||||
|             exit 0 | ||||
|           fi | ||||
|  | ||||
|           pr_number=$(echo "$pr_data" | jq -r '.number') | ||||
|           base_ref=$(echo "$pr_data" | jq -r '.base_ref') | ||||
|  | ||||
|           echo "pr_number=$pr_number" >> $GITHUB_OUTPUT | ||||
|           echo "base_ref=$base_ref" >> $GITHUB_OUTPUT | ||||
|           echo "pr_number=$pr_number" >> "$GITHUB_OUTPUT" | ||||
|           echo "base_ref=$base_ref" >> "$GITHUB_OUTPUT" | ||||
|           echo "Found PR #$pr_number targeting base branch: $base_ref" | ||||
|  | ||||
|       - name: Check out code from base repository | ||||
| @@ -87,9 +90,9 @@ jobs: | ||||
|         if: steps.pr.outputs.skip != 'true' | ||||
|         run: | | ||||
|           if [ -f ./memory-analysis/memory-analysis-target.json ] && [ -f ./memory-analysis/memory-analysis-pr.json ]; then | ||||
|             echo "found=true" >> $GITHUB_OUTPUT | ||||
|             echo "found=true" >> "$GITHUB_OUTPUT" | ||||
|           else | ||||
|             echo "found=false" >> $GITHUB_OUTPUT | ||||
|             echo "found=false" >> "$GITHUB_OUTPUT" | ||||
|             echo "Memory analysis artifacts not found, skipping comment" | ||||
|           fi | ||||
|  | ||||
|   | ||||
| @@ -62,6 +62,40 @@ from esphome.util import ( | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
| # Special non-component keys that appear in configs | ||||
| _NON_COMPONENT_KEYS = frozenset( | ||||
|     { | ||||
|         CONF_ESPHOME, | ||||
|         "substitutions", | ||||
|         "packages", | ||||
|         "globals", | ||||
|         "external_components", | ||||
|         "<<", | ||||
|     } | ||||
| ) | ||||
|  | ||||
|  | ||||
| def detect_external_components(config: ConfigType) -> set[str]: | ||||
|     """Detect external/custom components in the configuration. | ||||
|  | ||||
|     External components are those that appear in the config but are not | ||||
|     part of ESPHome's built-in components and are not special config keys. | ||||
|  | ||||
|     Args: | ||||
|         config: The ESPHome configuration dictionary | ||||
|  | ||||
|     Returns: | ||||
|         A set of external component names | ||||
|     """ | ||||
|     from esphome.analyze_memory.helpers import get_esphome_components | ||||
|  | ||||
|     builtin_components = get_esphome_components() | ||||
|     return { | ||||
|         key | ||||
|         for key in config | ||||
|         if key not in builtin_components and key not in _NON_COMPONENT_KEYS | ||||
|     } | ||||
|  | ||||
|  | ||||
| class ArgsProtocol(Protocol): | ||||
|     device: list[str] | None | ||||
| @@ -892,6 +926,54 @@ def command_idedata(args: ArgsProtocol, config: ConfigType) -> int: | ||||
|     return 0 | ||||
|  | ||||
|  | ||||
| def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: | ||||
|     """Analyze memory usage by component. | ||||
|  | ||||
|     This command compiles the configuration and performs memory analysis. | ||||
|     Compilation is fast if sources haven't changed (just relinking). | ||||
|     """ | ||||
|     from esphome import platformio_api | ||||
|     from esphome.analyze_memory.cli import MemoryAnalyzerCLI | ||||
|  | ||||
|     # Always compile to ensure fresh data (fast if no changes - just relinks) | ||||
|     exit_code = write_cpp(config) | ||||
|     if exit_code != 0: | ||||
|         return exit_code | ||||
|     exit_code = compile_program(args, config) | ||||
|     if exit_code != 0: | ||||
|         return exit_code | ||||
|     _LOGGER.info("Successfully compiled program.") | ||||
|  | ||||
|     # Get idedata for analysis | ||||
|     idedata = platformio_api.get_idedata(config) | ||||
|     if idedata is None: | ||||
|         _LOGGER.error("Failed to get IDE data for memory analysis") | ||||
|         return 1 | ||||
|  | ||||
|     firmware_elf = Path(idedata.firmware_elf_path) | ||||
|  | ||||
|     # Extract external components from config | ||||
|     external_components = detect_external_components(config) | ||||
|     _LOGGER.debug("Detected external components: %s", external_components) | ||||
|  | ||||
|     # Perform memory analysis | ||||
|     _LOGGER.info("Analyzing memory usage...") | ||||
|     analyzer = MemoryAnalyzerCLI( | ||||
|         str(firmware_elf), | ||||
|         idedata.objdump_path, | ||||
|         idedata.readelf_path, | ||||
|         external_components, | ||||
|     ) | ||||
|     analyzer.analyze() | ||||
|  | ||||
|     # Generate and display report | ||||
|     report = analyzer.generate_report() | ||||
|     print() | ||||
|     print(report) | ||||
|  | ||||
|     return 0 | ||||
|  | ||||
|  | ||||
| def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None: | ||||
|     new_name = args.name | ||||
|     for c in new_name: | ||||
| @@ -1007,6 +1089,7 @@ POST_CONFIG_ACTIONS = { | ||||
|     "idedata": command_idedata, | ||||
|     "rename": command_rename, | ||||
|     "discover": command_discover, | ||||
|     "analyze-memory": command_analyze_memory, | ||||
| } | ||||
|  | ||||
| SIMPLE_CONFIG_ACTIONS = [ | ||||
| @@ -1292,6 +1375,14 @@ def parse_args(argv): | ||||
|     ) | ||||
|     parser_rename.add_argument("name", help="The new name for the device.", type=str) | ||||
|  | ||||
|     parser_analyze_memory = subparsers.add_parser( | ||||
|         "analyze-memory", | ||||
|         help="Analyze memory usage by component.", | ||||
|     ) | ||||
|     parser_analyze_memory.add_argument( | ||||
|         "configuration", help="Your YAML configuration file(s).", nargs="+" | ||||
|     ) | ||||
|  | ||||
|     # Keep backward compatibility with the old command line format of | ||||
|     # esphome <config> <command>. | ||||
|     # | ||||
|   | ||||
| @@ -6,7 +6,7 @@ import esphome.config_validation as cv | ||||
| from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS | ||||
| from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base | ||||
|  | ||||
| from .jinja import Jinja, JinjaStr, TemplateError, TemplateRuntimeError, has_jinja | ||||
| from .jinja import Jinja, JinjaError, JinjaStr, has_jinja | ||||
|  | ||||
| CODEOWNERS = ["@esphome/core"] | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
| @@ -57,17 +57,12 @@ def _expand_jinja(value, orig_value, path, jinja, ignore_missing): | ||||
|                     "->".join(str(x) for x in path), | ||||
|                     err.message, | ||||
|                 ) | ||||
|         except ( | ||||
|             TemplateError, | ||||
|             TemplateRuntimeError, | ||||
|             RuntimeError, | ||||
|             ArithmeticError, | ||||
|             AttributeError, | ||||
|             TypeError, | ||||
|         ) as err: | ||||
|         except JinjaError as err: | ||||
|             raise cv.Invalid( | ||||
|                 f"{type(err).__name__} Error evaluating jinja expression '{value}': {str(err)}." | ||||
|                 f" See {'->'.join(str(x) for x in path)}", | ||||
|                 f"{err.error_name()} Error evaluating jinja expression '{value}': {str(err.parent())}." | ||||
|                 f"\nEvaluation stack: (most recent evaluation last)\n{err.stack_trace_str()}" | ||||
|                 f"\nRelevant context:\n{err.context_trace_str()}" | ||||
|                 f"\nSee {'->'.join(str(x) for x in path)}", | ||||
|                 path, | ||||
|             ) | ||||
|     return value | ||||
|   | ||||
| @@ -6,6 +6,8 @@ import re | ||||
| import jinja2 as jinja | ||||
| from jinja2.sandbox import SandboxedEnvironment | ||||
|  | ||||
| from esphome.yaml_util import ESPLiteralValue | ||||
|  | ||||
| TemplateError = jinja.TemplateError | ||||
| TemplateSyntaxError = jinja.TemplateSyntaxError | ||||
| TemplateRuntimeError = jinja.TemplateRuntimeError | ||||
| @@ -26,18 +28,20 @@ def has_jinja(st): | ||||
|     return detect_jinja_re.search(st) is not None | ||||
|  | ||||
|  | ||||
| # SAFE_GLOBAL_FUNCTIONS defines a allowlist of built-in functions that are considered safe to expose | ||||
| # SAFE_GLOBALS defines a allowlist of built-in functions or modules that are considered safe to expose | ||||
| # in Jinja templates or other sandboxed evaluation contexts. Only functions that do not allow | ||||
| # arbitrary code execution, file access, or other security risks are included. | ||||
| # | ||||
| # The following functions are considered safe: | ||||
| #   - math: The entire math module is injected, allowing access to mathematical functions like sin, cos, sqrt, etc. | ||||
| #   - ord: Converts a character to its Unicode code point integer. | ||||
| #   - chr: Converts an integer to its corresponding Unicode character. | ||||
| #   - len: Returns the length of a sequence or collection. | ||||
| # | ||||
| # These functions were chosen because they are pure, have no side effects, and do not provide access | ||||
| # to the file system, environment, or other potentially sensitive resources. | ||||
| SAFE_GLOBAL_FUNCTIONS = { | ||||
| SAFE_GLOBALS = { | ||||
|     "math": math,  # Inject entire math module | ||||
|     "ord": ord, | ||||
|     "chr": chr, | ||||
|     "len": len, | ||||
| @@ -56,22 +60,62 @@ class JinjaStr(str): | ||||
|     later in the main substitutions pass. | ||||
|     """ | ||||
|  | ||||
|     Undefined = object() | ||||
|  | ||||
|     def __new__(cls, value: str, upvalues=None): | ||||
|         obj = super().__new__(cls, value) | ||||
|         obj.upvalues = upvalues or {} | ||||
|         if isinstance(value, JinjaStr): | ||||
|             base = str(value) | ||||
|             merged = {**value.upvalues, **(upvalues or {})} | ||||
|         else: | ||||
|             base = value | ||||
|             merged = dict(upvalues or {}) | ||||
|         obj = super().__new__(cls, base) | ||||
|         obj.upvalues = merged | ||||
|         obj.result = JinjaStr.Undefined | ||||
|         return obj | ||||
|  | ||||
|     def __init__(self, value: str, upvalues=None): | ||||
|         self.upvalues = upvalues or {} | ||||
|  | ||||
| class JinjaError(Exception): | ||||
|     def __init__(self, context_trace: dict, expr: str): | ||||
|         self.context_trace = context_trace | ||||
|         self.eval_stack = [expr] | ||||
|  | ||||
|     def parent(self): | ||||
|         return self.__context__ | ||||
|  | ||||
|     def error_name(self): | ||||
|         return type(self.parent()).__name__ | ||||
|  | ||||
|     def context_trace_str(self): | ||||
|         return "\n".join( | ||||
|             f"  {k} = {repr(v)} ({type(v).__name__})" | ||||
|             for k, v in self.context_trace.items() | ||||
|         ) | ||||
|  | ||||
|     def stack_trace_str(self): | ||||
|         return "\n".join( | ||||
|             f" {len(self.eval_stack) - i}: {expr}{i == 0 and ' <-- ' + self.error_name() or ''}" | ||||
|             for i, expr in enumerate(self.eval_stack) | ||||
|         ) | ||||
|  | ||||
|  | ||||
| class Jinja: | ||||
| class TrackerContext(jinja.runtime.Context): | ||||
|     def resolve_or_missing(self, key): | ||||
|         val = super().resolve_or_missing(key) | ||||
|         if isinstance(val, JinjaStr): | ||||
|             self.environment.context_trace[key] = val | ||||
|             val, _ = self.environment.expand(val) | ||||
|         self.environment.context_trace[key] = val | ||||
|         return val | ||||
|  | ||||
|  | ||||
| class Jinja(SandboxedEnvironment): | ||||
|     """ | ||||
|     Wraps a Jinja environment | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, context_vars): | ||||
|         self.env = SandboxedEnvironment( | ||||
|         super().__init__( | ||||
|             trim_blocks=True, | ||||
|             lstrip_blocks=True, | ||||
|             block_start_string="<%", | ||||
| @@ -82,13 +126,20 @@ class Jinja: | ||||
|             variable_end_string="}", | ||||
|             undefined=jinja.StrictUndefined, | ||||
|         ) | ||||
|         self.env.add_extension("jinja2.ext.do") | ||||
|         self.env.globals["math"] = math  # Inject entire math module | ||||
|         self.context_class = TrackerContext | ||||
|         self.add_extension("jinja2.ext.do") | ||||
|         self.context_trace = {} | ||||
|         self.context_vars = {**context_vars} | ||||
|         self.env.globals = { | ||||
|             **self.env.globals, | ||||
|         for k, v in self.context_vars.items(): | ||||
|             if isinstance(v, ESPLiteralValue): | ||||
|                 continue | ||||
|             if isinstance(v, str) and not isinstance(v, JinjaStr) and has_jinja(v): | ||||
|                 self.context_vars[k] = JinjaStr(v, self.context_vars) | ||||
|  | ||||
|         self.globals = { | ||||
|             **self.globals, | ||||
|             **self.context_vars, | ||||
|             **SAFE_GLOBAL_FUNCTIONS, | ||||
|             **SAFE_GLOBALS, | ||||
|         } | ||||
|  | ||||
|     def safe_eval(self, expr): | ||||
| @@ -110,23 +161,43 @@ class Jinja: | ||||
|         result = None | ||||
|         override_vars = {} | ||||
|         if isinstance(content_str, JinjaStr): | ||||
|             if content_str.result is not JinjaStr.Undefined: | ||||
|                 return content_str.result, None | ||||
|             # If `value` is already a JinjaStr, it means we are trying to evaluate it again | ||||
|             # in a parent pass. | ||||
|             # Hopefully, all required variables are visible now. | ||||
|             override_vars = content_str.upvalues | ||||
|  | ||||
|         old_trace = self.context_trace | ||||
|         self.context_trace = {} | ||||
|         try: | ||||
|             template = self.env.from_string(content_str) | ||||
|             template = self.from_string(content_str) | ||||
|             result = self.safe_eval(template.render(override_vars)) | ||||
|             if isinstance(result, Undefined): | ||||
|                 # This happens when the expression is simply an undefined variable. Jinja does not | ||||
|                 # raise an exception, instead we get "Undefined". | ||||
|                 # Trigger an UndefinedError exception so we skip to below. | ||||
|                 print("" + result) | ||||
|                 print("" + result)  # force a UndefinedError exception | ||||
|         except (TemplateSyntaxError, UndefinedError) as err: | ||||
|             # `content_str` contains a Jinja expression that refers to a variable that is undefined | ||||
|             # in this scope. Perhaps it refers to a root substitution that is not visible yet. | ||||
|             # Therefore, return the original `content_str` as a JinjaStr, which contains the variables | ||||
|             # Therefore, return `content_str` as a JinjaStr, which contains the variables | ||||
|             # that are actually visible to it at this point to postpone evaluation. | ||||
|             return JinjaStr(content_str, {**self.context_vars, **override_vars}), err | ||||
|         except JinjaError as err: | ||||
|             err.context_trace = {**self.context_trace, **err.context_trace} | ||||
|             err.eval_stack.append(content_str) | ||||
|             raise err | ||||
|         except ( | ||||
|             TemplateError, | ||||
|             TemplateRuntimeError, | ||||
|             RuntimeError, | ||||
|             ArithmeticError, | ||||
|             AttributeError, | ||||
|             TypeError, | ||||
|         ) as err: | ||||
|             raise JinjaError(self.context_trace, content_str) from err | ||||
|         finally: | ||||
|             self.context_trace = old_trace | ||||
|  | ||||
|         if isinstance(content_str, JinjaStr): | ||||
|             content_str.result = result | ||||
|  | ||||
|         return result, None | ||||
|   | ||||
| @@ -273,6 +273,8 @@ | ||||
|  | ||||
| #ifdef USE_NRF52 | ||||
| #define USE_NRF52_DFU | ||||
| #define USE_SOFTDEVICE_ID 7 | ||||
| #define USE_SOFTDEVICE_VERSION 1 | ||||
| #endif | ||||
|  | ||||
| // Disabled feature flags | ||||
|   | ||||
| @@ -8,6 +8,7 @@ substitutions: | ||||
|   area: 25 | ||||
|   numberOne: 1 | ||||
|   var1: 79 | ||||
|   double_width: 14 | ||||
| test_list: | ||||
|   - The area is 56 | ||||
|   - 56 | ||||
| @@ -25,3 +26,4 @@ test_list: | ||||
|   - ord("a") = 97 | ||||
|   - chr(97) = a | ||||
|   - len([1,2,3]) = 3 | ||||
|   - width = 7, double_width = 14 | ||||
|   | ||||
| @@ -8,6 +8,7 @@ substitutions: | ||||
|   area: 25 | ||||
|   numberOne: 1 | ||||
|   var1: 79 | ||||
|   double_width: ${width * 2} | ||||
|  | ||||
| test_list: | ||||
|   - "The area is ${width * height}" | ||||
| @@ -23,3 +24,4 @@ test_list: | ||||
|   - ord("a") = ${ ord("a") } | ||||
|   - chr(97) = ${ chr(97) } | ||||
|   - len([1,2,3]) = ${ len([1,2,3]) } | ||||
|   - width = ${width}, double_width = ${double_width} | ||||
|   | ||||
| @@ -17,10 +17,12 @@ from esphome import platformio_api | ||||
| from esphome.__main__ import ( | ||||
|     Purpose, | ||||
|     choose_upload_log_host, | ||||
|     command_analyze_memory, | ||||
|     command_clean_all, | ||||
|     command_rename, | ||||
|     command_update_all, | ||||
|     command_wizard, | ||||
|     detect_external_components, | ||||
|     get_port_type, | ||||
|     has_ip_address, | ||||
|     has_mqtt, | ||||
| @@ -226,13 +228,47 @@ def mock_run_external_process() -> Generator[Mock]: | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_run_external_command() -> Generator[Mock]: | ||||
|     """Mock run_external_command for testing.""" | ||||
| def mock_run_external_command_main() -> Generator[Mock]: | ||||
|     """Mock run_external_command in __main__ module (different from platformio_api).""" | ||||
|     with patch("esphome.__main__.run_external_command") as mock: | ||||
|         mock.return_value = 0  # Default to success | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_write_cpp() -> Generator[Mock]: | ||||
|     """Mock write_cpp for testing.""" | ||||
|     with patch("esphome.__main__.write_cpp") as mock: | ||||
|         mock.return_value = 0  # Default to success | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_compile_program() -> Generator[Mock]: | ||||
|     """Mock compile_program for testing.""" | ||||
|     with patch("esphome.__main__.compile_program") as mock: | ||||
|         mock.return_value = 0  # Default to success | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_get_esphome_components() -> Generator[Mock]: | ||||
|     """Mock get_esphome_components for testing.""" | ||||
|     with patch("esphome.analyze_memory.helpers.get_esphome_components") as mock: | ||||
|         mock.return_value = {"logger", "api", "ota"} | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_memory_analyzer_cli() -> Generator[Mock]: | ||||
|     """Mock MemoryAnalyzerCLI for testing.""" | ||||
|     with patch("esphome.analyze_memory.cli.MemoryAnalyzerCLI") as mock_class: | ||||
|         mock_analyzer = MagicMock() | ||||
|         mock_analyzer.generate_report.return_value = "Mock Memory Report" | ||||
|         mock_class.return_value = mock_analyzer | ||||
|         yield mock_class | ||||
|  | ||||
|  | ||||
| def test_choose_upload_log_host_with_string_default() -> None: | ||||
|     """Test with a single string default device.""" | ||||
|     setup_core() | ||||
| @@ -839,7 +875,7 @@ def test_upload_program_serial_esp8266_with_file( | ||||
|  | ||||
| def test_upload_using_esptool_path_conversion( | ||||
|     tmp_path: Path, | ||||
|     mock_run_external_command: Mock, | ||||
|     mock_run_external_command_main: Mock, | ||||
|     mock_get_idedata: Mock, | ||||
| ) -> None: | ||||
|     """Test upload_using_esptool properly converts Path objects to strings for esptool. | ||||
| @@ -875,10 +911,10 @@ def test_upload_using_esptool_path_conversion( | ||||
|     assert result == 0 | ||||
|  | ||||
|     # Verify that run_external_command was called | ||||
|     assert mock_run_external_command.call_count == 1 | ||||
|     assert mock_run_external_command_main.call_count == 1 | ||||
|  | ||||
|     # Get the actual call arguments | ||||
|     call_args = mock_run_external_command.call_args[0] | ||||
|     call_args = mock_run_external_command_main.call_args[0] | ||||
|  | ||||
|     # The first argument should be esptool.main function, | ||||
|     # followed by the command arguments | ||||
| @@ -917,7 +953,7 @@ def test_upload_using_esptool_path_conversion( | ||||
|  | ||||
| def test_upload_using_esptool_with_file_path( | ||||
|     tmp_path: Path, | ||||
|     mock_run_external_command: Mock, | ||||
|     mock_run_external_command_main: Mock, | ||||
| ) -> None: | ||||
|     """Test upload_using_esptool with a custom file that's a Path object.""" | ||||
|     setup_core(platform=PLATFORM_ESP8266, tmp_path=tmp_path, name="test") | ||||
| @@ -934,10 +970,10 @@ def test_upload_using_esptool_with_file_path( | ||||
|     assert result == 0 | ||||
|  | ||||
|     # Verify that run_external_command was called | ||||
|     mock_run_external_command.assert_called_once() | ||||
|     mock_run_external_command_main.assert_called_once() | ||||
|  | ||||
|     # Get the actual call arguments | ||||
|     call_args = mock_run_external_command.call_args[0] | ||||
|     call_args = mock_run_external_command_main.call_args[0] | ||||
|     cmd_list = list(call_args[1:])  # Skip the esptool.main function | ||||
|  | ||||
|     # Find the firmware path in the command | ||||
| @@ -2273,3 +2309,226 @@ def test_show_logs_api_mqtt_timeout_fallback( | ||||
|  | ||||
|     # Verify run_logs was called with only the static IP (MQTT failed) | ||||
|     mock_run_logs.assert_called_once_with(CORE.config, ["192.168.1.100"]) | ||||
|  | ||||
|  | ||||
| def test_detect_external_components_no_external( | ||||
|     mock_get_esphome_components: Mock, | ||||
| ) -> None: | ||||
|     """Test detect_external_components with no external components.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: {CONF_NAME: "test_device"}, | ||||
|         "logger": {}, | ||||
|         "api": {}, | ||||
|     } | ||||
|  | ||||
|     result = detect_external_components(config) | ||||
|  | ||||
|     assert result == set() | ||||
|     mock_get_esphome_components.assert_called_once() | ||||
|  | ||||
|  | ||||
| def test_detect_external_components_with_external( | ||||
|     mock_get_esphome_components: Mock, | ||||
| ) -> None: | ||||
|     """Test detect_external_components detects external components.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: {CONF_NAME: "test_device"}, | ||||
|         "logger": {},  # Built-in | ||||
|         "api": {},  # Built-in | ||||
|         "my_custom_sensor": {},  # External | ||||
|         "another_custom": {},  # External | ||||
|         "external_components": [],  # Special key, not a component | ||||
|         "substitutions": {},  # Special key, not a component | ||||
|     } | ||||
|  | ||||
|     result = detect_external_components(config) | ||||
|  | ||||
|     assert result == {"my_custom_sensor", "another_custom"} | ||||
|     mock_get_esphome_components.assert_called_once() | ||||
|  | ||||
|  | ||||
| def test_detect_external_components_filters_special_keys( | ||||
|     mock_get_esphome_components: Mock, | ||||
| ) -> None: | ||||
|     """Test detect_external_components filters out special config keys.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: {CONF_NAME: "test_device"}, | ||||
|         "substitutions": {"key": "value"}, | ||||
|         "packages": {}, | ||||
|         "globals": [], | ||||
|         "external_components": [], | ||||
|         "<<": {},  # YAML merge key | ||||
|     } | ||||
|  | ||||
|     result = detect_external_components(config) | ||||
|  | ||||
|     assert result == set() | ||||
|     mock_get_esphome_components.assert_called_once() | ||||
|  | ||||
|  | ||||
| def test_command_analyze_memory_success( | ||||
|     tmp_path: Path, | ||||
|     capfd: CaptureFixture[str], | ||||
|     mock_write_cpp: Mock, | ||||
|     mock_compile_program: Mock, | ||||
|     mock_get_idedata: Mock, | ||||
|     mock_get_esphome_components: Mock, | ||||
|     mock_memory_analyzer_cli: Mock, | ||||
| ) -> None: | ||||
|     """Test command_analyze_memory with successful compilation and analysis.""" | ||||
|     setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device") | ||||
|  | ||||
|     # Create firmware.elf file | ||||
|     firmware_path = ( | ||||
|         tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device" | ||||
|     ) | ||||
|     firmware_path.mkdir(parents=True, exist_ok=True) | ||||
|     firmware_elf = firmware_path / "firmware.elf" | ||||
|     firmware_elf.write_text("mock elf file") | ||||
|  | ||||
|     # Mock idedata | ||||
|     mock_idedata_obj = MagicMock(spec=platformio_api.IDEData) | ||||
|     mock_idedata_obj.firmware_elf_path = str(firmware_elf) | ||||
|     mock_idedata_obj.objdump_path = "/path/to/objdump" | ||||
|     mock_idedata_obj.readelf_path = "/path/to/readelf" | ||||
|     mock_get_idedata.return_value = mock_idedata_obj | ||||
|  | ||||
|     config = { | ||||
|         CONF_ESPHOME: {CONF_NAME: "test_device"}, | ||||
|         "logger": {}, | ||||
|     } | ||||
|  | ||||
|     args = MockArgs() | ||||
|  | ||||
|     result = command_analyze_memory(args, config) | ||||
|  | ||||
|     assert result == 0 | ||||
|  | ||||
|     # Verify compilation was done | ||||
|     mock_write_cpp.assert_called_once_with(config) | ||||
|     mock_compile_program.assert_called_once_with(args, config) | ||||
|  | ||||
|     # Verify analyzer was created with correct parameters | ||||
|     mock_memory_analyzer_cli.assert_called_once_with( | ||||
|         str(firmware_elf), | ||||
|         "/path/to/objdump", | ||||
|         "/path/to/readelf", | ||||
|         set(),  # No external components | ||||
|     ) | ||||
|  | ||||
|     # Verify analysis was run | ||||
|     mock_analyzer = mock_memory_analyzer_cli.return_value | ||||
|     mock_analyzer.analyze.assert_called_once() | ||||
|     mock_analyzer.generate_report.assert_called_once() | ||||
|  | ||||
|     # Verify report was printed | ||||
|     captured = capfd.readouterr() | ||||
|     assert "Mock Memory Report" in captured.out | ||||
|  | ||||
|  | ||||
| def test_command_analyze_memory_with_external_components( | ||||
|     tmp_path: Path, | ||||
|     mock_write_cpp: Mock, | ||||
|     mock_compile_program: Mock, | ||||
|     mock_get_idedata: Mock, | ||||
|     mock_get_esphome_components: Mock, | ||||
|     mock_memory_analyzer_cli: Mock, | ||||
| ) -> None: | ||||
|     """Test command_analyze_memory detects external components.""" | ||||
|     setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device") | ||||
|  | ||||
|     # Create firmware.elf file | ||||
|     firmware_path = ( | ||||
|         tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device" | ||||
|     ) | ||||
|     firmware_path.mkdir(parents=True, exist_ok=True) | ||||
|     firmware_elf = firmware_path / "firmware.elf" | ||||
|     firmware_elf.write_text("mock elf file") | ||||
|  | ||||
|     # Mock idedata | ||||
|     mock_idedata_obj = MagicMock(spec=platformio_api.IDEData) | ||||
|     mock_idedata_obj.firmware_elf_path = str(firmware_elf) | ||||
|     mock_idedata_obj.objdump_path = "/path/to/objdump" | ||||
|     mock_idedata_obj.readelf_path = "/path/to/readelf" | ||||
|     mock_get_idedata.return_value = mock_idedata_obj | ||||
|  | ||||
|     config = { | ||||
|         CONF_ESPHOME: {CONF_NAME: "test_device"}, | ||||
|         "logger": {}, | ||||
|         "my_custom_component": {"param": "value"},  # External component | ||||
|         "external_components": [{"source": "github://user/repo"}],  # Not a component | ||||
|     } | ||||
|  | ||||
|     args = MockArgs() | ||||
|  | ||||
|     result = command_analyze_memory(args, config) | ||||
|  | ||||
|     assert result == 0 | ||||
|  | ||||
|     # Verify analyzer was created with external components detected | ||||
|     mock_memory_analyzer_cli.assert_called_once_with( | ||||
|         str(firmware_elf), | ||||
|         "/path/to/objdump", | ||||
|         "/path/to/readelf", | ||||
|         {"my_custom_component"},  # External component detected | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_command_analyze_memory_write_cpp_fails( | ||||
|     tmp_path: Path, | ||||
|     mock_write_cpp: Mock, | ||||
| ) -> None: | ||||
|     """Test command_analyze_memory when write_cpp fails.""" | ||||
|     setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device") | ||||
|  | ||||
|     config = {CONF_ESPHOME: {CONF_NAME: "test_device"}} | ||||
|     args = MockArgs() | ||||
|  | ||||
|     mock_write_cpp.return_value = 1  # Failure | ||||
|  | ||||
|     result = command_analyze_memory(args, config) | ||||
|  | ||||
|     assert result == 1 | ||||
|     mock_write_cpp.assert_called_once_with(config) | ||||
|  | ||||
|  | ||||
| def test_command_analyze_memory_compile_fails( | ||||
|     tmp_path: Path, | ||||
|     mock_write_cpp: Mock, | ||||
|     mock_compile_program: Mock, | ||||
| ) -> None: | ||||
|     """Test command_analyze_memory when compilation fails.""" | ||||
|     setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device") | ||||
|  | ||||
|     config = {CONF_ESPHOME: {CONF_NAME: "test_device"}} | ||||
|     args = MockArgs() | ||||
|  | ||||
|     mock_compile_program.return_value = 1  # Compilation failed | ||||
|  | ||||
|     result = command_analyze_memory(args, config) | ||||
|  | ||||
|     assert result == 1 | ||||
|     mock_write_cpp.assert_called_once_with(config) | ||||
|     mock_compile_program.assert_called_once_with(args, config) | ||||
|  | ||||
|  | ||||
| def test_command_analyze_memory_no_idedata( | ||||
|     tmp_path: Path, | ||||
|     caplog: pytest.LogCaptureFixture, | ||||
|     mock_write_cpp: Mock, | ||||
|     mock_compile_program: Mock, | ||||
|     mock_get_idedata: Mock, | ||||
| ) -> None: | ||||
|     """Test command_analyze_memory when idedata cannot be retrieved.""" | ||||
|     setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device") | ||||
|  | ||||
|     config = {CONF_ESPHOME: {CONF_NAME: "test_device"}} | ||||
|     args = MockArgs() | ||||
|  | ||||
|     mock_get_idedata.return_value = None  # Failed to get idedata | ||||
|  | ||||
|     with caplog.at_level(logging.ERROR): | ||||
|         result = command_analyze_memory(args, config) | ||||
|  | ||||
|     assert result == 1 | ||||
|     assert "Failed to get IDE data for memory analysis" in caplog.text | ||||
|   | ||||
		Reference in New Issue
	
	Block a user