mirror of
				https://github.com/esphome/esphome.git
				synced 2025-11-04 09:01:49 +00:00 
			
		
		
		
	Merge branch 'integration' into memory_api
This commit is contained in:
		@@ -82,7 +82,13 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
 | 
				
			|||||||
  item->set_name(name_cstr, !is_static_string);
 | 
					  item->set_name(name_cstr, !is_static_string);
 | 
				
			||||||
  item->type = type;
 | 
					  item->type = type;
 | 
				
			||||||
  item->callback = std::move(func);
 | 
					  item->callback = std::move(func);
 | 
				
			||||||
 | 
					  // Initialize remove to false (though it should already be from constructor)
 | 
				
			||||||
 | 
					  // Not using mark_item_removed_ helper since we're setting to false, not true
 | 
				
			||||||
 | 
					#ifdef ESPHOME_THREAD_MULTI_ATOMICS
 | 
				
			||||||
 | 
					  item->remove.store(false, std::memory_order_relaxed);
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
  item->remove = false;
 | 
					  item->remove = false;
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
  item->is_retry = is_retry;
 | 
					  item->is_retry = is_retry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#ifndef ESPHOME_THREAD_SINGLE
 | 
					#ifndef ESPHOME_THREAD_SINGLE
 | 
				
			||||||
