mirror of
https://github.com/esphome/esphome.git
synced 2025-09-21 20:52:20 +01:00
remove unreachable code
This commit is contained in:
@@ -57,7 +57,7 @@ UPLOAD_BUFFER_SIZE = UPLOAD_BLOCK_SIZE * 8
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Authentication method lookup table: response -> (hash_func, nonce_size, name)
|
# Authentication method lookup table: response -> (hash_func, nonce_size, name)
|
||||||
_AUTH_METHODS: dict[int, tuple[Callable[[], Any], int, str]] = {
|
_AUTH_METHODS: dict[int, tuple[Callable[..., Any], int, str]] = {
|
||||||
RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"),
|
RESPONSE_REQUEST_SHA256_AUTH: (hashlib.sha256, 64, "SHA256"),
|
||||||
RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"),
|
RESPONSE_REQUEST_AUTH: (hashlib.md5, 32, "MD5"),
|
||||||
}
|
}
|
||||||
@@ -94,6 +94,13 @@ class OTAError(EsphomeError):
|
|||||||
def recv_decode(
|
def recv_decode(
|
||||||
sock: socket.socket, amount: int, decode: bool = True
|
sock: socket.socket, amount: int, decode: bool = True
|
||||||
) -> bytes | list[int]:
|
) -> bytes | list[int]:
|
||||||
|
"""Receive data from socket and optionally decode to list of integers.
|
||||||
|
|
||||||
|
:param sock: Socket to receive data from.
|
||||||
|
:param amount: Number of bytes to receive.
|
||||||
|
:param decode: If True, convert bytes to list of integers, otherwise return raw bytes.
|
||||||
|
:return: List of integers if decode=True, otherwise raw bytes.
|
||||||
|
"""
|
||||||
data = sock.recv(amount)
|
data = sock.recv(amount)
|
||||||
if not decode:
|
if not decode:
|
||||||
return data
|
return data
|
||||||
@@ -107,6 +114,16 @@ def receive_exactly(
|
|||||||
expect: int | list[int] | None,
|
expect: int | list[int] | None,
|
||||||
decode: bool = True,
|
decode: bool = True,
|
||||||
) -> list[int] | bytes:
|
) -> list[int] | bytes:
|
||||||
|
"""Receive exactly the specified amount of data from socket with error checking.
|
||||||
|
|
||||||
|
:param sock: Socket to receive data from.
|
||||||
|
:param amount: Exact number of bytes to receive.
|
||||||
|
:param msg: Description of what is being received for error messages.
|
||||||
|
:param expect: Expected response code(s) for validation, None to skip validation.
|
||||||
|
:param decode: If True, return list of integers, otherwise return raw bytes.
|
||||||
|
:return: List of integers if decode=True, otherwise raw bytes.
|
||||||
|
:raises OTAError: If receiving fails or response doesn't match expected.
|
||||||
|
"""
|
||||||
data: list[int] | bytes = [] if decode else b""
|
data: list[int] | bytes = [] if decode else b""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -129,6 +146,12 @@ def receive_exactly(
|
|||||||
|
|
||||||
|
|
||||||
def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None:
|
def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None:
|
||||||
|
"""Check response data for error codes and validate against expected response.
|
||||||
|
|
||||||
|
:param data: Response data from device (first byte is the response code).
|
||||||
|
:param expect: Expected response code(s), None to skip validation.
|
||||||
|
:raises OTAError: If an error code is detected or response doesn't match expected.
|
||||||
|
"""
|
||||||
if not expect:
|
if not expect:
|
||||||
return
|
return
|
||||||
dat = data[0]
|
dat = data[0]
|
||||||
@@ -198,6 +221,13 @@ def check_error(data: list[int] | bytes, expect: int | list[int] | None) -> None
|
|||||||
def send_check(
|
def send_check(
|
||||||
sock: socket.socket, data: list[int] | tuple[int, ...] | int | str | bytes, msg: str
|
sock: socket.socket, data: list[int] | tuple[int, ...] | int | str | bytes, msg: str
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Send data to socket with error handling.
|
||||||
|
|
||||||
|
:param sock: Socket to send data to.
|
||||||
|
:param data: Data to send (can be list/tuple of ints, single int, string, or bytes).
|
||||||
|
:param msg: Description of what is being sent for error messages.
|
||||||
|
:raises OTAError: If sending fails.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
if isinstance(data, (list, tuple)):
|
if isinstance(data, (list, tuple)):
|
||||||
data = bytes(data)
|
data = bytes(data)
|
||||||
@@ -249,7 +279,7 @@ def perform_ota(
|
|||||||
def perform_auth(
|
def perform_auth(
|
||||||
sock: socket.socket,
|
sock: socket.socket,
|
||||||
password: str,
|
password: str,
|
||||||
hash_func: Callable[[], Any],
|
hash_func: Callable[..., Any],
|
||||||
nonce_size: int,
|
nonce_size: int,
|
||||||
hash_name: str,
|
hash_name: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -257,9 +287,11 @@ def perform_ota(
|
|||||||
if not password:
|
if not password:
|
||||||
raise OTAError("ESP requests password, but no password given!")
|
raise OTAError("ESP requests password, but no password given!")
|
||||||
|
|
||||||
nonce = receive_exactly(
|
nonce_bytes = receive_exactly(
|
||||||
sock, nonce_size, f"{hash_name} authentication nonce", [], decode=False
|
sock, nonce_size, f"{hash_name} authentication nonce", [], decode=False
|
||||||
).decode()
|
)
|
||||||
|
assert isinstance(nonce_bytes, bytes)
|
||||||
|
nonce = nonce_bytes.decode()
|
||||||
_LOGGER.debug("Auth: %s Nonce is %s", hash_name, nonce)
|
_LOGGER.debug("Auth: %s Nonce is %s", hash_name, nonce)
|
||||||
|
|
||||||
# Generate cnonce
|
# Generate cnonce
|
||||||
|
Reference in New Issue
Block a user