diff --git a/esphome/components/api/__init__.py b/esphome/components/api/__init__.py index 5fb84d3c21..b120503a2e 100644 --- a/esphome/components/api/__init__.py +++ b/esphome/components/api/__init__.py @@ -193,6 +193,7 @@ async def to_code(config): if key := encryption_config.get(CONF_KEY): decoded = base64.b64decode(key) cg.add(var.set_noise_psk(list(decoded))) + cg.add_define("USE_API_NOISE_PSK_FROM_YAML") else: # No key provided, but encryption desired # This will allow a plaintext client to provide a noise key, diff --git a/esphome/components/api/api_server.cpp b/esphome/components/api/api_server.cpp index 1f38f4a31a..a12cf13ce2 100644 --- a/esphome/components/api/api_server.cpp +++ b/esphome/components/api/api_server.cpp @@ -37,12 +37,14 @@ void APIServer::setup() { this->noise_pref_ = global_preferences->make_preference(hash, true); +#ifndef USE_API_NOISE_PSK_FROM_YAML + // Only load saved PSK if not set from YAML SavedNoisePsk noise_pref_saved{}; if (this->noise_pref_.load(&noise_pref_saved)) { ESP_LOGD(TAG, "Loaded saved Noise PSK"); - this->set_noise_psk(noise_pref_saved.psk); } +#endif #endif // Schedule reboot if no clients connect within timeout @@ -409,6 +411,12 @@ void APIServer::set_reboot_timeout(uint32_t reboot_timeout) { this->reboot_timeo #ifdef USE_API_NOISE bool APIServer::save_noise_psk(psk_t psk, bool make_active) { +#ifdef USE_API_NOISE_PSK_FROM_YAML + // When PSK is set from YAML, this function should never be called + // but if it is, reject the change + ESP_LOGW(TAG, "Key set in YAML"); + return false; +#else auto &old_psk = this->noise_ctx_->get_psk(); if (std::equal(old_psk.begin(), old_psk.end(), psk.begin())) { ESP_LOGW(TAG, "New PSK matches old"); @@ -437,6 +445,7 @@ bool APIServer::save_noise_psk(psk_t psk, bool make_active) { }); } return true; +#endif } #endif diff --git a/tests/integration/fixtures/noise_encryption_key_protection.yaml b/tests/integration/fixtures/noise_encryption_key_protection.yaml new file mode 100644 index 0000000000..3ce84cd373 --- /dev/null +++ b/tests/integration/fixtures/noise_encryption_key_protection.yaml @@ -0,0 +1,10 @@ +esphome: + name: noise-key-test + +host: + +api: + encryption: + key: "zX9/JHxMKwpP0jUGsF0iESCm1wRvNgR6NkKVOhn7kSs=" + +logger: diff --git a/tests/integration/test_noise_encryption_key_protection.py b/tests/integration/test_noise_encryption_key_protection.py new file mode 100644 index 0000000000..03c43ca8d3 --- /dev/null +++ b/tests/integration/test_noise_encryption_key_protection.py @@ -0,0 +1,51 @@ +"""Integration test for noise encryption key protection from YAML.""" + +from __future__ import annotations + +import base64 + +from aioesphomeapi import InvalidEncryptionKeyAPIError +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_noise_encryption_key_protection( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that noise encryption key set in YAML cannot be changed via API.""" + # The key that's set in the YAML fixture + noise_psk = "zX9/JHxMKwpP0jUGsF0iESCm1wRvNgR6NkKVOhn7kSs=" + + # Keep ESPHome process running throughout all tests + async with run_compiled(yaml_config): + # First connection - test key change attempt + async with api_client_connected(noise_psk=noise_psk) as client: + # Verify connection is established + device_info = await client.device_info() + assert device_info is not None + + # Try to set a new encryption key via API + new_key = base64.b64encode( + b"x" * 32 + ) # Valid 32-byte key in base64 as bytes + + # This should fail since key was set in YAML + success = await client.noise_encryption_set_key(new_key) + assert success is False + + # Reconnect with the original key to verify it still works + async with api_client_connected(noise_psk=noise_psk) as client: + # Verify connection is still successful with original key + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "noise-key-test" + + # Verify that connecting with a wrong key fails + wrong_key = base64.b64encode(b"y" * 32).decode() # Different key + with pytest.raises(InvalidEncryptionKeyAPIError): + async with api_client_connected(noise_psk=wrong_key) as client: + await client.device_info()