1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-28 05:33:53 +00:00

Add readv and writev for more efficient API packets (#2342)

This commit is contained in:
Otto Winter
2021-09-20 00:33:10 +02:00
committed by Jesse Hills
parent d180aee57f
commit 7c17e72db4
7 changed files with 220 additions and 84 deletions

View File

@@ -702,15 +702,7 @@ bool APIConnection::send_log_message(int level, const char *tag, const char *lin
// string message = 3;
buffer.encode_string(3, line, strlen(line));
// SubscribeLogsResponse - 29
bool success = this->send_buffer(buffer, 29);
if (!success) {
buffer = this->create_buffer();
// bool send_failed = 4;
buffer.encode_bool(4, true);
return this->send_buffer(buffer, 29);
} else {
return true;
}
return this->send_buffer(buffer, 29);
}
HelloResponse APIConnection::hello(const HelloRequest &msg) {
@@ -783,8 +775,23 @@ void APIConnection::subscribe_home_assistant_states(const SubscribeHomeAssistant
bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint32_t message_type) {
if (this->remove_)
return false;
if (!this->helper_->can_write_without_blocking())
return false;
if (!this->helper_->can_write_without_blocking()) {
delay(0);
APIError err = helper_->loop();
if (err != APIError::OK) {
on_fatal_error();
ESP_LOGW(TAG, "%s: Socket operation failed: %s errno=%d", client_info_.c_str(), api_error_to_str(err), errno);
return false;
}
if (!this->helper_->can_write_without_blocking()) {
// SubscribeLogsResponse
if (message_type != 29) {
ESP_LOGV(TAG, "Cannot send message because of TCP buffer space");
}
delay(0);
return false;
}
}
APIError err = this->helper_->write_packet(message_type, buffer.get_buffer()->data(), buffer.get_buffer()->size());
if (err == APIError::WOULD_BLOCK)

View File

@@ -125,13 +125,6 @@ APIError APINoiseFrameHelper::init() {
HELPER_LOG("Setting nonblocking failed with errno %d", errno);
return APIError::TCP_NONBLOCKING_FAILED;
}
int enable = 1;
err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int));
if (err != 0) {
state_ = State::FAILED;
HELPER_LOG("Setting nodelay failed with errno %d", errno);
return APIError::TCP_NODELAY_FAILED;
}
// init prologue
prologue_.insert(prologue_.end(), PROLOGUE_INIT, PROLOGUE_INIT + strlen(PROLOGUE_INIT));
@@ -494,12 +487,13 @@ APIError APINoiseFrameHelper::write_packet(uint16_t type, const uint8_t *payload
size_t total_len = 3 + mbuf.size;
tmpbuf[1] = (uint8_t)(mbuf.size >> 8);
tmpbuf[2] = (uint8_t) mbuf.size;
struct iovec iov;
iov.iov_base = &tmpbuf[0];
iov.iov_len = total_len;
// write raw to not have two packets sent if NAGLE disabled
aerr = write_raw_(&tmpbuf[0], total_len);
if (aerr != APIError::OK) {
return aerr;
}
return APIError::OK;
return write_raw_(&iov, 1);
}
APIError APINoiseFrameHelper::try_send_tx_buf_() {
// try send from tx_buf
@@ -526,16 +520,19 @@ APIError APINoiseFrameHelper::try_send_tx_buf_() {
* @param data The data to write
* @param len The length of data
*/
APIError APINoiseFrameHelper::write_raw_(const uint8_t *data, size_t len) {
if (len == 0)
APIError APINoiseFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) {
if (iovcnt == 0)
return APIError::OK;
int err;
APIError aerr;
// uncomment for even more debugging
size_t total_write_len = 0;
for (int i = 0; i < iovcnt; i++) {
#ifdef HELPER_LOG_PACKETS
ESP_LOGVV(TAG, "Sending raw: %s", hexencode(data, len).c_str());
ESP_LOGVV(TAG, "Sending raw: %s", hexencode(reinterpret_cast<uint8_t *>(iov[i].iov_base), iov[i].iov_len).c_str());
#endif
total_write_len += iov[i].iov_len;
}
if (!tx_buf_.empty()) {
// try to empty tx_buf_ first
@@ -546,41 +543,56 @@ APIError APINoiseFrameHelper::write_raw_(const uint8_t *data, size_t len) {
if (!tx_buf_.empty()) {
// tx buf not empty, can't write now because then stream would be inconsistent
tx_buf_.insert(tx_buf_.end(), data, data + len);
for (int i = 0; i < iovcnt; i++) {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base),
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
}
return APIError::OK;
}
ssize_t sent = socket_->write(data, len);
ssize_t sent = socket_->writev(iov, iovcnt);
if (is_would_block(sent)) {
// operation would block, add buffer to tx_buf
tx_buf_.insert(tx_buf_.end(), data, data + len);
for (int i = 0; i < iovcnt; i++) {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base),
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
}
return APIError::OK;
} else if (sent == -1) {
// an error occured
state_ = State::FAILED;
HELPER_LOG("Socket write failed with errno %d", errno);
return APIError::SOCKET_WRITE_FAILED;
} else if (sent != len) {
} else if (sent != total_write_len) {
// partially sent, add end to tx_buf
tx_buf_.insert(tx_buf_.end(), data + sent, data + len);
size_t to_consume = sent;
for (int i = 0; i < iovcnt; i++) {
if (to_consume >= iov[i].iov_len) {
to_consume -= iov[i].iov_len;
} else {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base) + to_consume,
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
to_consume = 0;
}
}
return APIError::OK;
}
// fully sent
return APIError::OK;
}
APIError APINoiseFrameHelper::write_frame_(const uint8_t *data, size_t len) {
APIError aerr;
uint8_t header[3];
header[0] = 0x01; // indicator
header[1] = (uint8_t)(len >> 8);
header[2] = (uint8_t) len;
aerr = write_raw_(header, 3);
if (aerr != APIError::OK)
return aerr;
aerr = write_raw_(data, len);
return aerr;
struct iovec iov[2];
iov[0].iov_base = header;
iov[0].iov_len = 3;
iov[1].iov_base = const_cast<uint8_t *>(data);
iov[1].iov_len = len;
return write_raw_(iov, 2);
}
/** Initiate the data structures for the handshake.
@@ -709,13 +721,6 @@ APIError APIPlaintextFrameHelper::init() {
HELPER_LOG("Setting nonblocking failed with errno %d", errno);
return APIError::TCP_NONBLOCKING_FAILED;
}
int enable = 1;
err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int));
if (err != 0) {
state_ = State::FAILED;
HELPER_LOG("Setting nodelay failed with errno %d", errno);
return APIError::TCP_NODELAY_FAILED;
}
state_ = State::DATA;
return APIError::OK;
@@ -863,15 +868,13 @@ APIError APIPlaintextFrameHelper::write_packet(uint16_t type, const uint8_t *pay
ProtoVarInt(payload_len).encode(header);
ProtoVarInt(type).encode(header);
aerr = write_raw_(&header[0], header.size());
if (aerr != APIError::OK) {
return aerr;
}
aerr = write_raw_(payload, payload_len);
if (aerr != APIError::OK) {
return aerr;
}
return APIError::OK;
struct iovec iov[2];
iov[0].iov_base = &header[0];
iov[0].iov_len = header.size();
iov[1].iov_base = const_cast<uint8_t *>(payload);
iov[1].iov_len = payload_len;
return write_raw_(iov, 2);
}
APIError APIPlaintextFrameHelper::try_send_tx_buf_() {
// try send from tx_buf
@@ -896,16 +899,19 @@ APIError APIPlaintextFrameHelper::try_send_tx_buf_() {
* @param data The data to write
* @param len The length of data
*/
APIError APIPlaintextFrameHelper::write_raw_(const uint8_t *data, size_t len) {
if (len == 0)
APIError APIPlaintextFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) {
if (iovcnt == 0)
return APIError::OK;
int err;
APIError aerr;
// uncomment for even more debugging
size_t total_write_len = 0;
for (int i = 0; i < iovcnt; i++) {
#ifdef HELPER_LOG_PACKETS
ESP_LOGVV(TAG, "Sending raw: %s", hexencode(data, len).c_str());
ESP_LOGVV(TAG, "Sending raw: %s", hexencode(reinterpret_cast<uint8_t *>(iov[i].iov_base), iov[i].iov_len).c_str());
#endif
total_write_len += iov[i].iov_len;
}
if (!tx_buf_.empty()) {
// try to empty tx_buf_ first
@@ -916,23 +922,38 @@ APIError APIPlaintextFrameHelper::write_raw_(const uint8_t *data, size_t len) {
if (!tx_buf_.empty()) {
// tx buf not empty, can't write now because then stream would be inconsistent
tx_buf_.insert(tx_buf_.end(), data, data + len);
for (int i = 0; i < iovcnt; i++) {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base),
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
}
return APIError::OK;
}
ssize_t sent = socket_->write(data, len);
ssize_t sent = socket_->writev(iov, iovcnt);
if (is_would_block(sent)) {
// operation would block, add buffer to tx_buf
tx_buf_.insert(tx_buf_.end(), data, data + len);
for (int i = 0; i < iovcnt; i++) {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base),
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
}
return APIError::OK;
} else if (sent == -1) {
// an error occured
state_ = State::FAILED;
HELPER_LOG("Socket write failed with errno %d", errno);
return APIError::SOCKET_WRITE_FAILED;
} else if (sent != len) {
} else if (sent != total_write_len) {
// partially sent, add end to tx_buf
tx_buf_.insert(tx_buf_.end(), data + sent, data + len);
size_t to_consume = sent;
for (int i = 0; i < iovcnt; i++) {
if (to_consume >= iov[i].iov_len) {
to_consume -= iov[i].iov_len;
} else {
tx_buf_.insert(tx_buf_.end(), reinterpret_cast<uint8_t *>(iov[i].iov_base) + to_consume,
reinterpret_cast<uint8_t *>(iov[i].iov_base) + iov[i].iov_len);
to_consume = 0;
}
}
return APIError::OK;
}
// fully sent

View File

@@ -58,6 +58,7 @@ const char *api_error_to_str(APIError err);
class APIFrameHelper {
public:
virtual ~APIFrameHelper() = default;
virtual APIError init() = 0;
virtual APIError loop() = 0;
virtual APIError read_packet(ReadPacketBuffer *buffer) = 0;
@@ -96,7 +97,7 @@ class APINoiseFrameHelper : public APIFrameHelper {
APIError try_read_frame_(ParsedFrame *frame);
APIError try_send_tx_buf_();
APIError write_frame_(const uint8_t *data, size_t len);
APIError write_raw_(const uint8_t *data, size_t len);
APIError write_raw_(const struct iovec *iov, int iovcnt);
APIError init_handshake_();
APIError check_handshake_finished_();
void send_explicit_handshake_reject_(const std::string &reason);
@@ -154,7 +155,7 @@ class APIPlaintextFrameHelper : public APIFrameHelper {
APIError try_read_frame_(ParsedFrame *frame);
APIError try_send_tx_buf_();
APIError write_raw_(const uint8_t *data, size_t len);
APIError write_raw_(const struct iovec *iov, int iovcnt);
std::unique_ptr<socket::Socket> socket_;