1
0
mirror of https://github.com/esphome/esphome.git synced 2025-04-15 15:20:27 +01:00
This commit is contained in:
Tomasz Duda 2024-02-11 13:57:09 +01:00
parent aed9e117ec
commit da91b37e57
13 changed files with 531 additions and 95 deletions

View File

@ -6,6 +6,7 @@ import os
import re
import sys
import time
import asyncio
from datetime import datetime
import argcomplete
@ -21,6 +22,7 @@ from esphome.const import (
CONF_LOGGER,
CONF_NAME,
CONF_OTA,
CONF_FOTA,
CONF_MQTT,
CONF_MDNS,
CONF_DISABLED,
@ -47,6 +49,7 @@ from esphome.util import (
get_serial_ports,
)
from esphome.log import color, setup_log, Fore
from .zephyr_tools import smpmgr_scan, smpmgr_upload, list_pyocd
_LOGGER = logging.getLogger(__name__)
@ -89,12 +92,40 @@ def choose_upload_log_host(
options = []
for port in get_serial_ports():
options.append((f"{port.path} ({port.description})", port.path))
if show_ota and CONF_FOTA in CORE.config:
options.append(
(f"mcumgr {port.path} ({port.description})", f"mcumgr {port.path}")
)
if default == "SERIAL":
return choose_prompt(options, purpose=purpose)
pyocd = list_pyocd()
if len(pyocd) > 0:
for probe in list_pyocd():
options.append(
(
f"pyOCD {probe['product_name']} ({probe['unique_id']})",
f"pyocd {probe['unique_id']}",
)
)
if default == "PYOCD":
return choose_prompt(options, purpose=purpose)
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
options.append((f"Over The Air ({CORE.address})", CORE.address))
if default == "OTA":
return CORE.address
if show_ota and CONF_FOTA in CORE.config:
if default is None or default == "FOTA":
ble_devices = asyncio.run(smpmgr_scan())
if len(ble_devices) == 0:
_LOGGER.warning("No FOTA service found!")
for device in ble_devices:
options.append(
(
f"FOTA over Bluetooth LE({device.address}) {device.name}",
f"mcumgr {device.address}",
)
)
return choose_prompt(options, purpose=purpose)
if show_mqtt and CONF_MQTT in CORE.config:
options.append((f"MQTT ({CORE.config['mqtt'][CONF_BROKER]})", "MQTT"))
if default == "OTA":
@ -316,6 +347,17 @@ def upload_program(config, args, host):
raise EsphomeError(f"Unknown target platform: {CORE.target_platform}")
if host.startswith("pyocd"):
print(host.split(" ")[1])
raise EsphomeError("Not implemented")
if host.startswith("mcumgr"):
firmware = os.path.abspath(
CORE.relative_pioenvs_path(CORE.name, "zephyr", "app_update.bin")
)
if CONF_FOTA in config:
return asyncio.run(smpmgr_upload(config, host.split(" ")[1], firmware))
if CONF_OTA not in config:
raise EsphomeError(
"Cannot upload Over the Air as the config does not include the ota: "

View File

@ -6,7 +6,7 @@ from esphome.const import (
)
from esphome.core import CORE
from esphome.components.nrf52 import add_zephyr_prj_conf_option
from esphome.components.nrf52.zephyr import zephyr_add_prj_conf
dfu_ns = cg.esphome_ns.namespace("dfu")
DeviceFirmwareUpdate = dfu_ns.class_("DeviceFirmwareUpdate", cg.Component)
@ -32,5 +32,5 @@ async def to_code(config):
# week symbol do not work for some reason so use wrap instaed
cg.add_build_flag("-Wl,--wrap=tud_cdc_line_state_cb")
elif CORE.using_zephyr:
add_zephyr_prj_conf_option("CONFIG_CDC_ACM_DTE_RATE_CALLBACK_SUPPORT", True)
zephyr_add_prj_conf("CDC_ACM_DTE_RATE_CALLBACK_SUPPORT", True)
await cg.register_component(var, config)

View File

@ -0,0 +1,88 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ID,
)
from esphome.components.nrf52.zephyr import zephyr_add_prj_conf, zephyr_add_overlay
fota_ns = cg.esphome_ns.namespace("fota")
FOTAComponent = fota_ns.class_("FOTAComponent", cg.Component)
CONFIG_SCHEMA = cv.All(
cv.Schema(
{
cv.GenerateID(): cv.declare_id(FOTAComponent),
}
).extend(cv.COMPONENT_SCHEMA),
cv.only_on_nrf52,
cv.only_with_zephyr,
)
bt = True
cdc = True
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
zephyr_add_prj_conf("NET_BUF", True)
zephyr_add_prj_conf("ZCBOR", True)
zephyr_add_prj_conf("MCUMGR", True)
zephyr_add_prj_conf("BOOTLOADER_MCUBOOT", True)
zephyr_add_prj_conf("IMG_MANAGER", True)
zephyr_add_prj_conf("STREAM_FLASH", True)
zephyr_add_prj_conf("FLASH_MAP", True)
zephyr_add_prj_conf("FLASH", True)
zephyr_add_prj_conf("MCUBOOT_SHELL", True)
zephyr_add_prj_conf("SHELL", True)
zephyr_add_prj_conf("SYSTEM_WORKQUEUE_STACK_SIZE", 2304)
zephyr_add_prj_conf("MAIN_STACK_SIZE", 2048)
zephyr_add_prj_conf("MCUMGR_GRP_IMG", True)
zephyr_add_prj_conf("MCUMGR_GRP_OS", True)
# echo
zephyr_add_prj_conf("MCUMGR_GRP_OS_ECHO", True)
# for Android-nRF-Connect-Device-Manager
zephyr_add_prj_conf("MCUMGR_GRP_OS_INFO", True)
zephyr_add_prj_conf("MCUMGR_GRP_OS_BOOTLOADER_INFO", True)
zephyr_add_prj_conf("MCUMGR_GRP_OS_MCUMGR_PARAMS", True)
# bt update fails without this
zephyr_add_prj_conf("MCUMGR_GRP_SHELL", True)
# make MTU bigger and other things
zephyr_add_prj_conf("NCS_SAMPLE_MCUMGR_BT_OTA_DFU_SPEEDUP", True)
# zephyr_add_prj_conf("MCUMGR_TRANSPORT_LOG_LEVEL_DBG", True)
if bt:
zephyr_add_prj_conf("BT", True)
zephyr_add_prj_conf("BT_PERIPHERAL", True)
zephyr_add_prj_conf("MCUMGR_TRANSPORT_BT", True)
# fix corrupted data during ble transport
zephyr_add_prj_conf("MCUMGR_TRANSPORT_BT_REASSEMBLY", True)
if cdc:
zephyr_add_prj_conf("MCUMGR_TRANSPORT_UART", True)
zephyr_add_prj_conf("USB_DEVICE_STACK", True)
zephyr_add_prj_conf("BASE64", True)
zephyr_add_prj_conf("CONSOLE", True)
# needed ?
zephyr_add_prj_conf("SERIAL", True)
zephyr_add_prj_conf("UART_LINE_CTRL", True)
zephyr_add_overlay(
"""
/ {
chosen {
zephyr,uart-mcumgr = &cdc_acm_uart0;
};
};
&zephyr_udc0 {
cdc_acm_uart0: cdc_acm_uart0 {
compatible = "zephyr,cdc-acm-uart";
};
};
"""
)
await cg.register_component(var, config)

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2012-2014 Wind River Systems, Inc.
* Copyright (c) 2020 Prevas A/S
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/bluetooth/bluetooth.h>
#include <zephyr/bluetooth/conn.h>
#include <zephyr/bluetooth/gatt.h>
#include <zephyr/mgmt/mcumgr/transport/smp_bt.h>
#define LOG_LEVEL LOG_LEVEL_DBG
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(smp_bt_sample);
static struct k_work advertise_work;
static const struct bt_data ad[] = {
BT_DATA_BYTES(BT_DATA_FLAGS, (BT_LE_AD_GENERAL | BT_LE_AD_NO_BREDR)),
BT_DATA_BYTES(BT_DATA_UUID128_ALL,
0x84, 0xaa, 0x60, 0x74, 0x52, 0x8a, 0x8b, 0x86,
0xd3, 0x4c, 0xb7, 0x1d, 0x1d, 0xdc, 0x53, 0x8d),
};
static void advertise(struct k_work *work)
{
int rc;
bt_le_adv_stop();
rc = bt_le_adv_start(BT_LE_ADV_CONN_NAME, ad, ARRAY_SIZE(ad), NULL, 0);
if (rc) {
LOG_ERR("Advertising failed to start (rc %d)", rc);
return;
}
LOG_INF("Advertising successfully started");
}
static void connected(struct bt_conn *conn, uint8_t err)
{
if (err) {
LOG_ERR("Connection failed (err 0x%02x)", err);
} else {
LOG_INF("Connected");
}
}
static void disconnected(struct bt_conn *conn, uint8_t reason)
{
LOG_INF("Disconnected (reason 0x%02x)", reason);
k_work_submit(&advertise_work);
}
BT_CONN_CB_DEFINE(conn_callbacks) = {
.connected = connected,
.disconnected = disconnected,
};
static void bt_ready(int err)
{
if (err != 0) {
LOG_ERR("Bluetooth failed to initialise: %d", err);
} else {
k_work_submit(&advertise_work);
}
}
void start_smp_bluetooth_adverts(void)
{
int rc;
k_work_init(&advertise_work, advertise);
rc = bt_enable(bt_ready);
if (rc != 0) {
LOG_ERR("Bluetooth enable failed: %d", rc);
}
}

View File

@ -0,0 +1,18 @@
#include "fota_component.h"
#include <zephyr/usb/usb_device.h>
#include <zephyr/mgmt/mcumgr/mgmt/callbacks.h>
extern "C" void start_smp_bluetooth_adverts();
namespace esphome {
namespace fota {
void FOTAComponent::setup() {
if (IS_ENABLED(CONFIG_USB_DEVICE_STACK)) {
usb_enable(NULL);
}
start_smp_bluetooth_adverts();
}
} // namespace fota
} // namespace esphome

View File

@ -0,0 +1,14 @@
#pragma once
#include "esphome/core/component.h"
namespace esphome {
namespace fota {
class FOTAComponent : public Component {
public:
void setup() override;
};
} // namespace fota
} // namespace esphome

View File

@ -40,7 +40,7 @@ from esphome.components.libretiny.const import (
COMPONENT_BK72XX,
COMPONENT_RTL87XX,
)
from esphome.components.nrf52 import add_zephyr_overlay, add_zephyr_prj_conf_option
from esphome.components.nrf52.zephyr import zephyr_add_overlay, zephyr_add_prj_conf
CODEOWNERS = ["@esphome/core"]
logger_ns = cg.esphome_ns.namespace("logger")
@ -298,9 +298,20 @@ async def to_code(config):
if CORE.using_zephyr:
if config[CONF_HARDWARE_UART] == UART0:
add_zephyr_overlay("""&uart0 { status = "okay";};""")
zephyr_add_overlay("""&uart0 { status = "okay";};""")
if config[CONF_HARDWARE_UART] == USB_CDC:
add_zephyr_prj_conf_option("CONFIG_UART_LINE_CTRL", True)
zephyr_add_prj_conf("UART_LINE_CTRL", True)
zephyr_add_prj_conf("USB_DEVICE_STACK", True)
zephyr_add_prj_conf("USB_CDC_ACM", True)
zephyr_add_overlay(
"""
&zephyr_udc0 {
cdc_acm_uart0: cdc_acm_uart0 {
compatible = "zephyr,cdc-acm-uart";
};
};
"""
)
# Register at end for safe mode
await cg.register_component(log, config)

View File

@ -13,21 +13,25 @@ from esphome.const import (
)
from esphome.core import CORE, coroutine_with_priority
from esphome.helpers import (
write_file_if_changed,
copy_file_if_changed,
)
from typing import Union
from .zephyr import (
zephyr_copy_files,
zephyr_set_core_data,
zephyr_to_code,
)
from .const import (
ZEPHYR_VARIANT_GENERIC,
ZEPHYR_VARIANT_NRF_SDK,
)
# force import gpio to register pin schema
from .gpio import nrf52_pin_to_code # noqa
KEY_NRF52 = "nrf52"
def set_core_data(config):
CORE.data[KEY_NRF52] = {}
CORE.data[KEY_NRF52][KEY_PRJ_CONF_OPTIONS] = {}
CORE.data[KEY_NRF52][KEY_ZEPHYR_OVERLAY] = ""
zephyr_set_core_data(config)
CORE.data[KEY_CORE][KEY_TARGET_PLATFORM] = PLATFORM_NRF52
CORE.data[KEY_CORE][KEY_TARGET_FRAMEWORK] = config[CONF_FRAMEWORK][CONF_TYPE]
return config
@ -66,8 +70,6 @@ PLATFORM_FRAMEWORK_SCHEMA = cv.All(
_platform_check_versions,
)
ZEPHYR_VARIANT_GENERIC = "generic"
ZEPHYR_VARIANT_NRF_SDK = "nrf-sdk"
ZEPHYR_VARIANTS = [
ZEPHYR_VARIANT_GENERIC,
ZEPHYR_VARIANT_NRF_SDK,
@ -102,30 +104,6 @@ CONFIG_SCHEMA = cv.All(
nrf52_ns = cg.esphome_ns.namespace("nrf52")
PrjConfValueType = Union[bool, str, int]
KEY_PRJ_CONF_OPTIONS = "prj_conf_options"
KEY_ZEPHYR_OVERLAY = "zephyr_overlay"
def add_zephyr_prj_conf_option(name: str, value: PrjConfValueType):
"""Set an zephyr prj conf value."""
if not CORE.using_zephyr:
raise ValueError("Not an zephyr project")
if name in CORE.data[KEY_NRF52][KEY_PRJ_CONF_OPTIONS]:
old_value = CORE.data[KEY_NRF52][KEY_PRJ_CONF_OPTIONS][name]
if old_value != value:
raise ValueError(
f"{name} alread set with value {old_value}, new value {value}"
)
CORE.data[KEY_NRF52][KEY_PRJ_CONF_OPTIONS][name] = value
def add_zephyr_overlay(content):
if not CORE.using_zephyr:
raise ValueError("Not an zephyr project")
CORE.data[KEY_NRF52][KEY_ZEPHYR_OVERLAY] += content
@coroutine_with_priority(1000)
async def to_code(config):
cg.add(nrf52_ns.setup_preferences())
@ -152,6 +130,7 @@ async def to_code(config):
cg.add_platformio_option("board_upload.use_1200bps_touch", "true")
cg.add_platformio_option("board_upload.require_upload_port", "true")
cg.add_platformio_option("board_upload.wait_for_upload_port", "true")
#
cg.add_platformio_option("extra_scripts", [f"pre:build_{conf[CONF_TYPE]}.py"])
if CORE.using_arduino:
cg.add_build_flag("-DUSE_ARDUINO")
@ -166,71 +145,21 @@ async def to_code(config):
)
cg.add_library("https://github.com/NordicSemiconductor/nrfx#v2.1.0", None, None)
elif CORE.using_zephyr:
cg.add_build_flag("-DUSE_ZEPHYR")
if conf[CONF_VARIANT] == ZEPHYR_VARIANT_GENERIC:
cg.add_platformio_option(
"platform_packages",
[
"platformio/framework-zephyr@^2.30500.231204",
# "platformio/toolchain-gccarmnoneeabi@^1.120301.0"
],
)
elif conf[CONF_VARIANT] == ZEPHYR_VARIANT_NRF_SDK:
cg.add_platformio_option(
"platform_packages",
[
"platformio/framework-zephyr@https://github.com/tomaszduda23/framework-sdk-nrf",
"platformio/toolchain-gccarmnoneeabi@https://github.com/tomaszduda23/toolchain-sdk-ng",
],
)
else:
raise NotImplementedError
# c++ support
add_zephyr_prj_conf_option("CONFIG_NEWLIB_LIBC", False)
add_zephyr_prj_conf_option("CONFIG_NEWLIB_LIBC_NANO", True)
add_zephyr_prj_conf_option("CONFIG_NEWLIB_LIBC_FLOAT_PRINTF", True)
add_zephyr_prj_conf_option("CONFIG_CPLUSPLUS", True)
add_zephyr_prj_conf_option("CONFIG_LIB_CPLUSPLUS", True)
# watchdog
add_zephyr_prj_conf_option("CONFIG_WATCHDOG", True)
add_zephyr_prj_conf_option("CONFIG_WDT_DISABLE_AT_BOOT", False)
# TODO debug only
add_zephyr_prj_conf_option("CONFIG_DEBUG_THREAD_INFO", True)
zephyr_to_code(conf)
else:
raise NotImplementedError
# add_zephyr_prj_conf_option("CONFIG_USE_SEGGER_RTT", True)
# add_zephyr_prj_conf_option("CONFIG_RTT_CONSOLE", True)
# add_zephyr_prj_conf_option("CONFIG_UART_CONSOLE", False)
# zephyr_add_prj_conf("USE_SEGGER_RTT", True)
# zephyr_add_prj_conf("RTT_CONSOLE", True)
# zephyr_add_prj_conf("UART_CONSOLE", False)
def _format_prj_conf_val(value: PrjConfValueType) -> str:
if isinstance(value, bool):
return "y" if value else "n"
if isinstance(value, int):
return str(value)
if isinstance(value, str):
return f'"{value}"'
raise ValueError
# zephyr_add_prj_conf("LOG", True)
# zephyr_add_prj_conf("MCUBOOT_UTIL_LOG_LEVEL_WRN", True)
# Called by writer.py
def copy_files():
if CORE.using_zephyr:
want_opts = CORE.data[KEY_NRF52][KEY_PRJ_CONF_OPTIONS]
contents = (
"\n".join(
f"{name}={_format_prj_conf_val(value)}"
for name, value in sorted(want_opts.items())
)
+ "\n"
)
write_file_if_changed(CORE.relative_build_path("zephyr/prj.conf"), contents)
write_file_if_changed(
CORE.relative_build_path("zephyr/app.overlay"),
CORE.data[KEY_NRF52][KEY_ZEPHYR_OVERLAY],
)
zephyr_copy_files()
dir = os.path.dirname(__file__)
build_zephyr_file = os.path.join(

View File

@ -0,0 +1,5 @@
KEY_ZEPHYR = "zephyr"
KEY_PRJ_CONF = "prj_conf"
KEY_OVERLAY = "overlay"
ZEPHYR_VARIANT_GENERIC = "generic"
ZEPHYR_VARIANT_NRF_SDK = "nrf-sdk"

View File

@ -0,0 +1,106 @@
import esphome.codegen as cg
from typing import Union
from esphome.core import CORE
from esphome.helpers import (
write_file_if_changed,
)
from .const import (
ZEPHYR_VARIANT_GENERIC,
KEY_ZEPHYR,
KEY_PRJ_CONF,
KEY_OVERLAY,
ZEPHYR_VARIANT_NRF_SDK,
)
from esphome.const import (
CONF_VARIANT,
)
def zephyr_set_core_data(config):
CORE.data[KEY_ZEPHYR] = {}
CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF] = {}
CORE.data[KEY_ZEPHYR][KEY_OVERLAY] = ""
return config
PrjConfValueType = Union[bool, str, int]
def zephyr_add_prj_conf(name: str, value: PrjConfValueType):
"""Set an zephyr prj conf value."""
if not CORE.using_zephyr:
raise ValueError("Not an zephyr project")
if not name.startswith("CONFIG_"):
name = "CONFIG_" + name
if name in CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF]:
old_value = CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF][name]
if old_value != value:
raise ValueError(
f"{name} alread set with value {old_value}, new value {value}"
)
CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF][name] = value
def zephyr_add_overlay(content):
if not CORE.using_zephyr:
raise ValueError("Not an zephyr project")
CORE.data[KEY_ZEPHYR][KEY_OVERLAY] += content
def zephyr_to_code(conf):
cg.add_build_flag("-DUSE_ZEPHYR")
if conf[CONF_VARIANT] == ZEPHYR_VARIANT_GENERIC:
cg.add_platformio_option(
"platform_packages",
[
"platformio/framework-zephyr@^2.30500.231204",
],
)
elif conf[CONF_VARIANT] == ZEPHYR_VARIANT_NRF_SDK:
cg.add_platformio_option(
"platform_packages",
[
"platformio/framework-zephyr@https://github.com/tomaszduda23/framework-sdk-nrf",
"platformio/toolchain-gccarmnoneeabi@https://github.com/tomaszduda23/toolchain-sdk-ng",
],
)
else:
raise NotImplementedError
# c++ support
zephyr_add_prj_conf("NEWLIB_LIBC", False)
zephyr_add_prj_conf("NEWLIB_LIBC_NANO", True)
zephyr_add_prj_conf("NEWLIB_LIBC_FLOAT_PRINTF", True)
zephyr_add_prj_conf("CPLUSPLUS", True)
zephyr_add_prj_conf("LIB_CPLUSPLUS", True)
# watchdog
zephyr_add_prj_conf("WATCHDOG", True)
zephyr_add_prj_conf("WDT_DISABLE_AT_BOOT", False)
# TODO debug only
# zephyr_add_prj_conf("DEBUG_THREAD_INFO", True)
def _format_prj_conf_val(value: PrjConfValueType) -> str:
if isinstance(value, bool):
return "y" if value else "n"
if isinstance(value, int):
return str(value)
if isinstance(value, str):
return f'"{value}"'
raise ValueError
def zephyr_copy_files():
want_opts = CORE.data[KEY_ZEPHYR][KEY_PRJ_CONF]
contents = (
"\n".join(
f"{name}={_format_prj_conf_val(value)}"
for name, value in sorted(want_opts.items())
)
+ "\n"
)
write_file_if_changed(CORE.relative_build_path("zephyr/prj.conf"), contents)
write_file_if_changed(
CORE.relative_build_path("zephyr/app.overlay"),
CORE.data[KEY_ZEPHYR][KEY_OVERLAY],
)

