1
0
mirror of https://github.com/esphome/esphome.git synced 2026-02-08 00:31:58 +00:00

[core] Improve log timestamp accuracy by batching serial reads (#12750)

This commit is contained in:
J. Nick Koston
2026-01-07 08:28:31 -10:00
committed by GitHub
parent 815543b77e
commit b7dbda497a
2 changed files with 397 additions and 14 deletions

View File

@@ -34,6 +34,7 @@ from esphome.__main__ import (
has_non_ip_address,
has_resolvable_address,
mqtt_get_ip,
run_miniterm,
show_logs,
upload_program,
upload_using_esptool,
@@ -41,11 +42,13 @@ from esphome.__main__ import (
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
from esphome.const import (
CONF_API,
CONF_BAUD_RATE,
CONF_BROKER,
CONF_DISABLED,
CONF_ESPHOME,
CONF_LEVEL,
CONF_LOG_TOPIC,
CONF_LOGGER,
CONF_MDNS,
CONF_MQTT,
CONF_NAME,
@@ -838,6 +841,7 @@ class MockArgs:
configuration: str | None = None
name: str | None = None
dashboard: bool = False
reset: bool = False
def test_upload_program_serial_esp32(
@@ -2804,3 +2808,367 @@ def test_compile_program_no_build_info_when_json_missing_keys(
assert result == 0
assert "Build Info:" not in caplog.text
# Tests for run_miniterm serial log batching
# Sentinel to signal end of mock serial data (raises SerialException)
MOCK_SERIAL_END = object()
class MockSerial:
"""Mock serial port for testing run_miniterm."""
def __init__(self, chunks: list[bytes | object]) -> None:
"""Initialize with a list of chunks to return from read().
Args:
chunks: List of byte chunks to return sequentially.
Use MOCK_SERIAL_END sentinel to signal end of data.
Empty bytes b"" simulate timeout (no data available).
"""
self.chunks = list(chunks)
self.chunk_index = 0
self.baudrate = 0
self.port = ""
self.dtr = True
self.rts = True
self.timeout = 0.1
self._is_open = False
def __enter__(self) -> MockSerial:
self._is_open = True
return self
def __exit__(self, *args: Any) -> None:
self._is_open = False
@property
def in_waiting(self) -> int:
"""Return number of bytes available."""
if self.chunk_index < len(self.chunks):
chunk = self.chunks[self.chunk_index]
if chunk is MOCK_SERIAL_END:
return 0
return len(chunk) # type: ignore[arg-type]
return 0
def read(self, size: int = 1) -> bytes:
"""Read up to size bytes from the current chunk.
This method respects the size argument and keeps any unconsumed
bytes in the current chunk so that subsequent calls to in_waiting
and read see the remaining data.
"""
if self.chunk_index < len(self.chunks):
chunk = self.chunks[self.chunk_index]
if chunk is MOCK_SERIAL_END:
# Sentinel means we're done - simulate port closed
import serial
raise serial.SerialException("Port closed")
# Respect the requested size and keep any remaining bytes
if size <= 0:
return b""
data = chunk[:size] # type: ignore[index]
remaining = chunk[size:] # type: ignore[index]
if remaining:
# Keep remaining bytes for the next read
self.chunks[self.chunk_index] = remaining # type: ignore[assignment]
else:
# Entire chunk consumed; advance to the next one
self.chunk_index += 1
return data # type: ignore[return-value]
import serial
raise serial.SerialException("Port closed")
def test_run_miniterm_batches_lines_with_same_timestamp(
capfd: CaptureFixture[str],
) -> None:
"""Test that lines from the same chunk get the same timestamp."""
# Simulate receiving multiple log lines in a single chunk
# This is how data arrives over USB - many lines at once
chunk = b"[I][app:100]: Line 1\r\n[I][app:100]: Line 2\r\n[I][app:100]: Line 3\r\n"
mock_serial = MockSerial([chunk, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 0
captured = capfd.readouterr()
lines = [line for line in captured.out.strip().split("\n") if line]
# All 3 lines should have the same timestamp (first 13 chars like "[HH:MM:SS.mmm]")
assert len(lines) == 3
timestamps = [line[:13] for line in lines]
assert timestamps[0] == timestamps[1] == timestamps[2], (
f"Lines from same chunk should have same timestamp: {timestamps}"
)
def test_run_miniterm_different_chunks_different_timestamps(
capfd: CaptureFixture[str],
) -> None:
"""Test that lines from different chunks can have different timestamps."""
# Two separate chunks - could have different timestamps
chunk1 = b"[I][app:100]: Chunk 1 Line\r\n"
chunk2 = b"[I][app:100]: Chunk 2 Line\r\n"
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 0
captured = capfd.readouterr()
lines = [line for line in captured.out.strip().split("\n") if line]
assert len(lines) == 2
def test_run_miniterm_handles_split_lines() -> None:
"""Test that partial lines are buffered until complete."""
# Line split across two chunks
chunk1 = b"[I][app:100]: Start of "
chunk2 = b"line\r\n"
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch("esphome.__main__.safe_print") as mock_print,
):
mock_bt.return_value = False
run_miniterm(config, "/dev/ttyUSB0", args)
# Should have printed exactly one complete line
assert mock_print.call_count == 1
printed_line = mock_print.call_args[0][0]
assert "Start of line" in printed_line
def test_run_miniterm_backtrace_state_maintained() -> None:
"""Test that backtrace_state is properly maintained across lines.
ESP8266 backtraces span multiple lines between >>>stack>>> and <<<stack<<<.
The backtrace_state must persist correctly when lines arrive in the same chunk.
"""
# Simulate ESP8266 multi-line backtrace arriving in a single chunk
backtrace_chunk = (
b">>>stack>>>\r\n"
b"3ffffe90: 40220ef8 b66aa8c0 3fff0a4c 40204c84\r\n"
b"3ffffea0: 00000005 0000a635 3fff191c 4020413c\r\n"
b"<<<stack<<<\r\n"
)
mock_serial = MockSerial([backtrace_chunk, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
backtrace_states: list[tuple[str, bool]] = []
def track_backtrace_state(
config: dict[str, Any], line: str, backtrace_state: bool
) -> bool:
"""Track the backtrace_state progression."""
backtrace_states.append((line, backtrace_state))
# Simulate actual behavior
if ">>>stack>>>" in line:
return True
if "<<<stack<<<" in line:
return False
return backtrace_state
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(
platformio_api,
"process_stacktrace",
side_effect=track_backtrace_state,
),
):
run_miniterm(config, "/dev/ttyUSB0", args)
# Verify the state progression
assert len(backtrace_states) == 4
# Line 1: >>>stack>>> - state should be False (before processing)
assert ">>>stack>>>" in backtrace_states[0][0]
assert backtrace_states[0][1] is False
# Line 2: stack data - state should be True (after >>>stack>>>)
assert "40220ef8" in backtrace_states[1][0]
assert backtrace_states[1][1] is True
# Line 3: more stack data - state should be True
assert "4020413c" in backtrace_states[2][0]
assert backtrace_states[2][1] is True
# Line 4: <<<stack<<< - state should be True (before processing end marker)
assert "<<<stack<<<" in backtrace_states[3][0]
assert backtrace_states[3][1] is True
def test_run_miniterm_handles_empty_reads(
capfd: CaptureFixture[str],
) -> None:
"""Test that empty reads (timeouts) are handled correctly.
When read() returns empty bytes, the code should continue waiting
for more data without processing anything.
"""
# Simulate: empty read (timeout), then data, then empty read, then end
chunk = b"[I][app:100]: Test line\r\n"
mock_serial = MockSerial([b"", chunk, b"", MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 0
captured = capfd.readouterr()
lines = [line for line in captured.out.strip().split("\n") if line]
# Should have exactly one line despite empty reads
assert len(lines) == 1
assert "Test line" in lines[0]
def test_run_miniterm_no_logger_returns_early(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that run_miniterm returns early if logger is not configured."""
config: dict[str, Any] = {} # No logger config
args = MockArgs()
with caplog.at_level(logging.INFO):
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 1
assert "Logger is not enabled" in caplog.text
def test_run_miniterm_baud_rate_zero_returns_early(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that run_miniterm returns early if baud_rate is 0."""
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 0,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with caplog.at_level(logging.INFO):
result = run_miniterm(config, "/dev/ttyUSB0", args)
assert result == 1
assert "UART logging is disabled" in caplog.text
def test_run_miniterm_buffer_limit_prevents_unbounded_growth() -> None:
"""Test that buffer is limited to prevent unbounded memory growth.
If a device sends data without newlines, the buffer should be truncated
to SERIAL_BUFFER_MAX_SIZE to prevent memory exhaustion.
"""
# Use a small buffer limit for testing
test_buffer_limit = 100
# Create data larger than the limit without newlines
large_data_no_newline = b"X" * 150 # 150 bytes, no newline
final_line = b"END\r\n"
mock_serial = MockSerial([large_data_no_newline, final_line, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch("esphome.__main__.safe_print") as mock_print,
patch("esphome.__main__.SERIAL_BUFFER_MAX_SIZE", test_buffer_limit),
):
mock_bt.return_value = False
run_miniterm(config, "/dev/ttyUSB0", args)
# Should have printed exactly one line
assert mock_print.call_count == 1
printed_line = mock_print.call_args[0][0]
# The line should contain "END" and some X's, but not all 150 X's
# because the buffer was truncated
assert "END" in printed_line
assert "X" in printed_line
# Verify truncation happened - we shouldn't have all 150 X's
# The buffer logic is:
# 1. Add 150 X's -> buffer = 150 bytes -> truncate to last 100 = 100 X's
# 2. Add "END\r\n" (5 bytes) -> buffer = 105 bytes -> truncate to last 100
# = 95 X's + "END\r\n"
# 3. Find newline, extract line = "95 X's + END"
x_count = printed_line.count("X")
assert x_count < 150, f"Expected truncation but got {x_count} X's"
assert x_count == 95, f"Expected 95 X's after truncation but got {x_count}"