1
0
mirror of https://github.com/esphome/esphome.git synced 2025-04-15 15:20:27 +01:00

Add K30 CO2 sensor

This commit is contained in:
Gonzalo Cervetti 2024-11-21 15:46:29 +01:00
parent 4d7c6b28e1
commit 13eb231aad
4 changed files with 284 additions and 0 deletions

View File

View File

@ -0,0 +1,156 @@
// Implementation based on:
// - k30: https://cdn.shopify.com/s/files/1/0019/5952/files/AN102-K30-Sensor-Arduino-I2C.zip?v=1653007039
// - Official Datasheet (cn):
// https://rmtplusstoragesenseair.blob.core.windows.net/docs/Dev/publicerat/TDE4700.pdf
//
#include "k30.h"
#include "esphome/core/log.h"
#include "esphome/core/hal.h"
namespace esphome {
namespace k30 {
static const char *const TAG = "K30";
// Command to measure CO2.
static const uint8_t K30_MEASURE_CMD[] = {0x22, 0x00, 0x08, 0x2A};
// Command to read meter control byte.
static const uint8_t K30_READ_METER_CMD[] = {0x41, 0x00, 0x3E, 0x7F};
void K30Component::setup() {
ESP_LOGCONFIG(TAG, "Setting up K30...");
// Set the automatic background calibration. This is done in EEPROM.
// Read meter control.
i2c::ErrorCode error = this->write(K30_READ_METER_CMD, sizeof(K30_READ_METER_CMD));
if (error != i2c::ERROR_OK) {
ESP_LOGD(TAG, "Attempt to read meter control byte failed");
this->mark_failed();
return;
}
// Wait for the sensor to process the command.
uint8_t data[4];
error = this->read(data, 3);
if (error != i2c::ERROR_OK) {
ESP_LOGE(TAG, "Error reading data from K30");
this->mark_failed();
return;
}
// Calculate the checksum. The checksum is stripped to 8 bits.
uint8_t checksum = this->calculate_checksum_(data, 2);
if (checksum != data[2]) {
ESP_LOGE(TAG, "Checksum error when reading meter control byte");
this->mark_failed();
return;
}
// Check if ABC is already enabled.
bool is_abc_enabled_ = data[1] & 0x02;
// Only update if values are different.
if (this->enable_abc_ != is_abc_enabled_) {
// Create the command to configure the ABC.
uint8_t configure_abc_command[] = {0x31, 0x00, 0x3E, 0x00, 0x00};
if (this->enable_abc_) {
// Mask the ABC bit to 1.
configure_abc_command[3] = data[2] | 0x02;
} else {
// Mask the ABC bit to 0.
configure_abc_command[3] = data[2] & 0xFD;
}
// Calculate the checksum for the new command.
checksum = this->calculate_checksum_(configure_abc_command, 4);
configure_abc_command[4] = checksum;
// Send command. Bear in mind that, in order to change the ABC configuration,
// the sensor must be restarted.
error = this->write(configure_abc_command, sizeof(configure_abc_command));
if (error != i2c::ERROR_OK) {
ESP_LOGE(TAG, "Error setting meter control byte");
this->mark_failed();
return;
}
}
ESP_LOGCONFIG(TAG, "K30 initialized");
}
void K30Component::update() {
if (!this->read_started_) {
this->start_time_ = millis();
this->read_started_ = true;
return;
}
uint32_t elapsed = (millis() - this->start_time_) / 1000;
// Time has not passed.
if (elapsed < this->update_interval_) {
return;
}
// Reset flag for the next reading.
this->read_started_ = false;
// send the command to start a measurement.
i2c::ErrorCode error = this->write(K30_MEASURE_CMD, sizeof(K30_MEASURE_CMD));
if (error != i2c::ERROR_OK) {
ESP_LOGE(TAG, "Error sending command to K30");
this->status_set_warning("Error sending command to K30");
}
// Wait for the sensor to process the command.
this->set_timeout(20, [this]() {
// Read the data from the sensor.
uint8_t data[4];
i2c::ErrorCode error = this->read(data, 4);
this->reading_status_ = IDLE;
if (error != i2c::ERROR_OK) {
ESP_LOGE(TAG, "Error reading data from K30");
this->status_set_warning("Error reading data from K30");
return;
}
// Check if the measuring process is finished.
if ((data[0] & 0x01) != 0x01) {
ESP_LOGE(TAG, "Measuring process not finished");
this->status_set_warning("Measuring process not finished");
return;
}
// Calculate the checksum. The checksum is stripped to 8 bits.
uint8_t checksum = this->calculate_checksum_(data, 3);
if (checksum != data[3]) {
ESP_LOGE(TAG, "Checksum error!");
this->status_set_warning("Checksum error!");
return;
}
// Calculate the CO2 value.
uint16_t temp_c_o2_u32 = (((uint16_t(data[1])) << 8) | (uint16_t(data[2])));
float co2 = static_cast<float>(temp_c_o2_u32);
// Publish result.
if (this->co2_sensor_ != nullptr) {
this->co2_sensor_->publish_state(co2);
}
this->status_clear_warning();
});
}
void K30Component::dump_config() {
ESP_LOGCONFIG(TAG, "K30:");
LOG_I2C_DEVICE(this);
ESP_LOGCONFIG(TAG, " Automatic self calibration: %s", ONOFF(this->enable_abc_));
ESP_LOGCONFIG(TAG, " Automatic self calibration interval: %dh", this->abc_update_interval_ / 3600);
ESP_LOGCONFIG(TAG, " Update interval: %ds", this->update_interval_);
LOG_SENSOR(" ", "CO2", this->co2_sensor_);
}
float K30Component::get_setup_priority() const { return setup_priority::DATA; }
uint8_t K30Component::calculate_checksum_(uint8_t *array, uint8_t len) {
uint32_t checksum = 0;
for (uint8_t i = 0; i < len; i++) {
checksum += array[i];
}
return checksum & 0xFF;
}
} // namespace k30
} // namespace esphome

