From b7dbda497a9da5e12bf96496999dd291f1f64d5f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Jan 2026 08:28:31 -1000 Subject: [PATCH] [core] Improve log timestamp accuracy by batching serial reads (#12750) --- esphome/__main__.py | 43 ++-- tests/unit_tests/test_main.py | 368 ++++++++++++++++++++++++++++++++++ 2 files changed, 397 insertions(+), 14 deletions(-) diff --git a/esphome/__main__.py b/esphome/__main__.py index 73fdef6735..3849a585ca 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -62,6 +62,9 @@ from esphome.util import ( _LOGGER = logging.getLogger(__name__) +# Maximum buffer size for serial log reading to prevent unbounded memory growth +SERIAL_BUFFER_MAX_SIZE = 65536 + # Special non-component keys that appear in configs _NON_COMPONENT_KEYS = frozenset( { @@ -431,25 +434,37 @@ def run_miniterm(config: ConfigType, port: str, args) -> int: while tries < 5: try: with ser: + buffer = b"" + ser.timeout = 0.1 # 100ms timeout for non-blocking reads while True: try: - raw = ser.readline() + # Read all available data and timestamp it + chunk = ser.read(ser.in_waiting or 1) + if not chunk: + continue + time_ = datetime.now() + milliseconds = time_.microsecond // 1000 + time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]" + + # Add to buffer and process complete lines + # Limit buffer size to prevent unbounded memory growth + # if device sends data without newlines + buffer += chunk + if len(buffer) > SERIAL_BUFFER_MAX_SIZE: + buffer = buffer[-SERIAL_BUFFER_MAX_SIZE:] + while b"\n" in buffer: + raw_line, buffer = buffer.split(b"\n", 1) + line = raw_line.replace(b"\r", b"").decode( + "utf8", "backslashreplace" + ) + safe_print(parser.parse_line(line, time_str)) + + backtrace_state = platformio_api.process_stacktrace( + config, line, backtrace_state=backtrace_state + ) except serial.SerialException: _LOGGER.error("Serial port closed!") return 0 - line = ( - raw.replace(b"\r", b"") - .replace(b"\n", b"") - .decode("utf8", "backslashreplace") - ) - time_ = datetime.now() - nanoseconds = time_.microsecond // 1000 - time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]" - safe_print(parser.parse_line(line, time_str)) - - backtrace_state = platformio_api.process_stacktrace( - config, line, backtrace_state=backtrace_state - ) except serial.SerialException: tries += 1 time.sleep(1) diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index f173c53636..fd8f04ded5 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -34,6 +34,7 @@ from esphome.__main__ import ( has_non_ip_address, has_resolvable_address, mqtt_get_ip, + run_miniterm, show_logs, upload_program, upload_using_esptool, @@ -41,11 +42,13 @@ from esphome.__main__ import ( from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32 from esphome.const import ( CONF_API, + CONF_BAUD_RATE, CONF_BROKER, CONF_DISABLED, CONF_ESPHOME, CONF_LEVEL, CONF_LOG_TOPIC, + CONF_LOGGER, CONF_MDNS, CONF_MQTT, CONF_NAME, @@ -838,6 +841,7 @@ class MockArgs: configuration: str | None = None name: str | None = None dashboard: bool = False + reset: bool = False def test_upload_program_serial_esp32( @@ -2804,3 +2808,367 @@ def test_compile_program_no_build_info_when_json_missing_keys( assert result == 0 assert "Build Info:" not in caplog.text + + +# Tests for run_miniterm serial log batching + + +# Sentinel to signal end of mock serial data (raises SerialException) +MOCK_SERIAL_END = object() + + +class MockSerial: + """Mock serial port for testing run_miniterm.""" + + def __init__(self, chunks: list[bytes | object]) -> None: + """Initialize with a list of chunks to return from read(). + + Args: + chunks: List of byte chunks to return sequentially. + Use MOCK_SERIAL_END sentinel to signal end of data. + Empty bytes b"" simulate timeout (no data available). + """ + self.chunks = list(chunks) + self.chunk_index = 0 + self.baudrate = 0 + self.port = "" + self.dtr = True + self.rts = True + self.timeout = 0.1 + self._is_open = False + + def __enter__(self) -> MockSerial: + self._is_open = True + return self + + def __exit__(self, *args: Any) -> None: + self._is_open = False + + @property + def in_waiting(self) -> int: + """Return number of bytes available.""" + if self.chunk_index < len(self.chunks): + chunk = self.chunks[self.chunk_index] + if chunk is MOCK_SERIAL_END: + return 0 + return len(chunk) # type: ignore[arg-type] + return 0 + + def read(self, size: int = 1) -> bytes: + """Read up to size bytes from the current chunk. + + This method respects the size argument and keeps any unconsumed + bytes in the current chunk so that subsequent calls to in_waiting + and read see the remaining data. + """ + if self.chunk_index < len(self.chunks): + chunk = self.chunks[self.chunk_index] + if chunk is MOCK_SERIAL_END: + # Sentinel means we're done - simulate port closed + import serial + + raise serial.SerialException("Port closed") + # Respect the requested size and keep any remaining bytes + if size <= 0: + return b"" + data = chunk[:size] # type: ignore[index] + remaining = chunk[size:] # type: ignore[index] + if remaining: + # Keep remaining bytes for the next read + self.chunks[self.chunk_index] = remaining # type: ignore[assignment] + else: + # Entire chunk consumed; advance to the next one + self.chunk_index += 1 + return data # type: ignore[return-value] + import serial + + raise serial.SerialException("Port closed") + + +def test_run_miniterm_batches_lines_with_same_timestamp( + capfd: CaptureFixture[str], +) -> None: + """Test that lines from the same chunk get the same timestamp.""" + # Simulate receiving multiple log lines in a single chunk + # This is how data arrives over USB - many lines at once + chunk = b"[I][app:100]: Line 1\r\n[I][app:100]: Line 2\r\n[I][app:100]: Line 3\r\n" + + mock_serial = MockSerial([chunk, MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + ): + mock_bt.return_value = False + result = run_miniterm(config, "/dev/ttyUSB0", args) + + assert result == 0 + + captured = capfd.readouterr() + lines = [line for line in captured.out.strip().split("\n") if line] + + # All 3 lines should have the same timestamp (first 13 chars like "[HH:MM:SS.mmm]") + assert len(lines) == 3 + timestamps = [line[:13] for line in lines] + assert timestamps[0] == timestamps[1] == timestamps[2], ( + f"Lines from same chunk should have same timestamp: {timestamps}" + ) + + +def test_run_miniterm_different_chunks_different_timestamps( + capfd: CaptureFixture[str], +) -> None: + """Test that lines from different chunks can have different timestamps.""" + # Two separate chunks - could have different timestamps + chunk1 = b"[I][app:100]: Chunk 1 Line\r\n" + chunk2 = b"[I][app:100]: Chunk 2 Line\r\n" + + mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + ): + mock_bt.return_value = False + result = run_miniterm(config, "/dev/ttyUSB0", args) + + assert result == 0 + + captured = capfd.readouterr() + lines = [line for line in captured.out.strip().split("\n") if line] + assert len(lines) == 2 + + +def test_run_miniterm_handles_split_lines() -> None: + """Test that partial lines are buffered until complete.""" + # Line split across two chunks + chunk1 = b"[I][app:100]: Start of " + chunk2 = b"line\r\n" + + mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + patch("esphome.__main__.safe_print") as mock_print, + ): + mock_bt.return_value = False + run_miniterm(config, "/dev/ttyUSB0", args) + + # Should have printed exactly one complete line + assert mock_print.call_count == 1 + printed_line = mock_print.call_args[0][0] + assert "Start of line" in printed_line + + +def test_run_miniterm_backtrace_state_maintained() -> None: + """Test that backtrace_state is properly maintained across lines. + + ESP8266 backtraces span multiple lines between >>>stack>>> and <<>>stack>>>\r\n" + b"3ffffe90: 40220ef8 b66aa8c0 3fff0a4c 40204c84\r\n" + b"3ffffea0: 00000005 0000a635 3fff191c 4020413c\r\n" + b"<< bool: + """Track the backtrace_state progression.""" + backtrace_states.append((line, backtrace_state)) + # Simulate actual behavior + if ">>>stack>>>" in line: + return True + if "<<>>stack>>> - state should be False (before processing) + assert ">>>stack>>>" in backtrace_states[0][0] + assert backtrace_states[0][1] is False + + # Line 2: stack data - state should be True (after >>>stack>>>) + assert "40220ef8" in backtrace_states[1][0] + assert backtrace_states[1][1] is True + + # Line 3: more stack data - state should be True + assert "4020413c" in backtrace_states[2][0] + assert backtrace_states[2][1] is True + + # Line 4: << None: + """Test that empty reads (timeouts) are handled correctly. + + When read() returns empty bytes, the code should continue waiting + for more data without processing anything. + """ + # Simulate: empty read (timeout), then data, then empty read, then end + chunk = b"[I][app:100]: Test line\r\n" + + mock_serial = MockSerial([b"", chunk, b"", MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + ): + mock_bt.return_value = False + result = run_miniterm(config, "/dev/ttyUSB0", args) + + assert result == 0 + + captured = capfd.readouterr() + lines = [line for line in captured.out.strip().split("\n") if line] + # Should have exactly one line despite empty reads + assert len(lines) == 1 + assert "Test line" in lines[0] + + +def test_run_miniterm_no_logger_returns_early( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that run_miniterm returns early if logger is not configured.""" + config: dict[str, Any] = {} # No logger config + args = MockArgs() + + with caplog.at_level(logging.INFO): + result = run_miniterm(config, "/dev/ttyUSB0", args) + + assert result == 1 + assert "Logger is not enabled" in caplog.text + + +def test_run_miniterm_baud_rate_zero_returns_early( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that run_miniterm returns early if baud_rate is 0.""" + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 0, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with caplog.at_level(logging.INFO): + result = run_miniterm(config, "/dev/ttyUSB0", args) + + assert result == 1 + assert "UART logging is disabled" in caplog.text + + +def test_run_miniterm_buffer_limit_prevents_unbounded_growth() -> None: + """Test that buffer is limited to prevent unbounded memory growth. + + If a device sends data without newlines, the buffer should be truncated + to SERIAL_BUFFER_MAX_SIZE to prevent memory exhaustion. + """ + # Use a small buffer limit for testing + test_buffer_limit = 100 + + # Create data larger than the limit without newlines + large_data_no_newline = b"X" * 150 # 150 bytes, no newline + final_line = b"END\r\n" + + mock_serial = MockSerial([large_data_no_newline, final_line, MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + patch("esphome.__main__.safe_print") as mock_print, + patch("esphome.__main__.SERIAL_BUFFER_MAX_SIZE", test_buffer_limit), + ): + mock_bt.return_value = False + run_miniterm(config, "/dev/ttyUSB0", args) + + # Should have printed exactly one line + assert mock_print.call_count == 1 + printed_line = mock_print.call_args[0][0] + + # The line should contain "END" and some X's, but not all 150 X's + # because the buffer was truncated + assert "END" in printed_line + assert "X" in printed_line + # Verify truncation happened - we shouldn't have all 150 X's + # The buffer logic is: + # 1. Add 150 X's -> buffer = 150 bytes -> truncate to last 100 = 100 X's + # 2. Add "END\r\n" (5 bytes) -> buffer = 105 bytes -> truncate to last 100 + # = 95 X's + "END\r\n" + # 3. Find newline, extract line = "95 X's + END" + x_count = printed_line.count("X") + assert x_count < 150, f"Expected truncation but got {x_count} X's" + assert x_count == 95, f"Expected 95 X's after truncation but got {x_count}"