import cv2 import numpy as np import time import requests import winsound # For Windows notification sound # ======== Optimized Configuration ======== ESP32_STREAM_URL = "http://192.168.4.1:81/stream" COMMAND_URL = "http://192.168.4.1/action?go=" MIN_AREA = 100 CONFIRMATION_AREA = 150 PERSISTENCE_FRAMES = 2 DETECTION_FPS = 12 SIZE_BUFFER = 10 CENTER_BAND_MIN = 0.35 CENTER_BAND_MAX = 0.75 SCAN_TURN_DURATION = 0.03 SCAN_WAIT_DURATION = 1.2 PRECISION_TURN_DURATION = 0.001 APPROACH_DURATION = 0.5 # 1 second forward movement after centering # HSV color range adjustments LOWER = np.array([2, 20, 250], dtype="uint8") UPPER = np.array([20, 230, 255], dtype="uint8") class EnhancedFireDetector: def __init__(self): self.cap = cv2.VideoCapture(ESP32_STREAM_URL) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.active_regions = {} self.frame_count = 0 self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5)) # State management self.scanning = True self.precision_mode = False self.approach_mode = False self.scan_state = "TURNING" self.last_state_change = time.time() self.fire_center_x = None self.fire_area = None self.frame_width = 0 self.precision_direction = None self.last_command_time = 0 self.command_delay = 0.03 self.last_detection_time = 0 self.last_turn_start = 0 self.approach_start = 0 self.mission_complete = False self.notification_sent = False def send_command(self, command): """Send movement command to ESP32 with rate limiting""" current_time = time.time() if current_time - self.last_command_time < self.command_delay and command != "stop": return try: response = requests.get(COMMAND_URL + command, timeout=0.1) if response.status_code == 200: print(f"Command sent: {command}") else: print(f"Command failed: {response.status_code}") self.last_command_time = current_time except Exception as e: print(f"Command error: {e}") def process_frame(self, frame): self.frame_count += 1 # Enhanced processing pipeline blur = cv2.GaussianBlur(frame, (21, 21), 0) hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV) mask = cv2.inRange(hsv, LOWER, UPPER) # Morphological improvements mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, self.kernel, iterations=2) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) current_regions = [] for cnt in contours: area = cv2.contourArea(cnt) if area > MIN_AREA: x, y, w, h = cv2.boundingRect(cnt) # Add size buffer to regions x = max(0, x - SIZE_BUFFER//2) y = max(0, y - SIZE_BUFFER//2) w = min(frame.shape[1]-x, w + SIZE_BUFFER) h = min(frame.shape[0]-y, h + SIZE_BUFFER) current_regions.append((x, y, w, h)) return mask, current_regions def update_regions(self, new_regions): # Reset counter for this frame current_ids = [] # Update existing regions with position smoothing for region_id, region_data in list(self.active_regions.items()): x, y, w, h, counter = region_data best_match = None min_distance = float('inf') # Find closest matching new region for new in new_regions: nx, ny, nw, nh = new # Calculate center points cx = x + w/2 cy = y + h/2 ncx = nx + nw/2 ncy = ny + nh/2 distance = ((cx - ncx)**2 + (cy - ncy)**2)**0.5 # Match if centers are close and size similar if distance < max(w, h)*0.5 and abs(w-nw) < w*0.5 and abs(h-nh) < h*0.5: if distance < min_distance: best_match = new min_distance = distance if best_match: nx, ny, nw, nh = best_match # Smooth position transition (60% historical, 40% new) smoothed = ( int(x * 0.6 + nx * 0.4), int(y * 0.6 + ny * 0.4), int(w * 0.6 + nw * 0.4), int(h * 0.6 + nh * 0.4), counter + 1 ) self.active_regions[region_id] = smoothed current_ids.append(region_id) elif counter > 0: # Decrement counter instead of immediate removal self.active_regions[region_id] = (x, y, w, h, counter - 1) current_ids.append(region_id) # Add new regions for new in new_regions: nx, ny, nw, nh = new matched = False # Check if already tracked for region_id in self.active_regions: if region_id in current_ids: continue # Add new region with counter region_id = self.frame_count self.active_regions[region_id] = (nx, ny, nw, nh, 1) current_ids.append(region_id) # Remove stale regions for region_id in list(self.active_regions.keys()): if region_id not in current_ids: del self.active_regions[region_id] def detect_fire(self): """Detect largest fire region with persistence""" self.fire_center_x = None self.fire_area = None max_area = 0 max_region = None for region_id, region_data in self.active_regions.items(): x, y, w, h, counter = region_data area = w * h # Require persistence and minimum size if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA: if area > max_area: max_area = area center_x = x + w/2 max_region = (center_x, area) if max_region: self.fire_center_x, self.fire_area = max_region self.last_detection_time = time.time() return True return False def draw_stable_boxes(self, frame): for region_id, region_data in self.active_regions.items(): x, y, w, h, counter = region_data x2 = x + w y2 = y + h # Boundary checks x = max(0, x) y = max(0, y) x2 = min(frame.shape[1], x2) y2 = min(frame.shape[0], y2) w = x2 - x h = y2 - y area = w * h # Size-based confirmation if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA: color = (0, 0, 255) # Red - high confidence label = "FIRE!" thickness = 2 elif counter >= PERSISTENCE_FRAMES/2: color = (0, 165, 255) # Orange - confirmed label = "Fire detected" thickness = 2 else: color = (0, 255, 255) # Yellow - investigating label = f"Checking... {counter}/{PERSISTENCE_FRAMES}" thickness = 1 # Draw enhanced boxes cv2.rectangle(frame, (x, y), (x2, y2), color, thickness) # Text background (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1) cv2.rectangle(frame, (x, y-25), (x+tw, y), color, -1) # Draw text cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 1) # Area information cv2.putText(frame, f"{area}px", (x, y2+15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1) # Draw mission status if self.mission_complete: cv2.putText(frame, "MISSION COMPLETE: FIRE LOCATED", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) def run_scanning_mode(self): """Handle scanning behavior with timed movements""" current_time = time.time() elapsed = current_time - self.last_state_change if self.scan_state == "TURNING": if elapsed >= SCAN_TURN_DURATION: self.send_command("stop") self.scan_state = "WAITING" self.last_state_change = current_time print("Scan: Stopped for detection") else: # Continue turning self.send_command("right") elif self.scan_state == "WAITING": if elapsed >= SCAN_WAIT_DURATION: self.send_command("right") self.scan_state = "TURNING" self.last_state_change = current_time print("Scan: Turning right") def run_precision_mode(self): """Center the detected fire with micro-movements""" if self.fire_center_x is None or self.frame_width == 0: self.precision_direction = None return False center_band_min = self.frame_width * CENTER_BAND_MIN center_band_max = self.frame_width * CENTER_BAND_MAX current_time = time.time() # 1. Check if fire is centered if center_band_min <= self.fire_center_x <= center_band_max: print("Fire centered! Starting approach.") self.send_command("stop") return True # 2. Determine direction if not set if self.precision_direction is None: self.precision_direction = "LEFT" if self.fire_center_x < center_band_min else "RIGHT" print(f"Precision: Turning {self.precision_direction}") self.last_turn_start = current_time self.send_command("left" if self.precision_direction == "LEFT" else "right") return False # 3. Check if we need to stop turning if current_time - self.last_turn_start >= PRECISION_TURN_DURATION: self.send_command("stop") # Reset direction to re-evaluate in next frame self.precision_direction = None print("Precision: Stopped after micro-turn") return False def run_approach_mode(self): """Move forward for 1 second then stop permanently""" current_time = time.time() # Start the approach timer if self.approach_start == 0: self.approach_start = current_time self.send_command("forward") print("Approach: Moving forward for 1 second") return False # Check if approach duration completed if current_time - self.approach_start >= APPROACH_DURATION: self.send_command("stop") self.mission_complete = True self.approach_mode = False print("Approach complete! Mission finished.") return True return False def send_notification(self): """Send Windows notification when mission is complete""" if not self.notification_sent: print("FIRE DETECTED AND LOCATED - MISSION COMPLETE") # Windows beep sound winsound.Beep(1000, 1000) # Frequency 1000Hz, duration 1000ms self.notification_sent = True def run(self): try: while True: ret, frame = self.cap.read() if not ret: print("Reconnecting...") self.cap.release() self.cap = cv2.VideoCapture(ESP32_STREAM_URL) time.sleep(1) continue # Store frame dimensions if frame is not None and frame.size > 0: self.frame_width = frame.shape[1] # Skip processing if mission complete if not self.mission_complete: # Process frame and detect fire mask, regions = self.process_frame(frame) self.update_regions(regions) fire_detected = self.detect_fire() # Draw detection results self.draw_stable_boxes(frame) # Display frames cv2.imshow("Fire Detection", frame) if mask is not None: cv2.imshow("Analysis Mask", mask) # State machine logic if self.approach_mode: if self.run_approach_mode(): self.send_notification() elif self.precision_mode: centered = self.run_precision_mode() if centered: # Switch to approach mode self.precision_mode = False self.approach_mode = True elif not fire_detected and (time.time() - self.last_detection_time > 0.5): # Lost fire during precision mode print("Fire lost! Returning to scanning.") self.precision_mode = False self.scanning = True self.scan_state = "TURNING" self.last_state_change = time.time() self.send_command("right") elif self.scanning: self.run_scanning_mode() # Check if fire detected to switch modes if fire_detected: print(f"Fire detected! Center: {self.fire_center_x:.1f}") self.scanning = False self.precision_mode = True self.precision_direction = None # Reset for new approach self.send_command("stop") else: # Mission complete - just display frame self.draw_stable_boxes(frame) cv2.imshow("Fire Detection", frame) self.send_notification() # Manual controls key = cv2.waitKey(1) & 0xFF if key == 27: # ESC break elif key == ord('w'): self.send_command("forward") # Reset automation self.scanning = False self.precision_mode = False self.approach_mode = False elif key == ord('s'): self.send_command("backward") self.scanning = False self.precision_mode = False self.approach_mode = False elif key == ord('a'): self.send_command("left") self.scanning = False self.precision_mode = False self.approach_mode = False elif key == ord('d'): self.send_command("right") self.scanning = False self.precision_mode = False self.approach_mode = False elif key == ord(' '): self.send_command("stop") # Reset automation self.scanning = True self.precision_mode = False self.approach_mode = False self.scan_state = "TURNING" self.last_state_change = time.time() elif key == ord('r'): # Resume scanning self.scanning = True self.precision_mode = False self.approach_mode = False self.mission_complete = False self.notification_sent = False self.scan_state = "TURNING" self.last_state_change = time.time() self.send_command("right") finally: self.send_command("stop") self.cap.release() cv2.destroyAllWindows() if __name__ == "__main__": detector = EnhancedFireDetector() detector.run()