View File

@ -555,6 +555,7 @@ CONF_OSCILLATION_COMMAND_TOPIC = "oscillation_command_topic"
CONF_OSCILLATION_OUTPUT = "oscillation_output"
CONF_OSCILLATION_STATE_TOPIC = "oscillation_state_topic"
CONF_OTA = "ota"
CONF_FOTA = "fota"
CONF_OUTPUT = "output"
CONF_OUTPUT_ID = "output_id"
CONF_OUTPUTS = "outputs"

130
esphome/zephyr_tools.py Normal file
View File

@ -0,0 +1,130 @@
import asyncio
import logging
import re
from typing import Final
from rich.pretty import pprint
from bleak import BleakScanner
from bleak.exc import BleakDeviceNotFoundError
from smpclient.transport.ble import SMPBLETransport
from smpclient.transport.serial import SMPSerialTransport
from smpclient import SMPClient
from smpclient.mcuboot import IMAGE_TLV, ImageInfo, TLVNotFound
from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite
from smpclient.requests.os_management import ResetWrite
from pyocd.tools.lists import ListGenerator
from smpclient.generics import error, success
from esphome.espota2 import ProgressBar
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
MAC_ADDRESS_PATTERN: Final = re.compile(
r"([0-9A-F]{2}[:]){5}[0-9A-F]{2}$", flags=re.IGNORECASE
)
_LOGGER = logging.getLogger(__name__)
async def smpmgr_scan():
_LOGGER.info("Scanning bluetooth...")
devices = await BleakScanner.discover(service_uuids=[SMP_SERVICE_UUID])
return devices
def get_image_tlv_sha256(file):
_LOGGER.info(f"Checking image: {str(file)}")
try:
image_info = ImageInfo.load_file(str(file))
pprint(image_info.header)
_LOGGER.debug(str(image_info))
except Exception as e:
_LOGGER.error(f"Inspection of FW image failed: {e}")
return None
try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
_LOGGER.debug(f"IMAGE_TLV_SHA256: {image_tlv_sha256}")
except TLVNotFound:
_LOGGER.error("Could not find IMAGE_TLV_SHA256 in image.")
return None
return image_tlv_sha256.value
async def smpmgr_upload(config, host, firmware):
image_tlv_sha256 = get_image_tlv_sha256(firmware)
if image_tlv_sha256 is None:
return 1
if MAC_ADDRESS_PATTERN.match(host):
smp_client = SMPClient(SMPBLETransport(), host)
else:
smp_client = SMPClient(SMPSerialTransport(mtu=256), host)
_LOGGER.info(f"Connecting {host}...")
try:
await smp_client.connect()
except BleakDeviceNotFoundError:
_LOGGER.error(f"Device {host} not found")
return 1
_LOGGER.info(f"Connected {host}...")
image_state = await asyncio.wait_for(
smp_client.request(ImageStatesRead()), timeout=SMPClient.MEDIUM_TIMEOUT
)
already_uploaded = False
if error(image_state):
_LOGGER.error(image_state)
return 1
elif success(image_state):
if len(image_state.images) == 0:
_LOGGER.warning("No images on device!")
for image in image_state.images:
pprint(image)
if image.hash == image_tlv_sha256:
if already_uploaded:
_LOGGER.error("Both slots have the same image")
return 1
else:
if image.confirmed:
_LOGGER.error("Image already confirmted")
return 1
_LOGGER.warning("The same image already uploaded")
already_uploaded = True
if not already_uploaded:
with open(firmware, "rb") as file:
image = file.read()
file.close()
upload_size = len(image)
progress = ProgressBar()
progress.update(0)
async for offset in smp_client.upload(image):
progress.update(offset / upload_size)
progress.done()
_LOGGER.info("Mark image for testing")
r = await asyncio.wait_for(
smp_client.request(ImageStatesWrite(hash=image_tlv_sha256)),
timeout=SMPClient.SHORT_TIMEOUT,
)
if error(r):
_LOGGER.error(r)
return 1
_LOGGER.info("Reset")
r = await asyncio.wait_for(
smp_client.request(ResetWrite()), timeout=SMPClient.SHORT_TIMEOUT
)
if error(r):
_LOGGER.error(r)
return 1
return 0
def list_pyocd():
return ListGenerator.list_probes()["boards"]

View File

@ -25,3 +25,15 @@ pyparsing >= 3.0
# For autocompletion
argcomplete>=2.0.0
# for mcumgr
git+https://github.com/tomaszduda23/smp/#f9b85266485062f6a466989e8f59b913cc83b08b
git+https://github.com/tomaszduda23/smpclient/#d25c8035ae2858fd41a106058297b619d58fbcb5
# move to framework?
git+https://github.com/tomaszduda23/pyOCD/#949193f7cbf09081f8e46d6b9d2e4a79e536997e
bleak==0.21.1
pydantic==2.16.2
cbor2==5.6.1
crcmod==1.7
# pretty print by logging?
rich==13.7.0