diff --git a/esphome/espota2.py b/esphome/espota2.py index 33176cc35f..dc4fa7237b 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Callable import gzip import hashlib import io @@ -55,6 +56,12 @@ UPLOAD_BUFFER_SIZE = UPLOAD_BLOCK_SIZE * 8 _LOGGER = logging.getLogger(__name__) +# Authentication method lookup table: response -> (hash_func, nonce_size, name) +_AUTH_METHODS: dict[int, tuple[Callable[[], Any], int, str]] = { + RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"), + RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"), +} + class ProgressBar: def __init__(self): @@ -262,18 +269,24 @@ def perform_ota( send_check(sock, result, "auth result") receive_exactly(sock, 1, "auth result", RESPONSE_AUTH_OK) + # Authentication method lookup table + auth_methods = { + RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"), + RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"), + } + (auth,) = receive_exactly( sock, 1, "auth", [RESPONSE_REQUEST_AUTH, RESPONSE_REQUEST_SHA256_AUTH, RESPONSE_AUTH_OK], ) - if auth == RESPONSE_REQUEST_SHA256_AUTH: - # SHA256 authentication - perform_auth(sock, password, hashlib.sha256, 64, "SHA256") - elif auth == RESPONSE_REQUEST_AUTH: - # MD5 authentication (backward compatibility) - perform_auth(sock, password, hashlib.md5, 32, "MD5") + + if auth in auth_methods: + hash_func, nonce_size, hash_name = auth_methods[auth] + perform_auth(sock, password, hash_func, nonce_size, hash_name) + elif auth != RESPONSE_AUTH_OK: + raise OTAError(f"Unknown authentication method requested: 0x{auth:02X}") # Set higher timeout during upload sock.settimeout(30.0)