mirror of
https://github.com/esphome/esphome.git
synced 2026-02-09 09:11:52 +00:00
Compare commits
4 Commits
modbus_sen
...
hardening/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4795971f1c | ||
|
|
82d9616f1b | ||
|
|
806a86a6ad | ||
|
|
2829f7b485 |
@@ -219,50 +219,39 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
|
||||
return;
|
||||
}
|
||||
|
||||
static constexpr size_t ADDR_SIZE = 1;
|
||||
static constexpr size_t FC_SIZE = 1;
|
||||
static constexpr size_t START_ADDR_SIZE = 2;
|
||||
static constexpr size_t NUM_ENTITIES_SIZE = 2;
|
||||
static constexpr size_t BYTE_COUNT_SIZE = 1;
|
||||
static constexpr size_t MAX_PAYLOAD_SIZE = std::numeric_limits<uint8_t>::max();
|
||||
static constexpr size_t CRC_SIZE = 2;
|
||||
static constexpr size_t MAX_FRAME_SIZE =
|
||||
ADDR_SIZE + FC_SIZE + START_ADDR_SIZE + NUM_ENTITIES_SIZE + BYTE_COUNT_SIZE + MAX_PAYLOAD_SIZE + CRC_SIZE;
|
||||
uint8_t data[MAX_FRAME_SIZE];
|
||||
size_t pos = 0;
|
||||
|
||||
data[pos++] = address;
|
||||
data[pos++] = function_code;
|
||||
std::vector<uint8_t> data;
|
||||
data.push_back(address);
|
||||
data.push_back(function_code);
|
||||
if (this->role == ModbusRole::CLIENT) {
|
||||
data[pos++] = start_address >> 8;
|
||||
data[pos++] = start_address >> 0;
|
||||
data.push_back(start_address >> 8);
|
||||
data.push_back(start_address >> 0);
|
||||
if (function_code != ModbusFunctionCode::WRITE_SINGLE_COIL &&
|
||||
function_code != ModbusFunctionCode::WRITE_SINGLE_REGISTER) {
|
||||
data[pos++] = number_of_entities >> 8;
|
||||
data[pos++] = number_of_entities >> 0;
|
||||
data.push_back(number_of_entities >> 8);
|
||||
data.push_back(number_of_entities >> 0);
|
||||
}
|
||||
}
|
||||
|
||||
if (payload != nullptr) {
|
||||
if (this->role == ModbusRole::SERVER || function_code == ModbusFunctionCode::WRITE_MULTIPLE_COILS ||
|
||||
function_code == ModbusFunctionCode::WRITE_MULTIPLE_REGISTERS) { // Write multiple
|
||||
data[pos++] = payload_len; // Byte count is required for write
|
||||
data.push_back(payload_len); // Byte count is required for write
|
||||
} else {
|
||||
payload_len = 2; // Write single register or coil
|
||||
}
|
||||
for (int i = 0; i < payload_len; i++) {
|
||||
data[pos++] = payload[i];
|
||||
data.push_back(payload[i]);
|
||||
}
|
||||
}
|
||||
|
||||
auto crc = crc16(data, pos);
|
||||
data[pos++] = crc >> 0;
|
||||
data[pos++] = crc >> 8;
|
||||
auto crc = crc16(data.data(), data.size());
|
||||
data.push_back(crc >> 0);
|
||||
data.push_back(crc >> 8);
|
||||
|
||||
if (this->flow_control_pin_ != nullptr)
|
||||
this->flow_control_pin_->digital_write(true);
|
||||
|
||||
this->write_array(data, pos);
|
||||
this->write_array(data);
|
||||
this->flush();
|
||||
|
||||
if (this->flow_control_pin_ != nullptr)
|
||||
@@ -272,7 +261,7 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
|
||||
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE
|
||||
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
|
||||
#endif
|
||||
ESP_LOGV(TAG, "Modbus write: %s", format_hex_pretty_to(hex_buf, data, pos));
|
||||
ESP_LOGV(TAG, "Modbus write: %s", format_hex_pretty_to(hex_buf, data.data(), data.size()));
|
||||
}
|
||||
|
||||
// Helper function for lambdas
|
||||
|
||||
@@ -120,8 +120,11 @@ def is_authenticated(handler: BaseHandler) -> bool:
|
||||
if auth_header := handler.request.headers.get("Authorization"):
|
||||
assert isinstance(auth_header, str)
|
||||
if auth_header.startswith("Basic "):
|
||||
auth_decoded = base64.b64decode(auth_header[6:]).decode()
|
||||
username, password = auth_decoded.split(":", 1)
|
||||
try:
|
||||
auth_decoded = base64.b64decode(auth_header[6:]).decode()
|
||||
username, password = auth_decoded.split(":", 1)
|
||||
except (binascii.Error, ValueError, UnicodeDecodeError):
|
||||
return False
|
||||
return settings.check_password(username, password)
|
||||
return handler.get_secure_cookie(AUTH_COOKIE_NAME) == COOKIE_AUTHENTICATED_YES
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
from argparse import Namespace
|
||||
import asyncio
|
||||
import base64
|
||||
from collections.abc import Generator
|
||||
from contextlib import asynccontextmanager
|
||||
import gzip
|
||||
@@ -1676,3 +1677,85 @@ def test_proc_on_exit_skips_when_already_closed() -> None:
|
||||
|
||||
handler.write_message.assert_not_called()
|
||||
handler.close.assert_not_called()
|
||||
|
||||
|
||||
def _make_auth_handler(auth_header: str | None = None) -> Mock:
|
||||
"""Create a mock handler with the given Authorization header."""
|
||||
handler = Mock()
|
||||
handler.request = Mock()
|
||||
if auth_header is not None:
|
||||
handler.request.headers = {"Authorization": auth_header}
|
||||
else:
|
||||
handler.request.headers = {}
|
||||
handler.get_secure_cookie = Mock(return_value=None)
|
||||
return handler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_auth_settings(mock_dashboard_settings: MagicMock) -> MagicMock:
|
||||
"""Fixture to configure mock dashboard settings with auth enabled."""
|
||||
mock_dashboard_settings.using_auth = True
|
||||
mock_dashboard_settings.on_ha_addon = False
|
||||
return mock_dashboard_settings
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_auth_settings")
|
||||
def test_is_authenticated_malformed_base64() -> None:
|
||||
"""Test that invalid base64 in Authorization header returns False."""
|
||||
handler = _make_auth_handler("Basic !!!not-valid-base64!!!")
|
||||
assert web_server.is_authenticated(handler) is False
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_auth_settings")
|
||||
def test_is_authenticated_bad_base64_padding() -> None:
|
||||
"""Test that incorrect base64 padding (binascii.Error) returns False."""
|
||||
handler = _make_auth_handler("Basic abc")
|
||||
assert web_server.is_authenticated(handler) is False
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_auth_settings")
|
||||
def test_is_authenticated_invalid_utf8() -> None:
|
||||
"""Test that base64 decoding to invalid UTF-8 returns False."""
|
||||
# \xff\xfe is invalid UTF-8
|
||||
bad_payload = base64.b64encode(b"\xff\xfe").decode("ascii")
|
||||
handler = _make_auth_handler(f"Basic {bad_payload}")
|
||||
assert web_server.is_authenticated(handler) is False
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_auth_settings")
|
||||
def test_is_authenticated_no_colon() -> None:
|
||||
"""Test that base64 payload without ':' separator returns False."""
|
||||
no_colon = base64.b64encode(b"nocolonhere").decode("ascii")
|
||||
handler = _make_auth_handler(f"Basic {no_colon}")
|
||||
assert web_server.is_authenticated(handler) is False
|
||||
|
||||
|
||||
def test_is_authenticated_valid_credentials(
|
||||
mock_auth_settings: MagicMock,
|
||||
) -> None:
|
||||
"""Test that valid Basic auth credentials are checked."""
|
||||
creds = base64.b64encode(b"admin:secret").decode("ascii")
|
||||
mock_auth_settings.check_password.return_value = True
|
||||
handler = _make_auth_handler(f"Basic {creds}")
|
||||
assert web_server.is_authenticated(handler) is True
|
||||
mock_auth_settings.check_password.assert_called_once_with("admin", "secret")
|
||||
|
||||
|
||||
def test_is_authenticated_wrong_credentials(
|
||||
mock_auth_settings: MagicMock,
|
||||
) -> None:
|
||||
"""Test that valid Basic auth with wrong credentials returns False."""
|
||||
creds = base64.b64encode(b"admin:wrong").decode("ascii")
|
||||
mock_auth_settings.check_password.return_value = False
|
||||
handler = _make_auth_handler(f"Basic {creds}")
|
||||
assert web_server.is_authenticated(handler) is False
|
||||
|
||||
|
||||
def test_is_authenticated_no_auth_configured(
|
||||
mock_dashboard_settings: MagicMock,
|
||||
) -> None:
|
||||
"""Test that requests pass when auth is not configured."""
|
||||
mock_dashboard_settings.using_auth = False
|
||||
mock_dashboard_settings.on_ha_addon = False
|
||||
handler = _make_auth_handler()
|
||||
assert web_server.is_authenticated(handler) is True
|
||||
|
||||
Reference in New Issue
Block a user