1
0
mirror of https://github.com/esphome/esphome.git synced 2026-02-09 09:11:52 +00:00

Compare commits

..

4 Commits

Author SHA1 Message Date
J. Nick Koston
4795971f1c Use usefixtures for tests that don't reference mock_auth_settings
Replace unused mock_auth_settings parameter with
@pytest.mark.usefixtures decorator to avoid PLW0613 lint warnings.
2026-02-08 07:25:19 -06:00
J. Nick Koston
82d9616f1b Add explicit binascii.Error catch and bad-padding test
binascii.Error is already a subclass of ValueError, but listing it
explicitly makes the intent clear. Added test for incorrect base64
padding (e.g. "Basic abc").
2026-02-08 07:18:29 -06:00
J. Nick Koston
806a86a6ad Add test coverage for is_authenticated base64 handling
Tests malformed base64, invalid UTF-8, missing colon separator,
valid credentials, wrong credentials, and auth-disabled cases.
2026-02-08 07:06:24 -06:00
J. Nick Koston
2829f7b485 [dashboard] Handle malformed Basic Auth headers gracefully
Wrap base64 decode and split in try/except so malformed
Authorization headers return a clean 401 instead of an
unhandled exception producing a 500 response with stack
trace in logs.

Catches ValueError (covers binascii.Error from b64decode)
and UnicodeDecodeError (from .decode()).
2026-02-08 06:47:49 -06:00
3 changed files with 102 additions and 27 deletions

View File

@@ -219,50 +219,39 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
return;
}
static constexpr size_t ADDR_SIZE = 1;
static constexpr size_t FC_SIZE = 1;
static constexpr size_t START_ADDR_SIZE = 2;
static constexpr size_t NUM_ENTITIES_SIZE = 2;
static constexpr size_t BYTE_COUNT_SIZE = 1;
static constexpr size_t MAX_PAYLOAD_SIZE = std::numeric_limits<uint8_t>::max();
static constexpr size_t CRC_SIZE = 2;
static constexpr size_t MAX_FRAME_SIZE =
ADDR_SIZE + FC_SIZE + START_ADDR_SIZE + NUM_ENTITIES_SIZE + BYTE_COUNT_SIZE + MAX_PAYLOAD_SIZE + CRC_SIZE;
uint8_t data[MAX_FRAME_SIZE];
size_t pos = 0;
data[pos++] = address;
data[pos++] = function_code;
std::vector<uint8_t> data;
data.push_back(address);
data.push_back(function_code);
if (this->role == ModbusRole::CLIENT) {
data[pos++] = start_address >> 8;
data[pos++] = start_address >> 0;
data.push_back(start_address >> 8);
data.push_back(start_address >> 0);
if (function_code != ModbusFunctionCode::WRITE_SINGLE_COIL &&
function_code != ModbusFunctionCode::WRITE_SINGLE_REGISTER) {
data[pos++] = number_of_entities >> 8;
data[pos++] = number_of_entities >> 0;
data.push_back(number_of_entities >> 8);
data.push_back(number_of_entities >> 0);
}
}
if (payload != nullptr) {
if (this->role == ModbusRole::SERVER || function_code == ModbusFunctionCode::WRITE_MULTIPLE_COILS ||
function_code == ModbusFunctionCode::WRITE_MULTIPLE_REGISTERS) { // Write multiple
data[pos++] = payload_len; // Byte count is required for write
data.push_back(payload_len); // Byte count is required for write
} else {
payload_len = 2; // Write single register or coil
}
for (int i = 0; i < payload_len; i++) {
data[pos++] = payload[i];
data.push_back(payload[i]);
}
}
auto crc = crc16(data, pos);
data[pos++] = crc >> 0;
data[pos++] = crc >> 8;
auto crc = crc16(data.data(), data.size());
data.push_back(crc >> 0);
data.push_back(crc >> 8);
if (this->flow_control_pin_ != nullptr)
this->flow_control_pin_->digital_write(true);
this->write_array(data, pos);
this->write_array(data);
this->flush();
if (this->flow_control_pin_ != nullptr)
@@ -272,7 +261,7 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGV(TAG, "Modbus write: %s", format_hex_pretty_to(hex_buf, data, pos));
ESP_LOGV(TAG, "Modbus write: %s", format_hex_pretty_to(hex_buf, data.data(), data.size()));
}
// Helper function for lambdas