View File

@ -0,0 +1,71 @@
#pragma once
#include <cstdint>
#include <utility>
#include "esphome/core/component.h"
#include "esphome/components/sensor/sensor.h"
#include "esphome/components/i2c/i2c.h"
namespace esphome {
namespace k30 {
enum ErrorCode {
COMMUNICATION_FAILED,
FIRMWARE_IDENTIFICATION_FAILED,
MEASUREMENT_INIT_FAILED,
FORCE_RECALIBRATION_FAILED,
UNKNOWN
};
enum ReadingStatus {
IDLE,
REQUEST_SEND,
};
class K30Component : public PollingComponent, public i2c::I2CDevice {
public:
// Setters called by python. This runs before setup().
void set_automatic_self_calibration(bool asc) { enable_abc_ = asc; }
void set_automatic_self_calibration_update_interval(uint32_t interval) { abc_update_interval_ = interval; }
void set_co2_sensor(sensor::Sensor *co2_sensor) { co2_sensor_ = co2_sensor; }
void set_update_interval(uint16_t interval) { update_interval_ = interval; }
// Called by the framework.
void setup() override;
void update() override;
void dump_config() override;
float get_setup_priority() const override;
protected:
/**
* @brief Calculate the checksum of an array given. This is used to verify the data
* received from the sensor.
*
* @param array pointer to array of bytes.
* @param array_length length of the array.
* @return uint8_t calculation of the checksum.
*/
uint8_t calculate_checksum_(uint8_t *array, uint8_t array_length);
ErrorCode error_code_{UNKNOWN};
// Status of the reading process. This is used to avoid blocking the update() loop.
ReadingStatus reading_status_{IDLE};
// Flag to enable automatic background calibration.
bool enable_abc_{true};
// Configurations for CO2 sensor.
sensor::Sensor *co2_sensor_{nullptr};
/// Update interval in seconds.
uint16_t update_interval_{0xFFFF};
uint32_t abc_update_interval_{0xFFFFFFFF};
uint32_t start_time_{};
bool read_started_{false};
unsigned read_count_;
void read_data_();
void restart_read_();
};
} // namespace k30
} // namespace esphome

View File

@ -0,0 +1,57 @@
from esphome import core
import esphome.codegen as cg
from esphome.components import i2c, sensor
import esphome.config_validation as cv
from esphome.const import (
CONF_CO2,
CONF_ID,
CONF_UPDATE_INTERVAL,
DEVICE_CLASS_CARBON_DIOXIDE,
ICON_MOLECULE_CO2,
STATE_CLASS_MEASUREMENT,
UNIT_PARTS_PER_MILLION,
)
DEPENDENCIES = ["i2c"]
k30_ns = cg.esphome_ns.namespace("k30")
K30Component = k30_ns.class_("K30Component", cg.PollingComponent, i2c.I2CDevice)
CONF_AUTOMATIC_SELF_CALIBRATION = "use_abc"
CONFIG_SCHEMA = (
cv.Schema(
{
cv.GenerateID(): cv.declare_id(K30Component),
cv.Optional(CONF_CO2): sensor.sensor_schema(
unit_of_measurement=UNIT_PARTS_PER_MILLION,
icon=ICON_MOLECULE_CO2,
accuracy_decimals=0,
device_class=DEVICE_CLASS_CARBON_DIOXIDE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(CONF_AUTOMATIC_SELF_CALIBRATION, default=True): cv.boolean,
cv.Optional(CONF_UPDATE_INTERVAL, default="60s"): cv.All(
cv.positive_time_period_seconds,
cv.Range(
min=core.TimePeriod(seconds=1), max=core.TimePeriod(seconds=1800)
),
),
}
)
.extend(cv.COMPONENT_SCHEMA)
.extend(i2c.i2c_device_schema(0x68))
)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
await i2c.register_i2c_device(var, config)
cg.add(var.set_automatic_self_calibration(config[CONF_AUTOMATIC_SELF_CALIBRATION]))
cg.add(var.set_update_interval(config[CONF_UPDATE_INTERVAL]))
if CONF_CO2 in config:
sens = await sensor.new_sensor(config[CONF_CO2])
cg.add(var.set_co2_sensor(sens))