1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-30 14:43:51 +00:00

add ble logger

This commit is contained in:
Tomasz Duda
2024-02-21 20:17:01 +01:00
parent 84959cc167
commit 5078a9a388
3 changed files with 95 additions and 25 deletions

View File

@@ -48,7 +48,13 @@ from esphome.util import (
get_serial_ports,
)
from esphome.log import color, setup_log, Fore
from .zephyr_tools import smpmgr_upload, smpmgr_scan
from .zephyr_tools import (
logger_connect,
smpmgr_upload,
is_mac_address,
logger_scan,
smpmgr_scan,
)
_LOGGER = logging.getLogger(__name__)
@@ -88,33 +94,49 @@ def choose_prompt(options, purpose: str = None):
def choose_upload_log_host(
default, check_default, show_ota, show_mqtt, show_api, purpose: str = None
):
try:
mcuboot = CORE.config["nrf52"]["bootloader"] == "mcuboot"
except KeyError:
mcuboot = False
try:
ble_logger = CORE.config["zephyr_ble_nus"]["log"]
except KeyError:
ble_logger = False
ota = "ota" in CORE.config
options = []
for port in get_serial_ports():
options.append((f"{port.path} ({port.description})", port.path))
# if show_ota and CONF_FOTA in CORE.config:
# options.append(
# (f"mcumgr {port.path} ({port.description})", f"mcumgr {port.path}")
# )
if mcuboot and show_ota and ota:
for port in get_serial_ports():
options.append(
(f"mcumgr {port.path} ({port.description})", f"mcumgr {port.path}")
)
else:
for port in get_serial_ports():
options.append((f"{port.path} ({port.description})", port.path))
if default == "SERIAL":
return choose_prompt(options, purpose=purpose)
if default == "PYOCD":
options = [("pyocd", "PYOCD")]
return choose_prompt(options, purpose=purpose)
if CORE.target_platform in (PLATFORM_NRF52):
if (show_ota and "ota" in CORE.config) and default is None:
ble_devices = asyncio.run(smpmgr_scan())
if len(ble_devices) == 0:
_LOGGER.warning("No OTA service found!")
for device in ble_devices:
if mcuboot:
if show_ota and ota:
if default:
options.append(
(
f"FOTA over Bluetooth LE({device.address}) {device.name}",
f"mcumgr {device.address}",
)
(f"OTA over Bluetooth LE ({default})", f"mcumgr {default}")
)
return choose_prompt(options, purpose=purpose)
return choose_prompt(options, purpose=purpose)
else:
ble_devices = asyncio.run(smpmgr_scan(CORE.config["esphome"]["name"]))
if len(ble_devices) == 0:
_LOGGER.warning("No OTA over Bluetooth LE service found!")
for device in ble_devices:
options.append(
(
f"OTA over Bluetooth LE({device.address}) {device.name}",
f"mcumgr {device.address}",
)
)
else:
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
if (show_ota and ota) or (show_api and "api" in CORE.config):
options.append((f"Over The Air ({CORE.address})", CORE.address))
if default == "OTA":
return CORE.address
@@ -122,6 +144,12 @@ def choose_upload_log_host(
options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT"))
if default == "OTA":
return "MQTT"
if "logging" == purpose and ble_logger and default is None:
ble_device = asyncio.run(logger_scan(CORE.config["esphome"]["name"]))
if ble_device:
options.append((f"Bluetooth LE logger ({ble_device})", ble_device.address))
else:
_LOGGER.warning("No logger over Bluetooth LE service found!")
if default is not None:
return default
if check_default is not None and check_default in [opt[1] for opt in options]:
@@ -134,6 +162,8 @@ def get_port_type(port):
return "SERIAL"
if port == "MQTT":
return "MQTT"
if is_mac_address(port):
return "BLE"
return "NETWORK"
@@ -403,6 +433,9 @@ def show_logs(config, args, port):
config, args.topic, args.username, args.password, args.client_id
)
if get_port_type(port) == "BLE":
return asyncio.run(logger_connect(port))
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")

View File

@@ -83,6 +83,8 @@ void BLENUS::setup() {
if (logger::global_logger != nullptr && this->expose_log_) {
logger::global_logger->add_on_log_callback([this](int level, const char *tag, const char *message) {
this->write_array(reinterpret_cast<const uint8_t *>(message), strlen(message));
const char c = '\n';
this->write_array(reinterpret_cast<const uint8_t *>(&c), 1);
});
}
#endif

View File

@@ -3,8 +3,8 @@ import logging
import re
from typing import Final
from rich.pretty import pprint
from bleak import BleakScanner
from bleak.exc import BleakDeviceNotFoundError
from bleak import BleakScanner, BleakClient
from bleak.exc import BleakDeviceNotFoundError, BleakDBusError
from smpclient.transport.ble import SMPBLETransport
from smpclient.transport.serial import SMPSerialTransport
from smpclient import SMPClient
@@ -16,6 +16,8 @@ from smpclient.generics import error, success
from esphome.espota2 import ProgressBar
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
NUS_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
NUS_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
MAC_ADDRESS_PATTERN: Final = re.compile(
r"([0-9A-F]{2}[:]){5}[0-9A-F]{2}$", flags=re.IGNORECASE
)
@@ -23,9 +25,42 @@ MAC_ADDRESS_PATTERN: Final = re.compile(
_LOGGER = logging.getLogger(__name__)
async def smpmgr_scan():
_LOGGER.info("Scanning bluetooth...")
devices = await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID])
def is_mac_address(value):
return MAC_ADDRESS_PATTERN.match(value)
async def logger_scan(name):
_LOGGER.info(f"Scanning bluetooth for {name}...")
device = await BleakScanner.find_device_by_name(name)
return device
async def logger_connect(host):
disconnected_event = asyncio.Event()
def handle_disconnect(client):
disconnected_event.set()
def handle_rx(_, data: bytearray):
print(data.decode("utf-8"), end="")
_LOGGER.info(f"Connecting {host}...")
async with BleakClient(host, disconnected_callback=handle_disconnect) as client:
_LOGGER.info(f"Connected {host}...")
try:
await client.start_notify(NUS_TX_CHAR_UUID, handle_rx)
except BleakDBusError as e:
_LOGGER.error(f"Bluetooth LE logger: {e}")
disconnected_event.set()
await disconnected_event.wait()
async def smpmgr_scan(name):
_LOGGER.info(f"Scanning bluetooth for {name}...")
devices = []
for device in await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID]):
if device.name == name:
devices += [device]
return devices
@@ -53,7 +88,7 @@ async def smpmgr_upload(config, host, firmware):
if image_tlv_sha256 is None:
return 1
if MAC_ADDRESS_PATTERN.match(host):
if is_mac_address(host):
smp_client = SMPClient(SMPBLETransport(), host)
else:
smp_client = SMPClient(SMPSerialTransport(mtu=256), host)