mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-30 22:53:59 +00:00 
			
		
		
		
	[gpio_expander] Fix bank caching (#10077)
Co-authored-by: J. Nick Koston <nick@koston.org> Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
This commit is contained in:
		| @@ -0,0 +1,25 @@ | ||||
| import esphome.codegen as cg | ||||
| import esphome.config_validation as cv | ||||
| from esphome.const import CONF_ID | ||||
|  | ||||
| AUTO_LOAD = ["gpio_expander"] | ||||
|  | ||||
| gpio_expander_test_component_ns = cg.esphome_ns.namespace( | ||||
|     "gpio_expander_test_component" | ||||
| ) | ||||
|  | ||||
| GPIOExpanderTestComponent = gpio_expander_test_component_ns.class_( | ||||
|     "GPIOExpanderTestComponent", cg.Component | ||||
| ) | ||||
|  | ||||
|  | ||||
| CONFIG_SCHEMA = cv.Schema( | ||||
|     { | ||||
|         cv.GenerateID(): cv.declare_id(GPIOExpanderTestComponent), | ||||
|     } | ||||
| ).extend(cv.COMPONENT_SCHEMA) | ||||
|  | ||||
|  | ||||
| async def to_code(config): | ||||
|     var = cg.new_Pvariable(config[CONF_ID]) | ||||
|     await cg.register_component(var, config) | ||||
| @@ -0,0 +1,38 @@ | ||||
| #include "gpio_expander_test_component.h" | ||||
|  | ||||
| #include "esphome/core/application.h" | ||||
| #include "esphome/core/log.h" | ||||
|  | ||||
| namespace esphome::gpio_expander_test_component { | ||||
|  | ||||
| static const char *const TAG = "gpio_expander_test"; | ||||
|  | ||||
| void GPIOExpanderTestComponent::setup() { | ||||
|   for (uint8_t pin = 0; pin < 32; pin++) { | ||||
|     this->digital_read(pin); | ||||
|   } | ||||
|  | ||||
|   this->digital_read(3); | ||||
|   this->digital_read(3); | ||||
|   this->digital_read(4); | ||||
|   this->digital_read(3); | ||||
|   this->digital_read(10); | ||||
|   this->reset_pin_cache_();  // Reset cache to ensure next read is from hardware | ||||
|   this->digital_read(15); | ||||
|   this->digital_read(14); | ||||
|   this->digital_read(14); | ||||
|  | ||||
|   ESP_LOGD(TAG, "DONE"); | ||||
| } | ||||
|  | ||||
| bool GPIOExpanderTestComponent::digital_read_hw(uint8_t pin) { | ||||
|   ESP_LOGD(TAG, "digital_read_hw pin=%d", pin); | ||||
|   return true; | ||||
| } | ||||
|  | ||||
| bool GPIOExpanderTestComponent::digital_read_cache(uint8_t pin) { | ||||
|   ESP_LOGD(TAG, "digital_read_cache pin=%d", pin); | ||||
|   return true; | ||||
| } | ||||
|  | ||||
| }  // namespace esphome::gpio_expander_test_component | ||||
| @@ -0,0 +1,18 @@ | ||||
| #pragma once | ||||
|  | ||||
| #include "esphome/components/gpio_expander/cached_gpio.h" | ||||
| #include "esphome/core/component.h" | ||||
|  | ||||
| namespace esphome::gpio_expander_test_component { | ||||
|  | ||||
| class GPIOExpanderTestComponent : public Component, public esphome::gpio_expander::CachedGpioExpander<uint8_t, 32> { | ||||
|  public: | ||||
|   void setup() override; | ||||
|  | ||||
|  protected: | ||||
|   bool digital_read_hw(uint8_t pin) override; | ||||
|   bool digital_read_cache(uint8_t pin) override; | ||||
|   void digital_write_hw(uint8_t pin, bool value) override{}; | ||||
| }; | ||||
|  | ||||
| }  // namespace esphome::gpio_expander_test_component | ||||
							
								
								
									
										17
									
								
								tests/integration/fixtures/gpio_expander_cache.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								tests/integration/fixtures/gpio_expander_cache.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,17 @@ | ||||
| esphome: | ||||
|   name: gpio-expander-cache | ||||
| host: | ||||
|  | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| api: | ||||
|  | ||||
| # External component that uses gpio_expander::CachedGpioExpander | ||||
| external_components: | ||||
|   - source: | ||||
|       type: local | ||||
|       path: EXTERNAL_COMPONENT_PATH | ||||
|     components: [gpio_expander_test_component] | ||||
|  | ||||
| gpio_expander_test_component: | ||||
							
								
								
									
										123
									
								
								tests/integration/test_gpio_expander_cache.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								tests/integration/test_gpio_expander_cache.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,123 @@ | ||||
| """Integration test for CachedGPIOExpander to ensure correct behavior.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
| from pathlib import Path | ||||
| import re | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from .types import APIClientConnectedFactory, RunCompiledFunction | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_gpio_expander_cache( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test gpio_expander::CachedGpioExpander correctly calls hardware functions.""" | ||||
|     # Get the path to the external components directory | ||||
|     external_components_path = str( | ||||
|         Path(__file__).parent / "fixtures" / "external_components" | ||||
|     ) | ||||
|  | ||||
|     # Replace the placeholder in the YAML config with the actual path | ||||
|     yaml_config = yaml_config.replace( | ||||
|         "EXTERNAL_COMPONENT_PATH", external_components_path | ||||
|     ) | ||||
|  | ||||
|     logs_done = asyncio.Event() | ||||
|  | ||||
|     # Patterns to match in logs | ||||
|     digital_read_hw_pattern = re.compile(r"digital_read_hw pin=(\d+)") | ||||
|     digital_read_cache_pattern = re.compile(r"digital_read_cache pin=(\d+)") | ||||
|  | ||||
|     # ensure logs are in the expected order | ||||
|     log_order = [ | ||||
|         (digital_read_hw_pattern, 0), | ||||
|         [(digital_read_cache_pattern, i) for i in range(0, 8)], | ||||
|         (digital_read_hw_pattern, 8), | ||||
|         [(digital_read_cache_pattern, i) for i in range(8, 16)], | ||||
|         (digital_read_hw_pattern, 16), | ||||
|         [(digital_read_cache_pattern, i) for i in range(16, 24)], | ||||
|         (digital_read_hw_pattern, 24), | ||||
|         [(digital_read_cache_pattern, i) for i in range(24, 32)], | ||||
|         (digital_read_hw_pattern, 3), | ||||
|         (digital_read_cache_pattern, 3), | ||||
|         (digital_read_hw_pattern, 3), | ||||
|         (digital_read_cache_pattern, 3), | ||||
|         (digital_read_cache_pattern, 4), | ||||
|         (digital_read_hw_pattern, 3), | ||||
|         (digital_read_cache_pattern, 3), | ||||
|         (digital_read_hw_pattern, 10), | ||||
|         (digital_read_cache_pattern, 10), | ||||
|         # full cache reset here for testing | ||||
|         (digital_read_hw_pattern, 15), | ||||
|         (digital_read_cache_pattern, 15), | ||||
|         (digital_read_cache_pattern, 14), | ||||
|         (digital_read_hw_pattern, 14), | ||||
|         (digital_read_cache_pattern, 14), | ||||
|     ] | ||||
|     # Flatten the log order for easier processing | ||||
|     log_order: list[tuple[re.Pattern, int]] = [ | ||||
|         item | ||||
|         for sublist in log_order | ||||
|         for item in (sublist if isinstance(sublist, list) else [sublist]) | ||||
|     ] | ||||
|  | ||||
|     index = 0 | ||||
|  | ||||
|     def check_output(line: str) -> None: | ||||
|         """Check log output for expected messages.""" | ||||
|         nonlocal index | ||||
|         if logs_done.is_set(): | ||||
|             return | ||||
|  | ||||
|         clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line) | ||||
|  | ||||
|         if "digital_read" in clean_line: | ||||
|             if index >= len(log_order): | ||||
|                 print(f"Received unexpected log line: {clean_line}") | ||||
|                 logs_done.set() | ||||
|                 return | ||||
|  | ||||
|             pattern, expected_pin = log_order[index] | ||||
|             match = pattern.search(clean_line) | ||||
|  | ||||
|             if not match: | ||||
|                 print(f"Log line did not match next expected pattern: {clean_line}") | ||||
|                 logs_done.set() | ||||
|                 return | ||||
|  | ||||
|             pin = int(match.group(1)) | ||||
|             if pin != expected_pin: | ||||
|                 print(f"Unexpected pin number. Expected {expected_pin}, got {pin}") | ||||
|                 logs_done.set() | ||||
|                 return | ||||
|  | ||||
|             index += 1 | ||||
|  | ||||
|         elif "DONE" in clean_line: | ||||
|             # Check if we reached the end of the expected log entries | ||||
|             logs_done.set() | ||||
|  | ||||
|     # Run with log monitoring | ||||
|     async with ( | ||||
|         run_compiled(yaml_config, line_callback=check_output), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Verify device info | ||||
|         device_info = await client.device_info() | ||||
|         assert device_info is not None | ||||
|         assert device_info.name == "gpio-expander-cache" | ||||
|  | ||||
|         try: | ||||
|             await asyncio.wait_for(logs_done.wait(), timeout=5.0) | ||||
|         except TimeoutError: | ||||
|             pytest.fail("Timeout waiting for logs to complete") | ||||
|  | ||||
|         assert index == len(log_order), ( | ||||
|             f"Expected {len(log_order)} log entries, but got {index}" | ||||
|         ) | ||||
		Reference in New Issue
	
	Block a user