1
0
mirror of https://github.com/esphome/esphome.git synced 2026-02-08 08:41:59 +00:00
This commit is contained in:
J. Nick Koston
2025-12-29 21:16:11 -10:00
parent 10b0308bc0
commit 25a4d7ffab

View File

@@ -2811,15 +2811,20 @@ def test_compile_program_no_build_info_when_json_missing_keys(
# Tests for run_miniterm serial log batching
# Sentinel to signal end of mock serial data (raises SerialException)
MOCK_SERIAL_END = object()
class MockSerial:
"""Mock serial port for testing run_miniterm."""
def __init__(self, chunks: list[bytes]) -> None:
def __init__(self, chunks: list[bytes | object]) -> None:
"""Initialize with a list of chunks to return from read().
Args:
chunks: List of byte chunks to return sequentially.
Empty bytes at the end signal end of data.
Use MOCK_SERIAL_END sentinel to signal end of data.
Empty bytes b"" simulate timeout (no data available).
"""
self.chunks = list(chunks)
self.chunk_index = 0
@@ -2841,7 +2846,10 @@ class MockSerial:
def in_waiting(self) -> int:
"""Return number of bytes available."""
if self.chunk_index < len(self.chunks):
return len(self.chunks[self.chunk_index])
chunk = self.chunks[self.chunk_index]
if chunk is MOCK_SERIAL_END:
return 0
return len(chunk) # type: ignore[arg-type]
return 0
def read(self, size: int = 1) -> bytes:
@@ -2849,12 +2857,12 @@ class MockSerial:
if self.chunk_index < len(self.chunks):
chunk = self.chunks[self.chunk_index]
self.chunk_index += 1
if not chunk:
# Empty chunk means we're done - simulate end
if chunk is MOCK_SERIAL_END:
# Sentinel means we're done - simulate port closed
import serial
raise serial.SerialException("Port closed")
return chunk
return chunk # type: ignore[return-value]
import serial
raise serial.SerialException("Port closed")
@@ -2868,7 +2876,7 @@ def test_run_miniterm_batches_lines_with_same_timestamp(
# This is how data arrives over USB - many lines at once
chunk = b"[I][app:100]: Line 1\r\n[I][app:100]: Line 2\r\n[I][app:100]: Line 3\r\n"
mock_serial = MockSerial([chunk, b""])
mock_serial = MockSerial([chunk, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
@@ -2906,7 +2914,7 @@ def test_run_miniterm_different_chunks_different_timestamps(
chunk1 = b"[I][app:100]: Chunk 1 Line\r\n"
chunk2 = b"[I][app:100]: Chunk 2 Line\r\n"
mock_serial = MockSerial([chunk1, chunk2, b""])
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
@@ -2936,7 +2944,7 @@ def test_run_miniterm_handles_split_lines() -> None:
chunk1 = b"[I][app:100]: Start of "
chunk2 = b"line\r\n"
mock_serial = MockSerial([chunk1, chunk2, b""])
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
@@ -2974,7 +2982,7 @@ def test_run_miniterm_backtrace_state_maintained() -> None:
b"<<<stack<<<\r\n"
)
mock_serial = MockSerial([backtrace_chunk, b""])
mock_serial = MockSerial([backtrace_chunk, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
@@ -3028,6 +3036,43 @@ def test_run_miniterm_backtrace_state_maintained() -> None:
assert backtrace_states[3][1] is True
def test_run_miniterm_handles_empty_reads(
capfd: CaptureFixture[str],
) -> None:
"""Test that empty reads (timeouts) are handled correctly.
When read() returns empty bytes, the code should continue waiting
for more data without processing anything.
"""
# Simulate: empty read (timeout), then data, then empty read, then end
chunk = b"[I][app:100]: Test line\r\n"
mock_serial = MockSerial([b"", chunk, b"", MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 0
captured = capfd.readouterr()
lines = [line for line in captured.out.strip().split("\n") if line]
# Should have exactly one line despite empty reads
assert len(lines) == 1
assert "Test line" in lines[0]
def test_run_miniterm_no_logger_returns_early(
caplog: pytest.LogCaptureFixture,
) -> None: