mirror of
https://github.com/esphome/esphome.git
synced 2025-09-10 23:32:23 +01:00
[gpio_expander] Add intelligent pin type selection to CachedGpioExpander template (#10577)
This commit is contained in:
@@ -4,6 +4,7 @@
|
|||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <type_traits>
|
||||||
#include "esphome/core/hal.h"
|
#include "esphome/core/hal.h"
|
||||||
|
|
||||||
namespace esphome::gpio_expander {
|
namespace esphome::gpio_expander {
|
||||||
@@ -11,18 +12,27 @@ namespace esphome::gpio_expander {
|
|||||||
/// @brief A class to cache the read state of a GPIO expander.
|
/// @brief A class to cache the read state of a GPIO expander.
|
||||||
/// This class caches reads between GPIO Pins which are on the same bank.
|
/// This class caches reads between GPIO Pins which are on the same bank.
|
||||||
/// This means that for reading whole Port (ex. 8 pins) component needs only one
|
/// This means that for reading whole Port (ex. 8 pins) component needs only one
|
||||||
/// I2C/SPI read per main loop call. It assumes, that one bit in byte identifies one GPIO pin
|
/// I2C/SPI read per main loop call. It assumes that one bit in byte identifies one GPIO pin.
|
||||||
|
///
|
||||||
/// Template parameters:
|
/// Template parameters:
|
||||||
/// T - Type which represents internal register. Could be uint8_t or uint16_t. Adjust to
|
/// T - Type which represents internal bank register. Could be uint8_t or uint16_t.
|
||||||
/// match size of your internal GPIO bank register.
|
/// Choose based on how your I/O expander reads pins:
|
||||||
/// N - Number of pins
|
/// * uint8_t: For chips that read banks separately (8 pins at a time)
|
||||||
template<typename T, T N> class CachedGpioExpander {
|
/// Examples: MCP23017 (2x8-bit banks), TCA9555 (2x8-bit banks)
|
||||||
|
/// * uint16_t: For chips that read all pins at once (up to 16 pins)
|
||||||
|
/// Examples: PCF8574/8575 (8/16 pins), PCA9554/9555 (8/16 pins)
|
||||||
|
/// N - Total number of pins (maximum 65535)
|
||||||
|
/// P - Type for pin number parameters (automatically selected based on N:
|
||||||
|
/// uint8_t for N<=256, uint16_t for N>256). Can be explicitly specified
|
||||||
|
/// if needed (e.g., for components like SN74HC165 with >256 pins)
|
||||||
|
template<typename T, uint16_t N, typename P = typename std::conditional<(N > 256), uint16_t, uint8_t>::type>
|
||||||
|
class CachedGpioExpander {
|
||||||
public:
|
public:
|
||||||
/// @brief Read the state of the given pin. This will invalidate the cache for the given pin number.
|
/// @brief Read the state of the given pin. This will invalidate the cache for the given pin number.
|
||||||
/// @param pin Pin number to read
|
/// @param pin Pin number to read
|
||||||
/// @return Pin state
|
/// @return Pin state
|
||||||
bool digital_read(T pin) {
|
bool digital_read(P pin) {
|
||||||
const uint8_t bank = pin / BANK_SIZE;
|
const P bank = pin / BANK_SIZE;
|
||||||
const T pin_mask = (1 << (pin % BANK_SIZE));
|
const T pin_mask = (1 << (pin % BANK_SIZE));
|
||||||
// Check if specific pin cache is valid
|
// Check if specific pin cache is valid
|
||||||
if (this->read_cache_valid_[bank] & pin_mask) {
|
if (this->read_cache_valid_[bank] & pin_mask) {
|
||||||
@@ -38,21 +48,31 @@ template<typename T, T N> class CachedGpioExpander {
|
|||||||
return this->digital_read_cache(pin);
|
return this->digital_read_cache(pin);
|
||||||
}
|
}
|
||||||
|
|
||||||
void digital_write(T pin, bool value) { this->digital_write_hw(pin, value); }
|
void digital_write(P pin, bool value) { this->digital_write_hw(pin, value); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/// @brief Call component low level function to read GPIO state from device
|
/// @brief Read GPIO bank from hardware into internal state
|
||||||
virtual bool digital_read_hw(T pin) = 0;
|
/// @param pin Pin number (used to determine which bank to read)
|
||||||
/// @brief Call component read function from internal cache.
|
/// @return true if read succeeded, false on communication error
|
||||||
virtual bool digital_read_cache(T pin) = 0;
|
/// @note This does NOT return the pin state. It returns whether the read operation succeeded.
|
||||||
/// @brief Call component low level function to write GPIO state to device
|
/// The actual pin state should be returned by digital_read_cache().
|
||||||
virtual void digital_write_hw(T pin, bool value) = 0;
|
virtual bool digital_read_hw(P pin) = 0;
|
||||||
|
|
||||||
|
/// @brief Get cached pin value from internal state
|
||||||
|
/// @param pin Pin number to read
|
||||||
|
/// @return Pin state (true = HIGH, false = LOW)
|
||||||
|
virtual bool digital_read_cache(P pin) = 0;
|
||||||
|
|
||||||
|
/// @brief Write GPIO state to hardware
|
||||||
|
/// @param pin Pin number to write
|
||||||
|
/// @param value Pin state to write (true = HIGH, false = LOW)
|
||||||
|
virtual void digital_write_hw(P pin, bool value) = 0;
|
||||||
|
|
||||||
/// @brief Invalidate cache. This function should be called in component loop().
|
/// @brief Invalidate cache. This function should be called in component loop().
|
||||||
void reset_pin_cache_() { memset(this->read_cache_valid_, 0x00, CACHE_SIZE_BYTES); }
|
void reset_pin_cache_() { memset(this->read_cache_valid_, 0x00, CACHE_SIZE_BYTES); }
|
||||||
|
|
||||||
static constexpr uint8_t BITS_PER_BYTE = 8;
|
static constexpr uint16_t BITS_PER_BYTE = 8;
|
||||||
static constexpr uint8_t BANK_SIZE = sizeof(T) * BITS_PER_BYTE;
|
static constexpr uint16_t BANK_SIZE = sizeof(T) * BITS_PER_BYTE;
|
||||||
static constexpr size_t BANKS = N / BANK_SIZE;
|
static constexpr size_t BANKS = N / BANK_SIZE;
|
||||||
static constexpr size_t CACHE_SIZE_BYTES = BANKS * sizeof(T);
|
static constexpr size_t CACHE_SIZE_BYTES = BANKS * sizeof(T);
|
||||||
|
|
||||||
|
@@ -27,11 +27,13 @@ void GPIOExpanderTestComponent::setup() {
|
|||||||
|
|
||||||
bool GPIOExpanderTestComponent::digital_read_hw(uint8_t pin) {
|
bool GPIOExpanderTestComponent::digital_read_hw(uint8_t pin) {
|
||||||
ESP_LOGD(TAG, "digital_read_hw pin=%d", pin);
|
ESP_LOGD(TAG, "digital_read_hw pin=%d", pin);
|
||||||
|
// Return true to indicate successful read operation
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool GPIOExpanderTestComponent::digital_read_cache(uint8_t pin) {
|
bool GPIOExpanderTestComponent::digital_read_cache(uint8_t pin) {
|
||||||
ESP_LOGD(TAG, "digital_read_cache pin=%d", pin);
|
ESP_LOGD(TAG, "digital_read_cache pin=%d", pin);
|
||||||
|
// Return the pin state (always HIGH for testing)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -0,0 +1,24 @@
|
|||||||
|
import esphome.codegen as cg
|
||||||
|
import esphome.config_validation as cv
|
||||||
|
from esphome.const import CONF_ID
|
||||||
|
|
||||||
|
AUTO_LOAD = ["gpio_expander"]
|
||||||
|
|
||||||
|
gpio_expander_test_component_uint16_ns = cg.esphome_ns.namespace(
|
||||||
|
"gpio_expander_test_component_uint16"
|
||||||
|
)
|
||||||
|
|
||||||
|
GPIOExpanderTestUint16Component = gpio_expander_test_component_uint16_ns.class_(
|
||||||
|
"GPIOExpanderTestUint16Component", cg.Component
|
||||||
|
)
|
||||||
|
|
||||||
|
CONFIG_SCHEMA = cv.Schema(
|
||||||
|
{
|
||||||
|
cv.GenerateID(): cv.declare_id(GPIOExpanderTestUint16Component),
|
||||||
|
}
|
||||||
|
).extend(cv.COMPONENT_SCHEMA)
|
||||||
|
|
||||||
|
|
||||||
|
async def to_code(config):
|
||||||
|
var = cg.new_Pvariable(config[CONF_ID])
|
||||||
|
await cg.register_component(var, config)
|
@@ -0,0 +1,43 @@
|
|||||||
|
#include "gpio_expander_test_component_uint16.h"
|
||||||
|
#include "esphome/core/log.h"
|
||||||
|
|
||||||
|
namespace esphome::gpio_expander_test_component_uint16 {
|
||||||
|
|
||||||
|
static const char *const TAG = "gpio_expander_test_uint16";
|
||||||
|
|
||||||
|
void GPIOExpanderTestUint16Component::setup() {
|
||||||
|
ESP_LOGD(TAG, "Testing uint16_t bank (single 16-pin bank)");
|
||||||
|
|
||||||
|
// Test reading all 16 pins - first should trigger hw read, rest use cache
|
||||||
|
for (uint8_t pin = 0; pin < 16; pin++) {
|
||||||
|
this->digital_read(pin);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset cache and test specific reads
|
||||||
|
ESP_LOGD(TAG, "Resetting cache for uint16_t test");
|
||||||
|
this->reset_pin_cache_();
|
||||||
|
|
||||||
|
// First read triggers hw for entire bank
|
||||||
|
this->digital_read(5);
|
||||||
|
// These should all use cache since they're in the same bank
|
||||||
|
this->digital_read(10);
|
||||||
|
this->digital_read(15);
|
||||||
|
this->digital_read(0);
|
||||||
|
|
||||||
|
ESP_LOGD(TAG, "DONE_UINT16");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool GPIOExpanderTestUint16Component::digital_read_hw(uint8_t pin) {
|
||||||
|
ESP_LOGD(TAG, "uint16_digital_read_hw pin=%d", pin);
|
||||||
|
// In a real component, this would read from I2C/SPI into internal state
|
||||||
|
// For testing, we just return true to indicate successful read
|
||||||
|
return true; // Return true to indicate successful read
|
||||||
|
}
|
||||||
|
|
||||||
|
bool GPIOExpanderTestUint16Component::digital_read_cache(uint8_t pin) {
|
||||||
|
ESP_LOGD(TAG, "uint16_digital_read_cache pin=%d", pin);
|
||||||
|
// Return the actual pin state from our test pattern
|
||||||
|
return (this->test_state_ >> pin) & 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace esphome::gpio_expander_test_component_uint16
|
@@ -0,0 +1,23 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "esphome/components/gpio_expander/cached_gpio.h"
|
||||||
|
#include "esphome/core/component.h"
|
||||||
|
|
||||||
|
namespace esphome::gpio_expander_test_component_uint16 {
|
||||||
|
|
||||||
|
// Test component using uint16_t bank type (single 16-pin bank)
|
||||||
|
class GPIOExpanderTestUint16Component : public Component,
|
||||||
|
public esphome::gpio_expander::CachedGpioExpander<uint16_t, 16> {
|
||||||
|
public:
|
||||||
|
void setup() override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
bool digital_read_hw(uint8_t pin) override;
|
||||||
|
bool digital_read_cache(uint8_t pin) override;
|
||||||
|
void digital_write_hw(uint8_t pin, bool value) override{};
|
||||||
|
|
||||||
|
private:
|
||||||
|
uint16_t test_state_{0xAAAA}; // Test pattern: alternating bits
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace esphome::gpio_expander_test_component_uint16
|
@@ -12,6 +12,10 @@ external_components:
|
|||||||
- source:
|
- source:
|
||||||
type: local
|
type: local
|
||||||
path: EXTERNAL_COMPONENT_PATH
|
path: EXTERNAL_COMPONENT_PATH
|
||||||
components: [gpio_expander_test_component]
|
components: [gpio_expander_test_component, gpio_expander_test_component_uint16]
|
||||||
|
|
||||||
|
# Test with uint8_t (multiple banks)
|
||||||
gpio_expander_test_component:
|
gpio_expander_test_component:
|
||||||
|
|
||||||
|
# Test with uint16_t (single bank)
|
||||||
|
gpio_expander_test_component_uint16:
|
||||||
|
@@ -30,9 +30,15 @@ async def test_gpio_expander_cache(
|
|||||||
|
|
||||||
logs_done = asyncio.Event()
|
logs_done = asyncio.Event()
|
||||||
|
|
||||||
# Patterns to match in logs
|
# Patterns to match in logs - match any variation of digital_read
|
||||||
digital_read_hw_pattern = re.compile(r"digital_read_hw pin=(\d+)")
|
read_hw_pattern = re.compile(r"(?:uint16_)?digital_read_hw pin=(\d+)")
|
||||||
digital_read_cache_pattern = re.compile(r"digital_read_cache pin=(\d+)")
|
read_cache_pattern = re.compile(r"(?:uint16_)?digital_read_cache pin=(\d+)")
|
||||||
|
|
||||||
|
# Keep specific patterns for building the expected order
|
||||||
|
digital_read_hw_pattern = re.compile(r"^digital_read_hw pin=(\d+)")
|
||||||
|
digital_read_cache_pattern = re.compile(r"^digital_read_cache pin=(\d+)")
|
||||||
|
uint16_read_hw_pattern = re.compile(r"^uint16_digital_read_hw pin=(\d+)")
|
||||||
|
uint16_read_cache_pattern = re.compile(r"^uint16_digital_read_cache pin=(\d+)")
|
||||||
|
|
||||||
# ensure logs are in the expected order
|
# ensure logs are in the expected order
|
||||||
log_order = [
|
log_order = [
|
||||||
@@ -59,6 +65,17 @@ async def test_gpio_expander_cache(
|
|||||||
(digital_read_cache_pattern, 14),
|
(digital_read_cache_pattern, 14),
|
||||||
(digital_read_hw_pattern, 14),
|
(digital_read_hw_pattern, 14),
|
||||||
(digital_read_cache_pattern, 14),
|
(digital_read_cache_pattern, 14),
|
||||||
|
# uint16_t component tests (single bank of 16 pins)
|
||||||
|
(uint16_read_hw_pattern, 0), # First pin triggers hw read
|
||||||
|
[
|
||||||
|
(uint16_read_cache_pattern, i) for i in range(0, 16)
|
||||||
|
], # All 16 pins return via cache
|
||||||
|
# After cache reset
|
||||||
|
(uint16_read_hw_pattern, 5), # First read after reset triggers hw
|
||||||
|
(uint16_read_cache_pattern, 5),
|
||||||
|
(uint16_read_cache_pattern, 10), # These use cache (same bank)
|
||||||
|
(uint16_read_cache_pattern, 15),
|
||||||
|
(uint16_read_cache_pattern, 0),
|
||||||
]
|
]
|
||||||
# Flatten the log order for easier processing
|
# Flatten the log order for easier processing
|
||||||
log_order: list[tuple[re.Pattern, int]] = [
|
log_order: list[tuple[re.Pattern, int]] = [
|
||||||
@@ -77,17 +94,22 @@ async def test_gpio_expander_cache(
|
|||||||
|
|
||||||
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
|
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
|
||||||
|
|
||||||
if "digital_read" in clean_line:
|
# Extract just the log message part (after the log level)
|
||||||
|
msg = clean_line.split(": ", 1)[-1] if ": " in clean_line else clean_line
|
||||||
|
|
||||||
|
# Check if this line contains a read operation we're tracking
|
||||||
|
if read_hw_pattern.search(msg) or read_cache_pattern.search(msg):
|
||||||
if index >= len(log_order):
|
if index >= len(log_order):
|
||||||
print(f"Received unexpected log line: {clean_line}")
|
print(f"Received unexpected log line: {msg}")
|
||||||
logs_done.set()
|
logs_done.set()
|
||||||
return
|
return
|
||||||
|
|
||||||
pattern, expected_pin = log_order[index]
|
pattern, expected_pin = log_order[index]
|
||||||
match = pattern.search(clean_line)
|
match = pattern.search(msg)
|
||||||
|
|
||||||
if not match:
|
if not match:
|
||||||
print(f"Log line did not match next expected pattern: {clean_line}")
|
print(f"Log line did not match next expected pattern: {msg}")
|
||||||
|
print(f"Expected pattern: {pattern.pattern}")
|
||||||
logs_done.set()
|
logs_done.set()
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -99,9 +121,10 @@ async def test_gpio_expander_cache(
|
|||||||
|
|
||||||
index += 1
|
index += 1
|
||||||
|
|
||||||
elif "DONE" in clean_line:
|
elif "DONE_UINT16" in clean_line:
|
||||||
# Check if we reached the end of the expected log entries
|
# uint16 component is done, check if we've seen all expected logs
|
||||||
logs_done.set()
|
if index == len(log_order):
|
||||||
|
logs_done.set()
|
||||||
|
|
||||||
# Run with log monitoring
|
# Run with log monitoring
|
||||||
async with (
|
async with (
|
||||||
|
Reference in New Issue
Block a user