From abf522bbb9edb4ede8141628cfc28dfd911817ef Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 26 Sep 2025 17:50:27 -0500 Subject: [PATCH] [ota] Add SHA256 password authentication with backward compatibility (#10809) Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com> --- esphome/components/esphome/ota/__init__.py | 23 +- .../components/esphome/ota/ota_esphome.cpp | 199 +++-- esphome/components/esphome/ota/ota_esphome.h | 6 + esphome/components/ota/ota_backend.h | 1 + esphome/core/defines.h | 2 + esphome/espota2.py | 129 ++- tests/unit_tests/test_espota2.py | 738 ++++++++++++++++++ 7 files changed, 1022 insertions(+), 76 deletions(-) create mode 100644 tests/unit_tests/test_espota2.py diff --git a/esphome/components/esphome/ota/__init__.py b/esphome/components/esphome/ota/__init__.py index 93216f9425..e6f249e021 100644 --- a/esphome/components/esphome/ota/__init__.py +++ b/esphome/components/esphome/ota/__init__.py @@ -16,7 +16,7 @@ from esphome.const import ( CONF_SAFE_MODE, CONF_VERSION, ) -from esphome.core import coroutine_with_priority +from esphome.core import CORE, coroutine_with_priority from esphome.coroutine import CoroPriority import esphome.final_validate as fv @@ -24,9 +24,22 @@ _LOGGER = logging.getLogger(__name__) CODEOWNERS = ["@esphome/core"] -AUTO_LOAD = ["md5", "socket"] DEPENDENCIES = ["network"] + +def supports_sha256() -> bool: + """Check if the current platform supports SHA256 for OTA authentication.""" + return bool(CORE.is_esp32 or CORE.is_esp8266 or CORE.is_rp2040 or CORE.is_libretiny) + + +def AUTO_LOAD() -> list[str]: + """Conditionally auto-load sha256 only on platforms that support it.""" + base_components = ["md5", "socket"] + if supports_sha256(): + return base_components + ["sha256"] + return base_components + + esphome = cg.esphome_ns.namespace("esphome") ESPHomeOTAComponent = esphome.class_("ESPHomeOTAComponent", OTAComponent) @@ -126,9 +139,15 @@ FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate async def to_code(config): var = cg.new_Pvariable(config[CONF_ID]) cg.add(var.set_port(config[CONF_PORT])) + if CONF_PASSWORD in config: cg.add(var.set_auth_password(config[CONF_PASSWORD])) cg.add_define("USE_OTA_PASSWORD") + # Only include hash algorithms when password is configured + cg.add_define("USE_OTA_MD5") + # Only include SHA256 support on platforms that have it + if supports_sha256(): + cg.add_define("USE_OTA_SHA256") cg.add_define("USE_OTA_VERSION", config[CONF_VERSION]) await cg.register_component(var, config) diff --git a/esphome/components/esphome/ota/ota_esphome.cpp b/esphome/components/esphome/ota/ota_esphome.cpp index 6654ef8748..6ffeeedb1a 100644 --- a/esphome/components/esphome/ota/ota_esphome.cpp +++ b/esphome/components/esphome/ota/ota_esphome.cpp @@ -1,6 +1,11 @@ #include "ota_esphome.h" #ifdef USE_OTA +#ifdef USE_OTA_MD5 #include "esphome/components/md5/md5.h" +#endif +#ifdef USE_OTA_SHA256 +#include "esphome/components/sha256/sha256.h" +#endif #include "esphome/components/network/util.h" #include "esphome/components/ota/ota_backend.h" #include "esphome/components/ota/ota_backend_arduino_esp32.h" @@ -10,6 +15,7 @@ #include "esphome/components/ota/ota_backend_esp_idf.h" #include "esphome/core/application.h" #include "esphome/core/hal.h" +#include "esphome/core/helpers.h" #include "esphome/core/log.h" #include "esphome/core/util.h" @@ -95,6 +101,15 @@ void ESPHomeOTAComponent::loop() { } static const uint8_t FEATURE_SUPPORTS_COMPRESSION = 0x01; +#ifdef USE_OTA_SHA256 +static const uint8_t FEATURE_SUPPORTS_SHA256_AUTH = 0x02; +#endif + +// Temporary flag to allow MD5 downgrade for ~3 versions (until 2026.1.0) +// This allows users to downgrade via OTA if they encounter issues after updating. +// Without this, users would need to do a serial flash to downgrade. +// TODO: Remove this flag and all associated code in 2026.1.0 +#define ALLOW_OTA_DOWNGRADE_MD5 void ESPHomeOTAComponent::handle_handshake_() { /// Handle the initial OTA handshake. @@ -225,57 +240,67 @@ void ESPHomeOTAComponent::handle_data_() { #ifdef USE_OTA_PASSWORD if (!this->password_.empty()) { - buf[0] = ota::OTA_RESPONSE_REQUEST_AUTH; - this->writeall_(buf, 1); - md5::MD5Digest md5{}; - md5.init(); - sprintf(sbuf, "%08" PRIx32, random_uint32()); - md5.add(sbuf, 8); - md5.calculate(); - md5.get_hex(sbuf); - ESP_LOGV(TAG, "Auth: Nonce is %s", sbuf); + bool auth_success = false; - // Send nonce, 32 bytes hex MD5 - if (!this->writeall_(reinterpret_cast(sbuf), 32)) { - ESP_LOGW(TAG, "Auth: Writing nonce failed"); +#ifdef USE_OTA_SHA256 + // SECURITY HARDENING: Prefer SHA256 authentication on platforms that support it. + // + // This is a hardening measure to prevent future downgrade attacks where an attacker + // could force the use of MD5 authentication by manipulating the feature flags. + // + // While MD5 is currently still acceptable for our OTA authentication use case + // (where the password is a shared secret and we're only authenticating, not + // encrypting), at some point in the future MD5 will likely become so weak that + // it could be practically attacked. + // + // We enforce SHA256 now on capable platforms because: + // 1. We can't retroactively update device firmware in the field + // 2. Clients (like esphome CLI) can always be updated to support SHA256 + // 3. This prevents any possibility of downgrade attacks in the future + // + // Devices that don't support SHA256 (due to platform limitations) will + // continue to use MD5 as their only option (see #else branch below). + + bool client_supports_sha256 = (ota_features & FEATURE_SUPPORTS_SHA256_AUTH) != 0; + +#ifdef ALLOW_OTA_DOWNGRADE_MD5 + // Temporary compatibility mode: Allow MD5 for ~3 versions to enable OTA downgrades + // This prevents users from being locked out if they need to downgrade after updating + // TODO: Remove this entire ifdef block in 2026.1.0 + if (client_supports_sha256) { + sha256::SHA256 sha_hasher; + auth_success = this->perform_hash_auth_(&sha_hasher, this->password_, ota::OTA_RESPONSE_REQUEST_SHA256_AUTH, + LOG_STR("SHA256"), sbuf); + } else { +#ifdef USE_OTA_MD5 + ESP_LOGW(TAG, "Using MD5 auth for compatibility (deprecated)"); + md5::MD5Digest md5_hasher; + auth_success = + this->perform_hash_auth_(&md5_hasher, this->password_, ota::OTA_RESPONSE_REQUEST_AUTH, LOG_STR("MD5"), sbuf); +#endif // USE_OTA_MD5 + } +#else + // Strict mode: SHA256 required on capable platforms (future default) + if (!client_supports_sha256) { + ESP_LOGW(TAG, "Client requires SHA256"); + error_code = ota::OTA_RESPONSE_ERROR_AUTH_INVALID; goto error; // NOLINT(cppcoreguidelines-avoid-goto) } + sha256::SHA256 sha_hasher; + auth_success = this->perform_hash_auth_(&sha_hasher, this->password_, ota::OTA_RESPONSE_REQUEST_SHA256_AUTH, + LOG_STR("SHA256"), sbuf); +#endif // ALLOW_OTA_DOWNGRADE_MD5 +#else + // Platform only supports MD5 - use it as the only available option + // This is not a security downgrade as the platform cannot support SHA256 +#ifdef USE_OTA_MD5 + md5::MD5Digest md5_hasher; + auth_success = + this->perform_hash_auth_(&md5_hasher, this->password_, ota::OTA_RESPONSE_REQUEST_AUTH, LOG_STR("MD5"), sbuf); +#endif // USE_OTA_MD5 +#endif // USE_OTA_SHA256 - // prepare challenge - md5.init(); - md5.add(this->password_.c_str(), this->password_.length()); - // add nonce - md5.add(sbuf, 32); - - // Receive cnonce, 32 bytes hex MD5 - if (!this->readall_(buf, 32)) { - ESP_LOGW(TAG, "Auth: Reading cnonce failed"); - goto error; // NOLINT(cppcoreguidelines-avoid-goto) - } - sbuf[32] = '\0'; - ESP_LOGV(TAG, "Auth: CNonce is %s", sbuf); - // add cnonce - md5.add(sbuf, 32); - - // calculate result - md5.calculate(); - md5.get_hex(sbuf); - ESP_LOGV(TAG, "Auth: Result is %s", sbuf); - - // Receive result, 32 bytes hex MD5 - if (!this->readall_(buf + 64, 32)) { - ESP_LOGW(TAG, "Auth: Reading response failed"); - goto error; // NOLINT(cppcoreguidelines-avoid-goto) - } - sbuf[64 + 32] = '\0'; - ESP_LOGV(TAG, "Auth: Response is %s", sbuf + 64); - - bool matches = true; - for (uint8_t i = 0; i < 32; i++) - matches = matches && buf[i] == buf[64 + i]; - - if (!matches) { - ESP_LOGW(TAG, "Auth failed! Passwords do not match"); + if (!auth_success) { error_code = ota::OTA_RESPONSE_ERROR_AUTH_INVALID; goto error; // NOLINT(cppcoreguidelines-avoid-goto) } @@ -499,5 +524,85 @@ void ESPHomeOTAComponent::yield_and_feed_watchdog_() { delay(1); } +#ifdef USE_OTA_PASSWORD +void ESPHomeOTAComponent::log_auth_warning_(const LogString *action, const LogString *hash_name) { + ESP_LOGW(TAG, "Auth: %s %s failed", LOG_STR_ARG(action), LOG_STR_ARG(hash_name)); +} + +// Non-template function definition to reduce binary size +bool ESPHomeOTAComponent::perform_hash_auth_(HashBase *hasher, const std::string &password, uint8_t auth_request, + const LogString *name, char *buf) { + // Get sizes from the hasher + const size_t hex_size = hasher->get_size() * 2; // Hex is twice the byte size + const size_t nonce_len = hasher->get_size() / 4; // Nonce is 1/4 of hash size in bytes + + // Use the provided buffer for all hex operations + + // Small stack buffer for nonce seed bytes + uint8_t nonce_bytes[8]; // Max 8 bytes (2 x uint32_t for SHA256) + + // Send auth request type + this->writeall_(&auth_request, 1); + + hasher->init(); + + // Generate nonce seed bytes using random_bytes + if (!random_bytes(nonce_bytes, nonce_len)) { + this->log_auth_warning_(LOG_STR("Random bytes generation failed"), name); + return false; + } + hasher->add(nonce_bytes, nonce_len); + hasher->calculate(); + + // Generate and send nonce + hasher->get_hex(buf); + buf[hex_size] = '\0'; + ESP_LOGV(TAG, "Auth: %s Nonce is %s", LOG_STR_ARG(name), buf); + + if (!this->writeall_(reinterpret_cast(buf), hex_size)) { + this->log_auth_warning_(LOG_STR("Writing nonce"), name); + return false; + } + + // Start challenge: password + nonce + hasher->init(); + hasher->add(password.c_str(), password.length()); + hasher->add(buf, hex_size); + + // Read cnonce and add to hash + if (!this->readall_(reinterpret_cast(buf), hex_size)) { + this->log_auth_warning_(LOG_STR("Reading cnonce"), name); + return false; + } + buf[hex_size] = '\0'; + ESP_LOGV(TAG, "Auth: %s CNonce is %s", LOG_STR_ARG(name), buf); + + hasher->add(buf, hex_size); + hasher->calculate(); + + // Log expected result (digest is already in hasher) + hasher->get_hex(buf); + buf[hex_size] = '\0'; + ESP_LOGV(TAG, "Auth: %s Result is %s", LOG_STR_ARG(name), buf); + + // Read response into the buffer + if (!this->readall_(reinterpret_cast(buf), hex_size)) { + this->log_auth_warning_(LOG_STR("Reading response"), name); + return false; + } + buf[hex_size] = '\0'; + ESP_LOGV(TAG, "Auth: %s Response is %s", LOG_STR_ARG(name), buf); + + // Compare response directly with digest in hasher + bool matches = hasher->equals_hex(buf); + + if (!matches) { + this->log_auth_warning_(LOG_STR("Password mismatch"), name); + } + + return matches; +} +#endif // USE_OTA_PASSWORD + } // namespace esphome #endif diff --git a/esphome/components/esphome/ota/ota_esphome.h b/esphome/components/esphome/ota/ota_esphome.h index f5a3e43ae3..5bacb60706 100644 --- a/esphome/components/esphome/ota/ota_esphome.h +++ b/esphome/components/esphome/ota/ota_esphome.h @@ -7,6 +7,7 @@ #include "esphome/core/helpers.h" #include "esphome/core/log.h" #include "esphome/core/preferences.h" +#include "esphome/core/hash_base.h" namespace esphome { @@ -30,6 +31,11 @@ class ESPHomeOTAComponent : public ota::OTAComponent { protected: void handle_handshake_(); void handle_data_(); +#ifdef USE_OTA_PASSWORD + bool perform_hash_auth_(HashBase *hasher, const std::string &password, uint8_t auth_request, const LogString *name, + char *buf); + void log_auth_warning_(const LogString *action, const LogString *hash_name); +#endif // USE_OTA_PASSWORD bool readall_(uint8_t *buf, size_t len); bool writeall_(const uint8_t *buf, size_t len); void log_socket_error_(const LogString *msg); diff --git a/esphome/components/ota/ota_backend.h b/esphome/components/ota/ota_backend.h index 372f24df5e..64ee0b9f7c 100644 --- a/esphome/components/ota/ota_backend.h +++ b/esphome/components/ota/ota_backend.h @@ -14,6 +14,7 @@ namespace ota { enum OTAResponseTypes { OTA_RESPONSE_OK = 0x00, OTA_RESPONSE_REQUEST_AUTH = 0x01, + OTA_RESPONSE_REQUEST_SHA256_AUTH = 0x02, OTA_RESPONSE_HEADER_OK = 0x40, OTA_RESPONSE_AUTH_OK = 0x41, diff --git a/esphome/core/defines.h b/esphome/core/defines.h index ef93fd0b65..067ef4a4d0 100644 --- a/esphome/core/defines.h +++ b/esphome/core/defines.h @@ -123,7 +123,9 @@ #define USE_ONLINE_IMAGE_PNG_SUPPORT #define USE_ONLINE_IMAGE_JPEG_SUPPORT #define USE_OTA +#define USE_OTA_MD5 #define USE_OTA_PASSWORD +#define USE_OTA_SHA256 #define USE_OTA_STATE_CALLBACK #define USE_OTA_VERSION 2 #define USE_TIME_TIMEZONE diff --git a/esphome/espota2.py b/esphome/espota2.py index 84f53377e7..2712d00127 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Callable import gzip import hashlib import io @@ -9,12 +10,14 @@ import random import socket import sys import time +from typing import Any from esphome.core import EsphomeError from esphome.helpers import resolve_ip_address RESPONSE_OK = 0x00 RESPONSE_REQUEST_AUTH = 0x01 +RESPONSE_REQUEST_SHA256_AUTH = 0x02 RESPONSE_HEADER_OK = 0x40 RESPONSE_AUTH_OK = 0x41 @@ -45,6 +48,7 @@ OTA_VERSION_2_0 = 2 MAGIC_BYTES = [0x6C, 0x26, 0xF7, 0x5C, 0x45] FEATURE_SUPPORTS_COMPRESSION = 0x01 +FEATURE_SUPPORTS_SHA256_AUTH = 0x02 UPLOAD_BLOCK_SIZE = 8192 @@ -52,6 +56,12 @@ UPLOAD_BUFFER_SIZE = UPLOAD_BLOCK_SIZE * 8 _LOGGER = logging.getLogger(__name__) +# Authentication method lookup table: response -> (hash_func, nonce_size, name) +_AUTH_METHODS: dict[int, tuple[Callable[..., Any], int, str]] = { + RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"), + RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"), +} + class ProgressBar: def __init__(self): @@ -81,18 +91,43 @@ class OTAError(EsphomeError): pass -def recv_decode(sock, amount, decode=True): +def recv_decode( + sock: socket.socket, amount: int, decode: bool = True +) -> bytes | list[int]: + """Receive data from socket and optionally decode to list of integers. + + :param sock: Socket to receive data from. + :param amount: Number of bytes to receive. + :param decode: If True, convert bytes to list of integers, otherwise return raw bytes. + :return: List of integers if decode=True, otherwise raw bytes. + """ data = sock.recv(amount) if not decode: return data return list(data) -def receive_exactly(sock, amount, msg, expect, decode=True): - data = [] if decode else b"" +def receive_exactly( + sock: socket.socket, + amount: int, + msg: str, + expect: int | list[int] | None, + decode: bool = True, +) -> list[int] | bytes: + """Receive exactly the specified amount of data from socket with error checking. + + :param sock: Socket to receive data from. + :param amount: Exact number of bytes to receive. + :param msg: Description of what is being received for error messages. + :param expect: Expected response code(s) for validation, None to skip validation. + :param decode: If True, return list of integers, otherwise return raw bytes. + :return: List of integers if decode=True, otherwise raw bytes. + :raises OTAError: If receiving fails or response doesn't match expected. + """ + data: list[int] | bytes = [] if decode else b"" try: - data += recv_decode(sock, 1, decode=decode) + data += recv_decode(sock, 1, decode=decode) # type: ignore[operator] except OSError as err: raise OTAError(f"Error receiving acknowledge {msg}: {err}") from err @@ -104,13 +139,19 @@ def receive_exactly(sock, amount, msg, expect, decode=True): while len(data) < amount: try: - data += recv_decode(sock, amount - len(data), decode=decode) + data += recv_decode(sock, amount - len(data), decode=decode) # type: ignore[operator] except OSError as err: raise OTAError(f"Error receiving {msg}: {err}") from err return data -def check_error(data, expect): +def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None: + """Check response data for error codes and validate against expected response. + + :param data: Response data from device (first byte is the response code). + :param expect: Expected response code(s), None to skip validation. + :raises OTAError: If an error code is detected or response doesn't match expected. + """ if not expect: return dat = data[0] @@ -125,7 +166,7 @@ def check_error(data, expect): raise OTAError("Error: Authentication invalid. Is the password correct?") if dat == RESPONSE_ERROR_WRITING_FLASH: raise OTAError( - "Error: Wring OTA data to flash memory failed. See USB logs for more " + "Error: Writing OTA data to flash memory failed. See USB logs for more " "information." ) if dat == RESPONSE_ERROR_UPDATE_END: @@ -177,7 +218,16 @@ def check_error(data, expect): raise OTAError(f"Unexpected response from ESP: 0x{data[0]:02X}") -def send_check(sock, data, msg): +def send_check( + sock: socket.socket, data: list[int] | tuple[int, ...] | int | str | bytes, msg: str +) -> None: + """Send data to socket with error handling. + + :param sock: Socket to send data to. + :param data: Data to send (can be list/tuple of ints, single int, string, or bytes). + :param msg: Description of what is being sent for error messages. + :raises OTAError: If sending fails. + """ try: if isinstance(data, (list, tuple)): data = bytes(data) @@ -210,10 +260,14 @@ def perform_ota( f"Device uses unsupported OTA version {version}, this ESPHome supports {supported_versions}" ) - # Features - send_check(sock, FEATURE_SUPPORTS_COMPRESSION, "features") + # Features - send both compression and SHA256 auth support + features_to_send = FEATURE_SUPPORTS_COMPRESSION | FEATURE_SUPPORTS_SHA256_AUTH + send_check(sock, features_to_send, "features") features = receive_exactly( - sock, 1, "features", [RESPONSE_HEADER_OK, RESPONSE_SUPPORTS_COMPRESSION] + sock, + 1, + "features", + None, # Accept any response )[0] if features == RESPONSE_SUPPORTS_COMPRESSION: @@ -222,31 +276,52 @@ def perform_ota( else: upload_contents = file_contents - (auth,) = receive_exactly( - sock, 1, "auth", [RESPONSE_REQUEST_AUTH, RESPONSE_AUTH_OK] - ) - if auth == RESPONSE_REQUEST_AUTH: + def perform_auth( + sock: socket.socket, + password: str, + hash_func: Callable[..., Any], + nonce_size: int, + hash_name: str, + ) -> None: + """Perform challenge-response authentication using specified hash algorithm.""" if not password: raise OTAError("ESP requests password, but no password given!") - nonce = receive_exactly( - sock, 32, "authentication nonce", [], decode=False - ).decode() - _LOGGER.debug("Auth: Nonce is %s", nonce) - cnonce = hashlib.md5(str(random.random()).encode()).hexdigest() - _LOGGER.debug("Auth: CNonce is %s", cnonce) + + nonce_bytes = receive_exactly( + sock, nonce_size, f"{hash_name} authentication nonce", [], decode=False + ) + assert isinstance(nonce_bytes, bytes) + nonce = nonce_bytes.decode() + _LOGGER.debug("Auth: %s Nonce is %s", hash_name, nonce) + + # Generate cnonce + cnonce = hash_func(str(random.random()).encode()).hexdigest() + _LOGGER.debug("Auth: %s CNonce is %s", hash_name, cnonce) send_check(sock, cnonce, "auth cnonce") - result_md5 = hashlib.md5() - result_md5.update(password.encode("utf-8")) - result_md5.update(nonce.encode()) - result_md5.update(cnonce.encode()) - result = result_md5.hexdigest() - _LOGGER.debug("Auth: Result is %s", result) + # Calculate challenge response + hasher = hash_func() + hasher.update(password.encode("utf-8")) + hasher.update(nonce.encode()) + hasher.update(cnonce.encode()) + result = hasher.hexdigest() + _LOGGER.debug("Auth: %s Result is %s", hash_name, result) send_check(sock, result, "auth result") receive_exactly(sock, 1, "auth result", RESPONSE_AUTH_OK) + (auth,) = receive_exactly( + sock, + 1, + "auth", + [RESPONSE_REQUEST_AUTH, RESPONSE_REQUEST_SHA256_AUTH, RESPONSE_AUTH_OK], + ) + + if auth != RESPONSE_AUTH_OK: + hash_func, nonce_size, hash_name = _AUTH_METHODS[auth] + perform_auth(sock, password, hash_func, nonce_size, hash_name) + # Set higher timeout during upload sock.settimeout(30.0) diff --git a/tests/unit_tests/test_espota2.py b/tests/unit_tests/test_espota2.py new file mode 100644 index 0000000000..bd1a6bde81 --- /dev/null +++ b/tests/unit_tests/test_espota2.py @@ -0,0 +1,738 @@ +"""Unit tests for esphome.espota2 module.""" + +from __future__ import annotations + +from collections.abc import Generator +import gzip +import hashlib +import io +from pathlib import Path +import socket +import struct +from unittest.mock import Mock, call, patch + +import pytest +from pytest import CaptureFixture + +from esphome import espota2 +from esphome.core import EsphomeError + +# Test constants +MOCK_RANDOM_VALUE = 0.123456 +MOCK_RANDOM_BYTES = b"0.123456" +MOCK_MD5_NONCE = b"12345678901234567890123456789012" # 32 char nonce for MD5 +MOCK_SHA256_NONCE = b"1234567890123456789012345678901234567890123456789012345678901234" # 64 char nonce for SHA256 + + +@pytest.fixture +def mock_socket() -> Mock: + """Create a mock socket for testing.""" + socket_mock = Mock() + socket_mock.close = Mock() + socket_mock.recv = Mock() + socket_mock.sendall = Mock() + socket_mock.settimeout = Mock() + socket_mock.connect = Mock() + socket_mock.setsockopt = Mock() + return socket_mock + + +@pytest.fixture +def mock_file() -> io.BytesIO: + """Create a mock firmware file for testing.""" + return io.BytesIO(b"firmware content here") + + +@pytest.fixture +def mock_time() -> Generator[None]: + """Mock time-related functions for consistent testing.""" + # Provide enough values for multiple calls (tests may call perform_ota multiple times) + with ( + patch("time.sleep"), + patch("time.perf_counter", side_effect=[0, 1, 0, 1, 0, 1]), + ): + yield + + +@pytest.fixture +def mock_random() -> Generator[Mock]: + """Mock random for predictable test values.""" + with patch("random.random", return_value=MOCK_RANDOM_VALUE) as mock_rand: + yield mock_rand + + +@pytest.fixture +def mock_resolve_ip() -> Generator[Mock]: + """Mock resolve_ip_address for testing.""" + with patch("esphome.espota2.resolve_ip_address") as mock: + mock.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.100", 3232)) + ] + yield mock + + +@pytest.fixture +def mock_perform_ota() -> Generator[Mock]: + """Mock perform_ota function for testing.""" + with patch("esphome.espota2.perform_ota") as mock: + yield mock + + +@pytest.fixture +def mock_run_ota_impl() -> Generator[Mock]: + """Mock run_ota_impl_ function for testing.""" + with patch("esphome.espota2.run_ota_impl_") as mock: + mock.return_value = (0, "192.168.1.100") + yield mock + + +@pytest.fixture +def mock_socket_constructor(mock_socket: Mock) -> Generator[Mock]: + """Mock socket.socket constructor to return our mock socket.""" + with patch("socket.socket", return_value=mock_socket) as mock_constructor: + yield mock_constructor + + +def test_recv_decode_with_decode(mock_socket: Mock) -> None: + """Test recv_decode with decode=True returns list.""" + mock_socket.recv.return_value = b"\x01\x02\x03" + + result = espota2.recv_decode(mock_socket, 3, decode=True) + + assert result == [1, 2, 3] + mock_socket.recv.assert_called_once_with(3) + + +def test_recv_decode_without_decode(mock_socket: Mock) -> None: + """Test recv_decode with decode=False returns bytes.""" + mock_socket.recv.return_value = b"\x01\x02\x03" + + result = espota2.recv_decode(mock_socket, 3, decode=False) + + assert result == b"\x01\x02\x03" + mock_socket.recv.assert_called_once_with(3) + + +def test_receive_exactly_success(mock_socket: Mock) -> None: + """Test receive_exactly successfully receives expected data.""" + mock_socket.recv.side_effect = [b"\x00", b"\x01\x02"] + + result = espota2.receive_exactly(mock_socket, 3, "test", espota2.RESPONSE_OK) + + assert result == [0, 1, 2] + assert mock_socket.recv.call_count == 2 + + +def test_receive_exactly_with_error_response(mock_socket: Mock) -> None: + """Test receive_exactly raises OTAError on error response.""" + mock_socket.recv.return_value = bytes([espota2.RESPONSE_ERROR_AUTH_INVALID]) + + with pytest.raises(espota2.OTAError, match="Error auth:.*Authentication invalid"): + espota2.receive_exactly(mock_socket, 1, "auth", [espota2.RESPONSE_OK]) + + mock_socket.close.assert_called_once() + + +def test_receive_exactly_socket_error(mock_socket: Mock) -> None: + """Test receive_exactly handles socket errors.""" + mock_socket.recv.side_effect = OSError("Connection reset") + + with pytest.raises(espota2.OTAError, match="Error receiving acknowledge test"): + espota2.receive_exactly(mock_socket, 1, "test", espota2.RESPONSE_OK) + + +@pytest.mark.parametrize( + ("error_code", "expected_msg"), + [ + (espota2.RESPONSE_ERROR_MAGIC, "Error: Invalid magic byte"), + (espota2.RESPONSE_ERROR_UPDATE_PREPARE, "Error: Couldn't prepare flash memory"), + (espota2.RESPONSE_ERROR_AUTH_INVALID, "Error: Authentication invalid"), + ( + espota2.RESPONSE_ERROR_WRITING_FLASH, + "Error: Writing OTA data to flash memory failed", + ), + (espota2.RESPONSE_ERROR_UPDATE_END, "Error: Finishing update failed"), + ( + espota2.RESPONSE_ERROR_INVALID_BOOTSTRAPPING, + "Error: Please press the reset button", + ), + ( + espota2.RESPONSE_ERROR_WRONG_CURRENT_FLASH_CONFIG, + "Error: ESP has been flashed with wrong flash size", + ), + ( + espota2.RESPONSE_ERROR_WRONG_NEW_FLASH_CONFIG, + "Error: ESP does not have the requested flash size", + ), + ( + espota2.RESPONSE_ERROR_ESP8266_NOT_ENOUGH_SPACE, + "Error: ESP does not have enough space", + ), + ( + espota2.RESPONSE_ERROR_ESP32_NOT_ENOUGH_SPACE, + "Error: The OTA partition on the ESP is too small", + ), + ( + espota2.RESPONSE_ERROR_NO_UPDATE_PARTITION, + "Error: The OTA partition on the ESP couldn't be found", + ), + (espota2.RESPONSE_ERROR_MD5_MISMATCH, "Error: Application MD5 code mismatch"), + (espota2.RESPONSE_ERROR_UNKNOWN, "Unknown error from ESP"), + ], +) +def test_check_error_with_various_errors(error_code: int, expected_msg: str) -> None: + """Test check_error raises appropriate errors for different error codes.""" + with pytest.raises(espota2.OTAError, match=expected_msg): + espota2.check_error([error_code], [espota2.RESPONSE_OK]) + + +def test_check_error_unexpected_response() -> None: + """Test check_error raises error for unexpected response.""" + with pytest.raises(espota2.OTAError, match="Unexpected response from ESP: 0x7F"): + espota2.check_error([0x7F], [espota2.RESPONSE_OK, espota2.RESPONSE_AUTH_OK]) + + +def test_send_check_with_various_data_types(mock_socket: Mock) -> None: + """Test send_check handles different data types.""" + + # Test with list/tuple + espota2.send_check(mock_socket, [0x01, 0x02], "list") + mock_socket.sendall.assert_called_with(b"\x01\x02") + + # Test with int + espota2.send_check(mock_socket, 0x42, "int") + mock_socket.sendall.assert_called_with(b"\x42") + + # Test with string + espota2.send_check(mock_socket, "hello", "string") + mock_socket.sendall.assert_called_with(b"hello") + + # Test with bytes (should pass through) + espota2.send_check(mock_socket, b"\xaa\xbb", "bytes") + mock_socket.sendall.assert_called_with(b"\xaa\xbb") + + +def test_send_check_socket_error(mock_socket: Mock) -> None: + """Test send_check handles socket errors.""" + mock_socket.sendall.side_effect = OSError("Broken pipe") + + with pytest.raises(espota2.OTAError, match="Error sending test"): + espota2.send_check(mock_socket, b"data", "test") + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_successful_md5_auth( + mock_socket: Mock, mock_file: io.BytesIO, mock_random: Mock +) -> None: + """Test successful OTA with MD5 authentication.""" + # Setup socket responses for recv calls + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_REQUEST_AUTH]), # Auth request + MOCK_MD5_NONCE, # 32 char hex nonce + bytes([espota2.RESPONSE_AUTH_OK]), # Auth result + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_CHUNK_OK]), # Chunk OK + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + + # Run OTA + espota2.perform_ota(mock_socket, "testpass", mock_file, "test.bin") + + # Verify magic bytes were sent + assert mock_socket.sendall.call_args_list[0] == call(bytes(espota2.MAGIC_BYTES)) + + # Verify features were sent (compression + SHA256 support) + assert mock_socket.sendall.call_args_list[1] == call( + bytes( + [ + espota2.FEATURE_SUPPORTS_COMPRESSION + | espota2.FEATURE_SUPPORTS_SHA256_AUTH + ] + ) + ) + + # Verify cnonce was sent (MD5 of random.random()) + cnonce = hashlib.md5(MOCK_RANDOM_BYTES).hexdigest() + assert mock_socket.sendall.call_args_list[2] == call(cnonce.encode()) + + # Verify auth result was computed correctly + expected_hash = hashlib.md5() + expected_hash.update(b"testpass") + expected_hash.update(MOCK_MD5_NONCE) + expected_hash.update(cnonce.encode()) + expected_result = expected_hash.hexdigest() + assert mock_socket.sendall.call_args_list[3] == call(expected_result.encode()) + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_no_auth(mock_socket: Mock, mock_file: io.BytesIO) -> None: + """Test OTA without authentication.""" + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_1_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_AUTH_OK]), # No auth required + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + # Should not send any auth-related data + auth_calls = [ + call + for call in mock_socket.sendall.call_args_list + if "cnonce" in str(call) or "result" in str(call) + ] + assert len(auth_calls) == 0 + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_with_compression(mock_socket: Mock) -> None: + """Test OTA with compression support.""" + original_content = b"firmware" * 100 # Repeating content for compression + mock_file = io.BytesIO(original_content) + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_SUPPORTS_COMPRESSION]), # Device supports compression + bytes([espota2.RESPONSE_AUTH_OK]), # No auth required + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_CHUNK_OK]), # Chunk OK + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + # Verify compressed content was sent + # Get the binary size that was sent (4 bytes after features) + size_bytes = mock_socket.sendall.call_args_list[2][0][0] + sent_size = struct.unpack(">I", size_bytes)[0] + + # Size should be less than original due to compression + assert sent_size < len(original_content) + + # Verify the content sent was gzipped + compressed = gzip.compress(original_content, compresslevel=9) + assert sent_size == len(compressed) + + +def test_perform_ota_auth_without_password(mock_socket: Mock) -> None: + """Test OTA fails when auth is required but no password provided.""" + mock_file = io.BytesIO(b"firmware") + + responses = [ + bytes([espota2.RESPONSE_OK, espota2.OTA_VERSION_2_0]), + bytes([espota2.RESPONSE_HEADER_OK]), + bytes([espota2.RESPONSE_REQUEST_AUTH]), + ] + + mock_socket.recv.side_effect = responses + + with pytest.raises( + espota2.OTAError, match="ESP requests password, but no password given" + ): + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_md5_auth_wrong_password( + mock_socket: Mock, mock_file: io.BytesIO, mock_random: Mock +) -> None: + """Test OTA fails when MD5 authentication is rejected due to wrong password.""" + # Setup socket responses for recv calls + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_REQUEST_AUTH]), # Auth request + MOCK_MD5_NONCE, # 32 char hex nonce + bytes([espota2.RESPONSE_ERROR_AUTH_INVALID]), # Auth rejected! + ] + + mock_socket.recv.side_effect = recv_responses + + with pytest.raises(espota2.OTAError, match="Error auth.*Authentication invalid"): + espota2.perform_ota(mock_socket, "wrongpassword", mock_file, "test.bin") + + # Verify the socket was closed after auth failure + mock_socket.close.assert_called() + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_sha256_auth_wrong_password( + mock_socket: Mock, mock_file: io.BytesIO, mock_random: Mock +) -> None: + """Test OTA fails when SHA256 authentication is rejected due to wrong password.""" + # Setup socket responses for recv calls + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_REQUEST_SHA256_AUTH]), # SHA256 Auth request + MOCK_SHA256_NONCE, # 64 char hex nonce + bytes([espota2.RESPONSE_ERROR_AUTH_INVALID]), # Auth rejected! + ] + + mock_socket.recv.side_effect = recv_responses + + with pytest.raises(espota2.OTAError, match="Error auth.*Authentication invalid"): + espota2.perform_ota(mock_socket, "wrongpassword", mock_file, "test.bin") + + # Verify the socket was closed after auth failure + mock_socket.close.assert_called() + + +def test_perform_ota_sha256_auth_without_password(mock_socket: Mock) -> None: + """Test OTA fails when SHA256 auth is required but no password provided.""" + mock_file = io.BytesIO(b"firmware") + + responses = [ + bytes([espota2.RESPONSE_OK, espota2.OTA_VERSION_2_0]), + bytes([espota2.RESPONSE_HEADER_OK]), + bytes([espota2.RESPONSE_REQUEST_SHA256_AUTH]), + ] + + mock_socket.recv.side_effect = responses + + with pytest.raises( + espota2.OTAError, match="ESP requests password, but no password given" + ): + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + +def test_perform_ota_unexpected_auth_response(mock_socket: Mock) -> None: + """Test OTA fails when device sends an unexpected auth response.""" + mock_file = io.BytesIO(b"firmware") + + # Use 0x03 which is not in the expected auth responses + # This will be caught by check_error and raise "Unexpected response from ESP" + UNKNOWN_AUTH_METHOD = 0x03 + + responses = [ + bytes([espota2.RESPONSE_OK, espota2.OTA_VERSION_2_0]), + bytes([espota2.RESPONSE_HEADER_OK]), + bytes([UNKNOWN_AUTH_METHOD]), # Unknown auth method + ] + + mock_socket.recv.side_effect = responses + + # This will actually raise "Unexpected response from ESP" from check_error + with pytest.raises( + espota2.OTAError, match=r"Error auth: Unexpected response from ESP: 0x03" + ): + espota2.perform_ota(mock_socket, "password", mock_file, "test.bin") + + +def test_perform_ota_unsupported_version(mock_socket: Mock) -> None: + """Test OTA fails with unsupported version.""" + mock_file = io.BytesIO(b"firmware") + + responses = [ + bytes([espota2.RESPONSE_OK, 99]), # Unsupported version + ] + + mock_socket.recv.side_effect = responses + + with pytest.raises(espota2.OTAError, match="Device uses unsupported OTA version"): + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_upload_error(mock_socket: Mock, mock_file: io.BytesIO) -> None: + """Test OTA handles upload errors.""" + # Setup responses - provide enough for the recv calls + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_AUTH_OK]), # No auth required + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + ] + # Add OSError to recv to simulate connection loss during chunk read + recv_responses.append(OSError("Connection lost")) + + mock_socket.recv.side_effect = recv_responses + + with pytest.raises(espota2.OTAError, match="Error receiving acknowledge chunk OK"): + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + +@pytest.mark.usefixtures("mock_socket_constructor", "mock_resolve_ip") +def test_run_ota_impl_successful( + mock_socket: Mock, tmp_path: Path, mock_perform_ota: Mock +) -> None: + """Test run_ota_impl_ with successful upload.""" + # Create a real firmware file + firmware_file = tmp_path / "firmware.bin" + firmware_file.write_bytes(b"firmware content") + + # Run OTA with real file path + result_code, result_host = espota2.run_ota_impl_( + "test.local", 3232, "password", str(firmware_file) + ) + + # Verify success + assert result_code == 0 + assert result_host == "192.168.1.100" + + # Verify socket was configured correctly + mock_socket.settimeout.assert_called_with(10.0) + mock_socket.connect.assert_called_once_with(("192.168.1.100", 3232)) + mock_socket.close.assert_called_once() + + # Verify perform_ota was called with real file + mock_perform_ota.assert_called_once() + call_args = mock_perform_ota.call_args[0] + assert call_args[0] == mock_socket + assert call_args[1] == "password" + # Verify the file object is a proper file handle + assert isinstance(call_args[2], io.IOBase) + assert call_args[3] == str(firmware_file) + + +@pytest.mark.usefixtures("mock_socket_constructor", "mock_resolve_ip") +def test_run_ota_impl_connection_failed(mock_socket: Mock, tmp_path: Path) -> None: + """Test run_ota_impl_ when connection fails.""" + mock_socket.connect.side_effect = OSError("Connection refused") + + # Create a real firmware file + firmware_file = tmp_path / "firmware.bin" + firmware_file.write_bytes(b"firmware content") + + result_code, result_host = espota2.run_ota_impl_( + "test.local", 3232, "password", str(firmware_file) + ) + + assert result_code == 1 + assert result_host is None + mock_socket.close.assert_called_once() + + +def test_run_ota_impl_resolve_failed(tmp_path: Path, mock_resolve_ip: Mock) -> None: + """Test run_ota_impl_ when DNS resolution fails.""" + # Create a real firmware file + firmware_file = tmp_path / "firmware.bin" + firmware_file.write_bytes(b"firmware content") + + mock_resolve_ip.side_effect = EsphomeError("DNS resolution failed") + + with pytest.raises(espota2.OTAError, match="DNS resolution failed"): + result_code, result_host = espota2.run_ota_impl_( + "unknown.host", 3232, "password", str(firmware_file) + ) + + +def test_run_ota_wrapper(mock_run_ota_impl: Mock) -> None: + """Test run_ota wrapper function.""" + # Test successful case + mock_run_ota_impl.return_value = (0, "192.168.1.100") + result = espota2.run_ota("test.local", 3232, "pass", "fw.bin") + assert result == (0, "192.168.1.100") + + # Test error case + mock_run_ota_impl.side_effect = espota2.OTAError("Test error") + result = espota2.run_ota("test.local", 3232, "pass", "fw.bin") + assert result == (1, None) + + +def test_progress_bar(capsys: CaptureFixture[str]) -> None: + """Test ProgressBar functionality.""" + progress = espota2.ProgressBar() + + # Test initial update + progress.update(0.0) + captured = capsys.readouterr() + assert "0%" in captured.err + assert "[" in captured.err + + # Test progress update + progress.update(0.5) + captured = capsys.readouterr() + assert "50%" in captured.err + + # Test completion + progress.update(1.0) + captured = capsys.readouterr() + assert "100%" in captured.err + assert "Done" in captured.err + + # Test done method + progress.done() + captured = capsys.readouterr() + assert captured.err == "\n" + + # Test same progress doesn't update + progress.update(0.5) + progress.update(0.5) + captured = capsys.readouterr() + # Should only see one update (second call shouldn't write) + assert captured.err.count("50%") == 1 + + +# Tests for SHA256 authentication +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_successful_sha256_auth( + mock_socket: Mock, mock_file: io.BytesIO, mock_random: Mock +) -> None: + """Test successful OTA with SHA256 authentication.""" + # Setup socket responses for recv calls + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_REQUEST_SHA256_AUTH]), # SHA256 Auth request + MOCK_SHA256_NONCE, # 64 char hex nonce + bytes([espota2.RESPONSE_AUTH_OK]), # Auth result + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_CHUNK_OK]), # Chunk OK + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + + # Run OTA + espota2.perform_ota(mock_socket, "testpass", mock_file, "test.bin") + + # Verify magic bytes were sent + assert mock_socket.sendall.call_args_list[0] == call(bytes(espota2.MAGIC_BYTES)) + + # Verify features were sent (compression + SHA256 support) + assert mock_socket.sendall.call_args_list[1] == call( + bytes( + [ + espota2.FEATURE_SUPPORTS_COMPRESSION + | espota2.FEATURE_SUPPORTS_SHA256_AUTH + ] + ) + ) + + # Verify cnonce was sent (SHA256 of random.random()) + cnonce = hashlib.sha256(MOCK_RANDOM_BYTES).hexdigest() + assert mock_socket.sendall.call_args_list[2] == call(cnonce.encode()) + + # Verify auth result was computed correctly with SHA256 + expected_hash = hashlib.sha256() + expected_hash.update(b"testpass") + expected_hash.update(MOCK_SHA256_NONCE) + expected_hash.update(cnonce.encode()) + expected_result = expected_hash.hexdigest() + assert mock_socket.sendall.call_args_list[3] == call(expected_result.encode()) + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_sha256_fallback_to_md5( + mock_socket: Mock, mock_file: io.BytesIO, mock_random: Mock +) -> None: + """Test SHA256-capable client falls back to MD5 for compatibility.""" + # This test verifies the temporary backward compatibility + # where a SHA256-capable client can still authenticate with MD5 + # This compatibility will be removed in 2026.1.0 + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes( + [espota2.RESPONSE_REQUEST_AUTH] + ), # MD5 Auth request (device doesn't support SHA256) + MOCK_MD5_NONCE, # 32 char hex nonce for MD5 + bytes([espota2.RESPONSE_AUTH_OK]), # Auth result + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_CHUNK_OK]), # Chunk OK + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + + # Run OTA - should work even though device requested MD5 + espota2.perform_ota(mock_socket, "testpass", mock_file, "test.bin") + + # Verify client still advertised SHA256 support + assert mock_socket.sendall.call_args_list[1] == call( + bytes( + [ + espota2.FEATURE_SUPPORTS_COMPRESSION + | espota2.FEATURE_SUPPORTS_SHA256_AUTH + ] + ) + ) + + # But authentication was done with MD5 + cnonce = hashlib.md5(MOCK_RANDOM_BYTES).hexdigest() + expected_hash = hashlib.md5() + expected_hash.update(b"testpass") + expected_hash.update(MOCK_MD5_NONCE) + expected_hash.update(cnonce.encode()) + expected_result = expected_hash.hexdigest() + assert mock_socket.sendall.call_args_list[3] == call(expected_result.encode()) + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_version_differences( + mock_socket: Mock, mock_file: io.BytesIO +) -> None: + """Test OTA behavior differences between version 1.0 and 2.0.""" + # Test version 1.0 - no chunk acknowledgments + recv_responses = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_1_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_AUTH_OK]), # No auth required + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + # No RESPONSE_CHUNK_OK for v1 + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + # For v1.0, verify that we only get the expected number of recv calls + # v1.0 doesn't have chunk acknowledgments, so fewer recv calls + assert mock_socket.recv.call_count == 8 # v1.0 has 8 recv calls + + # Reset mock for v2.0 test + mock_socket.reset_mock() + + # Reset file position for second test + mock_file.seek(0) + + # Test version 2.0 - with chunk acknowledgments + recv_responses_v2 = [ + bytes([espota2.RESPONSE_OK]), # First byte of version response + bytes([espota2.OTA_VERSION_2_0]), # Version number + bytes([espota2.RESPONSE_HEADER_OK]), # Features response + bytes([espota2.RESPONSE_AUTH_OK]), # No auth required + bytes([espota2.RESPONSE_UPDATE_PREPARE_OK]), # Binary size OK + bytes([espota2.RESPONSE_BIN_MD5_OK]), # MD5 checksum OK + bytes([espota2.RESPONSE_CHUNK_OK]), # v2.0 has chunk acknowledgment + bytes([espota2.RESPONSE_RECEIVE_OK]), # Receive OK + bytes([espota2.RESPONSE_UPDATE_END_OK]), # Update end OK + ] + + mock_socket.recv.side_effect = recv_responses_v2 + espota2.perform_ota(mock_socket, "", mock_file, "test.bin") + + # For v2.0, verify more recv calls due to chunk acknowledgments + assert mock_socket.recv.call_count == 9 # v2.0 has 9 recv calls (includes chunk OK)