View File

@@ -120,8 +120,11 @@ def is_authenticated(handler: BaseHandler) -> bool:
if auth_header := handler.request.headers.get("Authorization"):
assert isinstance(auth_header, str)
if auth_header.startswith("Basic "):
auth_decoded = base64.b64decode(auth_header[6:]).decode()
username, password = auth_decoded.split(":", 1)
try:
auth_decoded = base64.b64decode(auth_header[6:]).decode()
username, password = auth_decoded.split(":", 1)
except (binascii.Error, ValueError, UnicodeDecodeError):
return False
return settings.check_password(username, password)
return handler.get_secure_cookie(AUTH_COOKIE_NAME) == COOKIE_AUTHENTICATED_YES

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from argparse import Namespace
import asyncio
import base64
from collections.abc import Generator
from contextlib import asynccontextmanager
import gzip
@@ -1676,3 +1677,85 @@ def test_proc_on_exit_skips_when_already_closed() -> None:
handler.write_message.assert_not_called()
handler.close.assert_not_called()
def _make_auth_handler(auth_header: str | None = None) -> Mock:
"""Create a mock handler with the given Authorization header."""
handler = Mock()
handler.request = Mock()
if auth_header is not None:
handler.request.headers = {"Authorization": auth_header}
else:
handler.request.headers = {}
handler.get_secure_cookie = Mock(return_value=None)
return handler
@pytest.fixture
def mock_auth_settings(mock_dashboard_settings: MagicMock) -> MagicMock:
"""Fixture to configure mock dashboard settings with auth enabled."""
mock_dashboard_settings.using_auth = True
mock_dashboard_settings.on_ha_addon = False
return mock_dashboard_settings
@pytest.mark.usefixtures("mock_auth_settings")
def test_is_authenticated_malformed_base64() -> None:
"""Test that invalid base64 in Authorization header returns False."""
handler = _make_auth_handler("Basic !!!not-valid-base64!!!")
assert web_server.is_authenticated(handler) is False
@pytest.mark.usefixtures("mock_auth_settings")
def test_is_authenticated_bad_base64_padding() -> None:
"""Test that incorrect base64 padding (binascii.Error) returns False."""
handler = _make_auth_handler("Basic abc")
assert web_server.is_authenticated(handler) is False
@pytest.mark.usefixtures("mock_auth_settings")
def test_is_authenticated_invalid_utf8() -> None:
"""Test that base64 decoding to invalid UTF-8 returns False."""
# \xff\xfe is invalid UTF-8
bad_payload = base64.b64encode(b"\xff\xfe").decode("ascii")
handler = _make_auth_handler(f"Basic {bad_payload}")
assert web_server.is_authenticated(handler) is False
@pytest.mark.usefixtures("mock_auth_settings")
def test_is_authenticated_no_colon() -> None:
"""Test that base64 payload without ':' separator returns False."""
no_colon = base64.b64encode(b"nocolonhere").decode("ascii")
handler = _make_auth_handler(f"Basic {no_colon}")
assert web_server.is_authenticated(handler) is False
def test_is_authenticated_valid_credentials(
mock_auth_settings: MagicMock,
) -> None:
"""Test that valid Basic auth credentials are checked."""
creds = base64.b64encode(b"admin:secret").decode("ascii")
mock_auth_settings.check_password.return_value = True
handler = _make_auth_handler(f"Basic {creds}")
assert web_server.is_authenticated(handler) is True
mock_auth_settings.check_password.assert_called_once_with("admin", "secret")
def test_is_authenticated_wrong_credentials(
mock_auth_settings: MagicMock,
) -> None:
"""Test that valid Basic auth with wrong credentials returns False."""
creds = base64.b64encode(b"admin:wrong").decode("ascii")
mock_auth_settings.check_password.return_value = False
handler = _make_auth_handler(f"Basic {creds}")
assert web_server.is_authenticated(handler) is False
def test_is_authenticated_no_auth_configured(
mock_dashboard_settings: MagicMock,
) -> None:
"""Test that requests pass when auth is not configured."""
mock_dashboard_settings.using_auth = False
mock_dashboard_settings.on_ha_addon = False
handler = _make_auth_handler()
assert web_server.is_authenticated(handler) is True