From e47cecc5f0240c52c8111e6b58c076d0df111c57 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 21 Sep 2025 10:28:09 -0600 Subject: [PATCH] remove unreachable code --- esphome/espota2.py | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/esphome/espota2.py b/esphome/espota2.py index 6e8936668b..2a4d21dc3e 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -57,7 +57,7 @@ UPLOAD_BUFFER_SIZE = UPLOAD_BLOCK_SIZE * 8 _LOGGER = logging.getLogger(__name__) # Authentication method lookup table: response -> (hash_func, nonce_size, name) -_AUTH_METHODS: dict[int, tuple[Callable[[], Any], int, str]] = { +_AUTH_METHODS: dict[int, tuple[Callable[..., Any], int, str]] = { RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"), RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"), } @@ -94,6 +94,13 @@ class OTAError(EsphomeError): def recv_decode( sock: socket.socket, amount: int, decode: bool = True ) -> bytes | list[int]: + """Receive data from socket and optionally decode to list of integers. + + :param sock: Socket to receive data from. + :param amount: Number of bytes to receive. + :param decode: If True, convert bytes to list of integers, otherwise return raw bytes. + :return: List of integers if decode=True, otherwise raw bytes. + """ data = sock.recv(amount) if not decode: return data @@ -107,6 +114,16 @@ def receive_exactly( expect: int | list[int] | None, decode: bool = True, ) -> list[int] | bytes: + """Receive exactly the specified amount of data from socket with error checking. + + :param sock: Socket to receive data from. + :param amount: Exact number of bytes to receive. + :param msg: Description of what is being received for error messages. + :param expect: Expected response code(s) for validation, None to skip validation. + :param decode: If True, return list of integers, otherwise return raw bytes. + :return: List of integers if decode=True, otherwise raw bytes. + :raises OTAError: If receiving fails or response doesn't match expected. + """ data: list[int] | bytes = [] if decode else b"" try: @@ -129,6 +146,12 @@ def receive_exactly( def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None: + """Check response data for error codes and validate against expected response. + + :param data: Response data from device (first byte is the response code). + :param expect: Expected response code(s), None to skip validation. + :raises OTAError: If an error code is detected or response doesn't match expected. + """ if not expect: return dat = data[0] @@ -198,6 +221,13 @@ def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None def send_check( sock: socket.socket, data: list[int] | tuple[int, ...] | int | str | bytes, msg: str ) -> None: + """Send data to socket with error handling. + + :param sock: Socket to send data to. + :param data: Data to send (can be list/tuple of ints, single int, string, or bytes). + :param msg: Description of what is being sent for error messages. + :raises OTAError: If sending fails. + """ try: if isinstance(data, (list, tuple)): data = bytes(data) @@ -249,7 +279,7 @@ def perform_ota( def perform_auth( sock: socket.socket, password: str, - hash_func: Callable[[], Any], + hash_func: Callable[..., Any], nonce_size: int, hash_name: str, ) -> None: @@ -257,9 +287,11 @@ def perform_ota( if not password: raise OTAError("ESP requests password, but no password given!") - nonce = receive_exactly( + nonce_bytes = receive_exactly( sock, nonce_size, f"{hash_name} authentication nonce", [], decode=False - ).decode() + ) + assert isinstance(nonce_bytes, bytes) + nonce = nonce_bytes.decode() _LOGGER.debug("Auth: %s Nonce is %s", hash_name, nonce) # Generate cnonce