1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-14 09:12:19 +01:00

Add to_ntc_resistance|temperature sensor filter (esphome/feature-requests#2967) (#7898)

Co-authored-by: Clyde Stubbs <2366188+clydebarrow@users.noreply.github.com>
This commit is contained in:
Ralf Habacker
2025-05-01 09:53:12 +02:00
committed by GitHub
parent 087ff865a7
commit da9c755f67
5 changed files with 196 additions and 0 deletions

View File

@@ -1,3 +1,4 @@
import logging
import math
from esphome import automation
@@ -9,6 +10,7 @@ from esphome.const import (
CONF_ACCURACY_DECIMALS,
CONF_ALPHA,
CONF_BELOW,
CONF_CALIBRATION,
CONF_DEVICE_CLASS,
CONF_ENTITY_CATEGORY,
CONF_EXPIRE_AFTER,
@@ -30,6 +32,7 @@ from esphome.const import (
CONF_SEND_EVERY,
CONF_SEND_FIRST_AT,
CONF_STATE_CLASS,
CONF_TEMPERATURE,
CONF_TIMEOUT,
CONF_TO,
CONF_TRIGGER_ID,
@@ -153,6 +156,8 @@ DEVICE_CLASSES = [
DEVICE_CLASS_WIND_SPEED,
]
_LOGGER = logging.getLogger(__name__)
sensor_ns = cg.esphome_ns.namespace("sensor")
StateClasses = sensor_ns.enum("StateClass")
STATE_CLASSES = {
@@ -246,6 +251,8 @@ HeartbeatFilter = sensor_ns.class_("HeartbeatFilter", Filter, cg.Component)
DeltaFilter = sensor_ns.class_("DeltaFilter", Filter)
OrFilter = sensor_ns.class_("OrFilter", Filter)
CalibrateLinearFilter = sensor_ns.class_("CalibrateLinearFilter", Filter)
ToNTCResistanceFilter = sensor_ns.class_("ToNTCResistanceFilter", Filter)
ToNTCTemperatureFilter = sensor_ns.class_("ToNTCTemperatureFilter", Filter)
CalibratePolynomialFilter = sensor_ns.class_("CalibratePolynomialFilter", Filter)
SensorInRangeCondition = sensor_ns.class_("SensorInRangeCondition", Filter)
ClampFilter = sensor_ns.class_("ClampFilter", Filter)
@@ -852,6 +859,138 @@ async def sensor_in_range_to_code(config, condition_id, template_arg, args):
return var
def validate_ntc_calibration_parameter(value):
if isinstance(value, dict):
return cv.Schema(
{
cv.Required(CONF_TEMPERATURE): cv.temperature,
cv.Required(CONF_VALUE): cv.resistance,
}
)(value)
value = cv.string(value)
parts = value.split("->")
if len(parts) != 2:
raise cv.Invalid("Calibration parameter must be of form 3000 -> 23°C")
resistance = cv.resistance(parts[0].strip())
temperature = cv.temperature(parts[1].strip())
return validate_ntc_calibration_parameter(
{
CONF_TEMPERATURE: temperature,
CONF_VALUE: resistance,
}
)
CONF_A = "a"
CONF_B = "b"
CONF_C = "c"
ZERO_POINT = 273.15
def ntc_calc_steinhart_hart(value):
r1 = value[0][CONF_VALUE]
r2 = value[1][CONF_VALUE]
r3 = value[2][CONF_VALUE]
t1 = value[0][CONF_TEMPERATURE] + ZERO_POINT
t2 = value[1][CONF_TEMPERATURE] + ZERO_POINT
t3 = value[2][CONF_TEMPERATURE] + ZERO_POINT
l1 = math.log(r1)
l2 = math.log(r2)
l3 = math.log(r3)
y1 = 1 / t1
y2 = 1 / t2
y3 = 1 / t3
g2 = (y2 - y1) / (l2 - l1)
g3 = (y3 - y1) / (l3 - l1)
c = (g3 - g2) / (l3 - l2) * 1 / (l1 + l2 + l3)
b = g2 - c * (l1 * l1 + l1 * l2 + l2 * l2)
a = y1 - (b + l1 * l1 * c) * l1
return a, b, c
def ntc_get_abc(value):
a = value[CONF_A]
b = value[CONF_B]
c = value[CONF_C]
return a, b, c
def ntc_process_calibration(value):
if isinstance(value, dict):
value = cv.Schema(
{
cv.Required(CONF_A): cv.float_,
cv.Required(CONF_B): cv.float_,
cv.Required(CONF_C): cv.float_,
}
)(value)
a, b, c = ntc_get_abc(value)
elif isinstance(value, list):
if len(value) != 3:
raise cv.Invalid(
"SteinhartHart Calibration must consist of exactly three values"
)
value = cv.Schema([validate_ntc_calibration_parameter])(value)
a, b, c = ntc_calc_steinhart_hart(value)
else:
raise cv.Invalid(
f"Calibration parameter accepts either a list for steinhart-hart calibration, or mapping for b-constant calibration, not {type(value)}"
)
_LOGGER.info("Coefficient: a:%s, b:%s, c:%s", a, b, c)
return {
CONF_A: a,
CONF_B: b,
CONF_C: c,
}
@FILTER_REGISTRY.register(
"to_ntc_resistance",
ToNTCResistanceFilter,
cv.All(
cv.Schema(
{
cv.Required(CONF_CALIBRATION): ntc_process_calibration,
}
),
),
)
async def calibrate_ntc_resistance_filter_to_code(config, filter_id):
calib = config[CONF_CALIBRATION]
return cg.new_Pvariable(
filter_id,
calib[CONF_A],
calib[CONF_B],
calib[CONF_C],
)
@FILTER_REGISTRY.register(
"to_ntc_temperature",
ToNTCTemperatureFilter,
cv.All(
cv.Schema(
{
cv.Required(CONF_CALIBRATION): ntc_process_calibration,
}
),
),
)
async def calibrate_ntc_temperature_filter_to_code(config, filter_id):
calib = config[CONF_CALIBRATION]
return cg.new_Pvariable(
filter_id,
calib[CONF_A],
calib[CONF_B],
calib[CONF_C],
)
def _mean(xs):
return sum(xs) / len(xs)