@@ -398,6 +404,31 @@ void HOT Scheduler::call(uint32_t now) {
 | 
				
			|||||||
        this->pop_raw_();
 | 
					        this->pop_raw_();
 | 
				
			||||||
        continue;
 | 
					        continue;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      // Check if item is marked for removal
 | 
				
			||||||
 | 
					      // This handles two cases:
 | 
				
			||||||
 | 
					      // 1. Item was marked for removal after cleanup_() but before we got here
 | 
				
			||||||
 | 
					      // 2. Item is marked for removal but wasn't at the front of the heap during cleanup_()
 | 
				
			||||||
 | 
					#ifdef ESPHOME_THREAD_MULTI_NO_ATOMICS
 | 
				
			||||||
 | 
					      // Multi-threaded platforms without atomics: must take lock to safely read remove flag
 | 
				
			||||||
 | 
					      {
 | 
				
			||||||
 | 
					        LockGuard guard{this->lock_};
 | 
				
			||||||
 | 
					        if (is_item_removed_(item.get())) {
 | 
				
			||||||
 | 
					          this->pop_raw_();
 | 
				
			||||||
 | 
					          this->to_remove_--;
 | 
				
			||||||
 | 
					          continue;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					      // Single-threaded or multi-threaded with atomics: can check without lock
 | 
				
			||||||
 | 
					      if (is_item_removed_(item.get())) {
 | 
				
			||||||
 | 
					        LockGuard guard{this->lock_};
 | 
				
			||||||
 | 
					        this->pop_raw_();
 | 
				
			||||||
 | 
					        this->to_remove_--;
 | 
				
			||||||
 | 
					        continue;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#ifdef ESPHOME_DEBUG_SCHEDULER
 | 
					#ifdef ESPHOME_DEBUG_SCHEDULER
 | 
				
			||||||
      const char *item_name = item->get_name();
 | 
					      const char *item_name = item->get_name();
 | 
				
			||||||
      ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")",
 | 
					      ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")",
 | 
				
			||||||
@@ -518,7 +549,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 | 
				
			|||||||
  if (type == SchedulerItem::TIMEOUT) {
 | 
					  if (type == SchedulerItem::TIMEOUT) {
 | 
				
			||||||
    for (auto &item : this->defer_queue_) {
 | 
					    for (auto &item : this->defer_queue_) {
 | 
				
			||||||
      if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
					      if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
				
			||||||
        item->remove = true;
 | 
					        this->mark_item_removed_(item.get());
 | 
				
			||||||
        total_cancelled++;
 | 
					        total_cancelled++;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -528,7 +559,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 | 
				
			|||||||
  // Cancel items in the main heap
 | 
					  // Cancel items in the main heap
 | 
				
			||||||
  for (auto &item : this->items_) {
 | 
					  for (auto &item : this->items_) {
 | 
				
			||||||
    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
					    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
				
			||||||
      item->remove = true;
 | 
					      this->mark_item_removed_(item.get());
 | 
				
			||||||
      total_cancelled++;
 | 
					      total_cancelled++;
 | 
				
			||||||
      this->to_remove_++;  // Track removals for heap items
 | 
					      this->to_remove_++;  // Track removals for heap items
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -537,7 +568,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 | 
				
			|||||||
  // Cancel items in to_add_
 | 
					  // Cancel items in to_add_
 | 
				
			||||||
  for (auto &item : this->to_add_) {
 | 
					  for (auto &item : this->to_add_) {
 | 
				
			||||||
    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
					    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
 | 
				
			||||||
      item->remove = true;
 | 
					      this->mark_item_removed_(item.get());
 | 
				
			||||||
      total_cancelled++;
 | 
					      total_cancelled++;
 | 
				
			||||||
      // Don't track removals for to_add_ items
 | 
					      // Don't track removals for to_add_ items
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -97,22 +97,42 @@ class Scheduler {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    std::function<void()> callback;
 | 
					    std::function<void()> callback;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Bit-packed fields to minimize padding
 | 
					#ifdef ESPHOME_THREAD_MULTI_ATOMICS
 | 
				
			||||||
 | 
					    // Multi-threaded with atomics: use atomic for lock-free access
 | 
				
			||||||
 | 
					    // Place atomic<bool> separately since it can't be packed with bit fields
 | 
				
			||||||
 | 
					    std::atomic<bool> remove{false};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Bit-packed fields (3 bits used, 5 bits padding in 1 byte)
 | 
				
			||||||
 | 
					    enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
 | 
				
			||||||
 | 
					    bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
 | 
				
			||||||
 | 
					    bool is_retry : 1;         // True if this is a retry timeout
 | 
				
			||||||
 | 
					                               // 5 bits padding
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					    // Single-threaded or multi-threaded without atomics: can pack all fields together
 | 
				
			||||||
 | 
					    // Bit-packed fields (4 bits used, 4 bits padding in 1 byte)
 | 
				
			||||||
    enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
 | 
					    enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
 | 
				
			||||||
    bool remove : 1;
 | 
					    bool remove : 1;
 | 
				
			||||||
    bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
 | 
					    bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
 | 
				
			||||||
    bool is_retry : 1;         // True if this is a retry timeout
 | 
					    bool is_retry : 1;         // True if this is a retry timeout
 | 
				
			||||||
                               // 4 bits padding
 | 
					                               // 4 bits padding
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Constructor
 | 
					    // Constructor
 | 
				
			||||||
    SchedulerItem()
 | 
					    SchedulerItem()
 | 
				
			||||||
        : component(nullptr),
 | 
					        : component(nullptr),
 | 
				
			||||||
          interval(0),
 | 
					          interval(0),
 | 
				
			||||||
          next_execution_(0),
 | 
					          next_execution_(0),
 | 
				
			||||||
 | 
					#ifdef ESPHOME_THREAD_MULTI_ATOMICS
 | 
				
			||||||
 | 
					          // remove is initialized in the member declaration as std::atomic<bool>{false}
 | 
				
			||||||
 | 
					          type(TIMEOUT),
 | 
				
			||||||
 | 
					          name_is_dynamic(false),
 | 
				
			||||||
 | 
					          is_retry(false) {
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
          type(TIMEOUT),
 | 
					          type(TIMEOUT),
 | 
				
			||||||
          remove(false),
 | 
					          remove(false),
 | 
				
			||||||
          name_is_dynamic(false),
 | 
					          name_is_dynamic(false),
 | 
				
			||||||
          is_retry(false) {
 | 
					          is_retry(false) {
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
      name_.static_name = nullptr;
 | 
					      name_.static_name = nullptr;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -219,6 +239,35 @@ class Scheduler {
 | 
				
			|||||||
    return item->remove || (item->component != nullptr && item->component->is_failed());
 | 
					    return item->remove || (item->component != nullptr && item->component->is_failed());
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // Helper to check if item is marked for removal (platform-specific)
 | 
				
			||||||
 | 
					  // Returns true if item should be skipped, handles platform-specific synchronization
 | 
				
			||||||
 | 
					  // NOTE: For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller must hold lock!
 | 
				
			||||||
 | 
					  bool is_item_removed_(SchedulerItem *item) const {
 | 
				
			||||||
 | 
					#ifdef ESPHOME_THREAD_MULTI_ATOMICS
 | 
				
			||||||
 | 
					    // Multi-threaded with atomics: use atomic load for lock-free access
 | 
				
			||||||
 | 
					    return item->remove.load(std::memory_order_acquire);
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					    // Single-threaded (ESPHOME_THREAD_SINGLE) or
 | 
				
			||||||
 | 
					    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct read
 | 
				
			||||||
 | 
					    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold lock!
 | 
				
			||||||
 | 
					    return item->remove;
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // Helper to mark item for removal (platform-specific)
 | 
				
			||||||
 | 
					  // NOTE: For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller must hold lock!
 | 
				
			||||||
 | 
					  void mark_item_removed_(SchedulerItem *item) {
 | 
				
			||||||
 | 
					#ifdef ESPHOME_THREAD_MULTI_ATOMICS
 | 
				
			||||||
 | 
					    // Multi-threaded with atomics: use atomic store
 | 
				
			||||||
 | 
					    item->remove.store(true, std::memory_order_release);
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					    // Single-threaded (ESPHOME_THREAD_SINGLE) or
 | 
				
			||||||
 | 
					    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct write
 | 
				
			||||||
 | 
					    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold lock!
 | 
				
			||||||
 | 
					    item->remove = true;
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // Template helper to check if any item in a container matches our criteria
 | 
					  // Template helper to check if any item in a container matches our criteria
 | 
				
			||||||
  template<typename Container>
 | 
					  template<typename Container>
 | 
				
			||||||
  bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
 | 
					  bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										139
									
								
								tests/integration/fixtures/scheduler_removed_item_race.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										139
									
								
								tests/integration/fixtures/scheduler_removed_item_race.yaml
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,139 @@
 | 
				
			|||||||
 | 
					esphome:
 | 
				
			||||||
 | 
					  name: scheduler-removed-item-race
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					host:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					api:
 | 
				
			||||||
 | 
					  services:
 | 
				
			||||||
 | 
					    - service: run_test
 | 
				
			||||||
 | 
					      then:
 | 
				
			||||||
 | 
					        - script.execute: run_test_script
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					logger:
 | 
				
			||||||
 | 
					  level: DEBUG
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					globals:
 | 
				
			||||||
 | 
					  - id: test_passed
 | 
				
			||||||
 | 
					    type: bool
 | 
				
			||||||
 | 
					    initial_value: 'true'
 | 
				
			||||||
 | 
					  - id: removed_item_executed
 | 
				
			||||||
 | 
					    type: int
 | 
				
			||||||
 | 
					    initial_value: '0'
 | 
				
			||||||
 | 
					  - id: normal_item_executed
 | 
				
			||||||
 | 
					    type: int
 | 
				
			||||||
 | 
					    initial_value: '0'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					sensor:
 | 
				
			||||||
 | 
					  - platform: template
 | 
				
			||||||
 | 
					    id: test_sensor
 | 
				
			||||||
 | 
					    name: "Test Sensor"
 | 
				
			||||||
 | 
					    update_interval: never
 | 
				
			||||||
 | 
					    lambda: return 0.0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					script:
 | 
				
			||||||
 | 
					  - id: run_test_script
 | 
				
			||||||
 | 
					    then:
 | 
				
			||||||
 | 
					      - logger.log: "=== Starting Removed Item Race Test ==="
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      # This test creates a scenario where:
 | 
				
			||||||
 | 
					      # 1. First item in heap is NOT cancelled (cleanup stops immediately)
 | 
				
			||||||
 | 
					      # 2. Items behind it ARE cancelled (remain in heap after cleanup)
 | 
				
			||||||
 | 
					      # 3. All items execute at the same time, including cancelled ones
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      - lambda: |-
 | 
				
			||||||
 | 
					          // The key to hitting the race:
 | 
				
			||||||
 | 
					          // 1. Add items in a specific order to control heap structure
 | 
				
			||||||
 | 
					          // 2. Cancel ONLY items that won't be at the front
 | 
				
			||||||
 | 
					          // 3. Ensure the first item stays non-cancelled so cleanup_() stops immediately
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // Schedule all items to execute at the SAME time (1ms from now)
 | 
				
			||||||
 | 
					          // Using 1ms instead of 0 to avoid defer queue on multi-core platforms
 | 
				
			||||||
 | 
					          // This ensures they'll all be ready together and go through the heap
 | 
				
			||||||
 | 
					          const uint32_t exec_time = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // CRITICAL: Add a non-cancellable item FIRST
 | 
				
			||||||
 | 
					          // This will be at the front of the heap and block cleanup_()
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "blocker", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGD("test", "Blocker timeout executed (expected) - was at front of heap");
 | 
				
			||||||
 | 
					            id(normal_item_executed)++;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // Now add items that we WILL cancel
 | 
				
			||||||
 | 
					          // These will be behind the blocker in the heap
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "cancel_1", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGE("test", "RACE: Cancelled timeout 1 executed after being cancelled!");
 | 
				
			||||||
 | 
					            id(removed_item_executed)++;
 | 
				
			||||||
 | 
					            id(test_passed) = false;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "cancel_2", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGE("test", "RACE: Cancelled timeout 2 executed after being cancelled!");
 | 
				
			||||||
 | 
					            id(removed_item_executed)++;
 | 
				
			||||||
 | 
					            id(test_passed) = false;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "cancel_3", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGE("test", "RACE: Cancelled timeout 3 executed after being cancelled!");
 | 
				
			||||||
 | 
					            id(removed_item_executed)++;
 | 
				
			||||||
 | 
					            id(test_passed) = false;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // Add some more normal items
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "normal_1", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGD("test", "Normal timeout 1 executed (expected)");
 | 
				
			||||||
 | 
					            id(normal_item_executed)++;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "normal_2", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGD("test", "Normal timeout 2 executed (expected)");
 | 
				
			||||||
 | 
					            id(normal_item_executed)++;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          App.scheduler.set_timeout(id(test_sensor), "normal_3", exec_time, []() {
 | 
				
			||||||
 | 
					            ESP_LOGD("test", "Normal timeout 3 executed (expected)");
 | 
				
			||||||
 | 
					            id(normal_item_executed)++;
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // Force items into the heap before cancelling
 | 
				
			||||||
 | 
					          App.scheduler.process_to_add();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // NOW cancel the items - they're behind "blocker" in the heap
 | 
				
			||||||
 | 
					          // When cleanup_() runs, it will see "blocker" (not removed) at the front
 | 
				
			||||||
 | 
					          // and stop immediately, leaving cancel_1, cancel_2, cancel_3 in the heap
 | 
				
			||||||
 | 
					          bool c1 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_1");
 | 
				
			||||||
 | 
					          bool c2 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_2");
 | 
				
			||||||
 | 
					          bool c3 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_3");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          ESP_LOGD("test", "Cancelled items (behind blocker): %s, %s, %s",
 | 
				
			||||||
 | 
					                   c1 ? "true" : "false",
 | 
				
			||||||
 | 
					                   c2 ? "true" : "false",
 | 
				
			||||||
 | 
					                   c3 ? "true" : "false");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          // The heap now has:
 | 
				
			||||||
 | 
					          // - "blocker" at front (not cancelled)
 | 
				
			||||||
 | 
					          // - cancelled items behind it (marked remove=true but still in heap)
 | 
				
			||||||
 | 
					          // - When all execute at once, cleanup_() stops at "blocker"
 | 
				
			||||||
 | 
					          // - The loop then executes ALL ready items including cancelled ones
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          ESP_LOGD("test", "Setup complete. Blocker at front prevents cleanup of cancelled items behind it");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      # Wait for all timeouts to execute (or not)
 | 
				
			||||||
 | 
					      - delay: 20ms
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      # Check results
 | 
				
			||||||
 | 
					      - lambda: |-
 | 
				
			||||||
 | 
					          ESP_LOGI("test", "=== Test Results ===");
 | 
				
			||||||
 | 
					          ESP_LOGI("test", "Normal items executed: %d (expected 4)", id(normal_item_executed));
 | 
				
			||||||
 | 
					          ESP_LOGI("test", "Removed items executed: %d (expected 0)", id(removed_item_executed));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          if (id(removed_item_executed) > 0) {
 | 
				
			||||||
 | 
					            ESP_LOGE("test", "TEST FAILED: %d cancelled items were executed!", id(removed_item_executed));
 | 
				
			||||||
 | 
					            id(test_passed) = false;
 | 
				
			||||||
 | 
					          } else if (id(normal_item_executed) != 4) {
 | 
				
			||||||
 | 
					            ESP_LOGE("test", "TEST FAILED: Expected 4 normal items, got %d", id(normal_item_executed));
 | 
				
			||||||
 | 
					            id(test_passed) = false;
 | 
				
			||||||
 | 
					          } else {
 | 
				
			||||||
 | 
					            ESP_LOGI("test", "TEST PASSED: No cancelled items were executed");
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          ESP_LOGI("test", "=== Test Complete ===");
 | 
				
			||||||
							
								
								
									
										102
									
								
								tests/integration/test_scheduler_removed_item_race.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								tests/integration/test_scheduler_removed_item_race.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,102 @@
 | 
				
			|||||||
 | 
					"""Test for scheduler race condition where removed items still execute."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import asyncio
 | 
				
			||||||
 | 
					import re
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import pytest
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .types import APIClientConnectedFactory, RunCompiledFunction
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@pytest.mark.asyncio
 | 
				
			||||||
 | 
					async def test_scheduler_removed_item_race(
 | 
				
			||||||
 | 
					    yaml_config: str,
 | 
				
			||||||
 | 
					    run_compiled: RunCompiledFunction,
 | 
				
			||||||
 | 
					    api_client_connected: APIClientConnectedFactory,
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    """Test that items marked for removal don't execute.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This test verifies the fix for a race condition where:
 | 
				
			||||||
 | 
					    1. cleanup_() only removes items from the front of the heap
 | 
				
			||||||
 | 
					    2. Items in the middle of the heap marked for removal still execute
 | 
				
			||||||
 | 
					    3. This causes cancelled timeouts to run when they shouldn't
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    loop = asyncio.get_running_loop()
 | 
				
			||||||
 | 
					    test_complete_future: asyncio.Future[bool] = loop.create_future()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Track test results
 | 
				
			||||||
 | 
					    test_passed = False
 | 
				
			||||||
 | 
					    removed_executed = 0
 | 
				
			||||||
 | 
					    normal_executed = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Patterns to match
 | 
				
			||||||
 | 
					    race_pattern = re.compile(r"RACE: .* executed after being cancelled!")
 | 
				
			||||||
 | 
					    passed_pattern = re.compile(r"TEST PASSED")
 | 
				
			||||||
 | 
					    failed_pattern = re.compile(r"TEST FAILED")
 | 
				
			||||||
 | 
					    complete_pattern = re.compile(r"=== Test Complete ===")
 | 
				
			||||||
 | 
					    normal_count_pattern = re.compile(r"Normal items executed: (\d+)")
 | 
				
			||||||
 | 
					    removed_count_pattern = re.compile(r"Removed items executed: (\d+)")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def check_output(line: str) -> None:
 | 
				
			||||||
 | 
					        """Check log output for test results."""
 | 
				
			||||||
 | 
					        nonlocal test_passed, removed_executed, normal_executed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if race_pattern.search(line):
 | 
				
			||||||
 | 
					            # Race condition detected - a cancelled item executed
 | 
				
			||||||
 | 
					            test_passed = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if passed_pattern.search(line):
 | 
				
			||||||
 | 
					            test_passed = True
 | 
				
			||||||
 | 
					        elif failed_pattern.search(line):
 | 
				
			||||||
 | 
					            test_passed = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        normal_match = normal_count_pattern.search(line)
 | 
				
			||||||
 | 
					        if normal_match:
 | 
				
			||||||
 | 
					            normal_executed = int(normal_match.group(1))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        removed_match = removed_count_pattern.search(line)
 | 
				
			||||||
 | 
					        if removed_match:
 | 
				
			||||||
 | 
					            removed_executed = int(removed_match.group(1))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not test_complete_future.done() and complete_pattern.search(line):
 | 
				
			||||||
 | 
					            test_complete_future.set_result(True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async with (
 | 
				
			||||||
 | 
					        run_compiled(yaml_config, line_callback=check_output),
 | 
				
			||||||
 | 
					        api_client_connected() as client,
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        # Verify we can connect
 | 
				
			||||||
 | 
					        device_info = await client.device_info()
 | 
				
			||||||
 | 
					        assert device_info is not None
 | 
				
			||||||
 | 
					        assert device_info.name == "scheduler-removed-item-race"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # List services
 | 
				
			||||||
 | 
					        _, services = await asyncio.wait_for(
 | 
				
			||||||
 | 
					            client.list_entities_services(), timeout=5.0
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Find run_test service
 | 
				
			||||||
 | 
					        run_test_service = next((s for s in services if s.name == "run_test"), None)
 | 
				
			||||||
 | 
					        assert run_test_service is not None, "run_test service not found"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Execute the test
 | 
				
			||||||
 | 
					        client.execute_service(run_test_service, {})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Wait for test completion
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            await asyncio.wait_for(test_complete_future, timeout=5.0)
 | 
				
			||||||
 | 
					        except TimeoutError:
 | 
				
			||||||
 | 
					            pytest.fail("Test did not complete within timeout")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Verify results
 | 
				
			||||||
 | 
					        assert test_passed, (
 | 
				
			||||||
 | 
					            f"Test failed! Removed items executed: {removed_executed}, "
 | 
				
			||||||
 | 
					            f"Normal items executed: {normal_executed}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        assert removed_executed == 0, (
 | 
				
			||||||
 | 
					            f"Cancelled items should not execute, but {removed_executed} did"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        assert normal_executed == 4, (
 | 
				
			||||||
 | 
					            f"Expected 4 normal items to execute, got {normal_executed}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
		Reference in New Issue
	
	Block a user