Strażak

Typ_projektu
Arduino
Zdjecie główne
Krótki opis projektu

Projekt przedstawia autonomicznego robota wykrywającego ogień za pomocą kamery ESP32-CAM i analizy obrazu w HSV. Robot samodzielnie skanuje otoczenie, lokalizuje płomień, precyzyjnie się ustawia, podchodzi do źródła ognia i kończy misję. System działa na podstawie przetwarzania strumienia wideo w OpenCV, wykrywa kontury płomienia, stabilizuje detekcję przez analizę wielu klatek i wysyła komendy ruchu przez Wi-Fi.

Niezbędne elementy
  • ESP32-CAM – kamera z Wi-Fi 

  • Robot mobilny / pojazd z napędem – np. platforma na kołach z silnikami DC lub serwami

  • Moduł sterujący silnikami

  • Zasilanie 

  • Komputer (PC/laptop)

 

Opis projektu

Opis ogólny projektu

Celem projektu było stworzenie autonomicznego robota, który samodzielnie lokalizuje ogień za pomocą kamery i przetwarza obraz w czasie rzeczywistym. System działa na podstawie obrazu przesyłanego przez moduł ESP32-CAM, analizowanego w Pythonie przy użyciu biblioteki OpenCV. Robot wykonuje szereg zautomatyzowanych działań: skanuje przestrzeń, lokalizuje płomień, precyzyjnie się ustawia, podjeżdża w jego kierunku i kończy misję. Całość stanowi przykład zintegrowanego zastosowania wizji komputerowej, mikrokontrolera z kamerą, sieci Wi-Fi oraz algorytmicznego sterowania.

Główna funkcja programu – schemat działania

Program realizuje autonomiczny algorytm wykrywania płomienia i reagowania na niego w następujących krokach:

  1. Skanowanie obszaru – robot obraca się w prawo, wykonując pauzy na analizę obrazu.

  2. Lokalizacja ognia – obraz z kamery jest przetwarzany w czasie rzeczywistym w celu wykrycia płomienia.

  3. Precyzyjne ustawienie się – po wykryciu ognia robot wykonuje mikro-ruchy w lewo lub prawo, aby wyśrodkować płomień.

  4. Podejście do ognia – po ustawieniu się robot jedzie do przodu przez zdefiniowany czas.

  5. Zakończenie misji – po zbliżeniu się do celu robot się zatrzymuje i informuje o zakończeniu zadania.

Jak to działa – krok po kroku

  1. Odbieranie obrazu z kamery ESP32-CAM
    Program odbiera strumień wideo (MJPEG) z adresu IP kamery:

    self.cap = cv2.VideoCapture(ESP32_STREAM_URL)

  2. Wstępna analiza obrazu
    Przetwarzanie każdej klatki obejmuje:

    • Rozmycie obrazu (redukcja szumów):
      blur = cv2.GaussianBlur(frame, (21, 21), 0)

    • Konwersję do przestrzeni HSV:
      hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)

    • Binaryzację – maskowanie kolorów ognia w ustalonym zakresie HSV:
      mask = cv2.inRange(hsv, LOWER, UPPER)

  3. Wykrywanie konturów płomienia
    Na masce HSV wykrywane są kontury:

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    Kontury o powierzchni mniejszej niż MIN_AREA są ignorowane. Ogień musi być wystarczająco duży, by został uznany za istotny.

    Zdjęcie1

     

  4. Śledzenie aktywnych regionów ognia przez wiele klatek
    Aby uniknąć fałszywych detekcji, system śledzi, czy wykryty region występuje przez kilka kolejnych klatek. Dopiero wtedy uznaje, że ogień rzeczywiście istnieje w scenie.

  5. Podejmowanie decyzji – trzy tryby działania robota
    Program dynamicznie przełącza się między trybami:

    • Skanowanie – robot obraca się w prawo z przerwami na analizę obrazu.

    • Precyzyjne ustawianie – robot wykonuje mikro-obrót w lewo/prawo, aż ogień znajdzie się w centralnej strefie obrazu.

    • Podejście – robot porusza się do przodu przez określony czas (APPROACH_DURATION), po czym się zatrzymuje.

  6. Wysyłanie komend do robota przez Wi-Fi
    Polecenia sterujące ruchem (np. "forward", "left", "stop") są wysyłane jako żądania HTTP do ESP32, który je interpretuje i steruje silnikami.

  7. Wizualizacja i informacja zwrotna
    Na ekranie komputera wyświetlany jest:

    • Strumień wideo z kamery,

    • Maska HSV z obszarami ognia,

    • Prostokąty z podpisami: „Checking…”, „Fire detected”, „FIRE!”.

  8. Powiadomienie o sukcesie
    Po skutecznym wykryciu i podejściu do ognia uruchamiany jest dźwiękowy sygnał zakończenia misji.

Zastosowane algorytmy i techniki

  • HSV – wykrywanie ognia na podstawie koloru
    Przestrzeń HSV lepiej niż RGB oddziela cechy kolorystyczne ognia. Zakres barw (odcienie pomarańczowo-czerwone) umożliwia łatwe binaryzowanie.

  • Morfologia obrazu
    Operacje morfologiczne (zamykanie) usuwają szumy, a kontury ognia są łączone w większe spójne obszary.

  • Algorytm śledzenia ognia między klatkami
    System przechowuje informacje o wykrytych obszarach przez wiele klatek. Jeśli ogień znika tylko na moment, detekcja nie jest przerywana.

  • Decyzje sterujące na podstawie pozycji ognia
    Pozycja środka wykrytego ognia w kadrze (lewa/prawa/centrum) służy do precyzyjnego ustawienia się względem celu.

  • Stan wewnętrzny programu
    System zachowuje dane o czasie ostatniego wykrycia, aktualnym trybie i kierunku ruchu, by podejmować właściwe decyzje na podstawie historii.

Trudności i wyzwania techniczne

Projekt wymagał dopracowania wielu parametrów, zwłaszcza w zakresie:

  • Doboru zakresów HSV dla płomienia – zależnie od oświetlenia i koloru otoczenia, ogień może być różnie interpretowany. Wymagało to testowania wielu kombinacji wartości LOWER i UPPER.

  • Precyzyjnego sterowania ruchem robota – mikro-ruchy oraz czas trwania poszczególnych komend musiały być dostrajane eksperymentalnie, by robot mógł dokładnie się ustawić.

  • Opóźnień wynikających z transmisji wideo i poleceń – ze względu na opóźnienia w przesyłaniu obrazu z ESP32-CAM (średnio kilkaset ms) oraz w przetwarzaniu, trudne było osiągnięcie całkowicie płynnej reakcji robota. Wymusiło to kompromisy w czasie podejmowania decyzji.

Zdjęcia
kod programu

import cv2
import numpy as np
import time
import requests
import winsound  # For Windows notification sound

# ======== Optimized Configuration ========
ESP32_STREAM_URL = "http://192.168.4.1:81/stream"
COMMAND_URL = "http://192.168.4.1/action?go="
MIN_AREA = 100
CONFIRMATION_AREA = 150
PERSISTENCE_FRAMES = 2
DETECTION_FPS = 12
SIZE_BUFFER = 10
CENTER_BAND_MIN = 0.35
CENTER_BAND_MAX = 0.75
SCAN_TURN_DURATION = 0.03
SCAN_WAIT_DURATION = 1.2
PRECISION_TURN_DURATION = 0.001
APPROACH_DURATION = 0.5  # 1 second forward movement after centering

# HSV color range adjustments
LOWER = np.array([2, 20, 250], dtype="uint8")
UPPER = np.array([20, 230, 255], dtype="uint8")

class EnhancedFireDetector:
   def __init__(self):
       self.cap = cv2.VideoCapture(ESP32_STREAM_URL)
       self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
       self.active_regions = {}
       self.frame_count = 0
       self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))
       
       # State management
       self.scanning = True
       self.precision_mode = False
       self.approach_mode = False
       self.scan_state = "TURNING"
       self.last_state_change = time.time()
       self.fire_center_x = None
       self.fire_area = None
       self.frame_width = 0
       self.precision_direction = None
       self.last_command_time = 0
       self.command_delay = 0.03
       self.last_detection_time = 0
       self.last_turn_start = 0
       self.approach_start = 0
       self.mission_complete = False
       self.notification_sent = False

   def send_command(self, command):
       """Send movement command to ESP32 with rate limiting"""
       current_time = time.time()
       if current_time - self.last_command_time < self.command_delay and command != "stop":
           return
           
       try:
           response = requests.get(COMMAND_URL + command, timeout=0.1)
           if response.status_code == 200:
               print(f"Command sent: {command}")
           else:
               print(f"Command failed: {response.status_code}")
           self.last_command_time = current_time
       except Exception as e:
           print(f"Command error: {e}")

   def process_frame(self, frame):
       self.frame_count += 1
       
       # Enhanced processing pipeline
       blur = cv2.GaussianBlur(frame, (21, 21), 0)
       hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
       mask = cv2.inRange(hsv, LOWER, UPPER)
       
       # Morphological improvements
       mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, self.kernel, iterations=2)
       
       contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
       current_regions = []
       
       for cnt in contours:
           area = cv2.contourArea(cnt)
           if area > MIN_AREA:
               x, y, w, h = cv2.boundingRect(cnt)
               # Add size buffer to regions
               x = max(0, x - SIZE_BUFFER//2)
               y = max(0, y - SIZE_BUFFER//2)
               w = min(frame.shape[1]-x, w + SIZE_BUFFER)
               h = min(frame.shape[0]-y, h + SIZE_BUFFER)
               current_regions.append((x, y, w, h))
               
       return mask, current_regions

   def update_regions(self, new_regions):
       # Reset counter for this frame
       current_ids = []
       
       # Update existing regions with position smoothing
       for region_id, region_data in list(self.active_regions.items()):
           x, y, w, h, counter = region_data
           best_match = None
           min_distance = float('inf')
           
           # Find closest matching new region
           for new in new_regions:
               nx, ny, nw, nh = new
               # Calculate center points
               cx = x + w/2
               cy = y + h/2
               ncx = nx + nw/2
               ncy = ny + nh/2
               distance = ((cx - ncx)**2 + (cy - ncy)**2)**0.5
               
               # Match if centers are close and size similar
               if distance < max(w, h)*0.5 and abs(w-nw) < w*0.5 and abs(h-nh) < h*0.5:
                   if distance < min_distance:
                       best_match = new
                       min_distance = distance
           
           if best_match:
               nx, ny, nw, nh = best_match
               # Smooth position transition (60% historical, 40% new)
               smoothed = (
                   int(x * 0.6 + nx * 0.4),
                   int(y * 0.6 + ny * 0.4),
                   int(w * 0.6 + nw * 0.4),
                   int(h * 0.6 + nh * 0.4),
                   counter + 1
               )
               self.active_regions[region_id] = smoothed
               current_ids.append(region_id)
           elif counter > 0:
               # Decrement counter instead of immediate removal
               self.active_regions[region_id] = (x, y, w, h, counter - 1)
               current_ids.append(region_id)
       
       # Add new regions
       for new in new_regions:
           nx, ny, nw, nh = new
           matched = False
           
           # Check if already tracked
           for region_id in self.active_regions:
               if region_id in current_ids:
                   continue
                   
           # Add new region with counter
           region_id = self.frame_count
           self.active_regions[region_id] = (nx, ny, nw, nh, 1)
           current_ids.append(region_id)
       
       # Remove stale regions
       for region_id in list(self.active_regions.keys()):
           if region_id not in current_ids:
               del self.active_regions[region_id]

   def detect_fire(self):
       """Detect largest fire region with persistence"""
       self.fire_center_x = None
       self.fire_area = None
       max_area = 0
       max_region = None
       
       for region_id, region_data in self.active_regions.items():
           x, y, w, h, counter = region_data
           area = w * h
           
           # Require persistence and minimum size
           if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA:
               if area > max_area:
                   max_area = area
                   center_x = x + w/2
                   max_region = (center_x, area)
       
       if max_region:
           self.fire_center_x, self.fire_area = max_region
           self.last_detection_time = time.time()
           return True
       return False

   def draw_stable_boxes(self, frame):
       for region_id, region_data in self.active_regions.items():
           x, y, w, h, counter = region_data
           x2 = x + w
           y2 = y + h
           
           # Boundary checks
           x = max(0, x)
           y = max(0, y)
           x2 = min(frame.shape[1], x2)
           y2 = min(frame.shape[0], y2)
           w = x2 - x
           h = y2 - y
           
           area = w * h

           # Size-based confirmation
           if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA:
               color = (0, 0, 255)  # Red - high confidence
               label = "FIRE!"
               thickness = 2
           elif counter >= PERSISTENCE_FRAMES/2:
               color = (0, 165, 255)  # Orange - confirmed
               label = "Fire detected"
               thickness = 2
           else:
               color = (0, 255, 255)  # Yellow - investigating
               label = f"Checking... {counter}/{PERSISTENCE_FRAMES}"
               thickness = 1

           # Draw enhanced boxes
           cv2.rectangle(frame, (x, y), (x2, y2), color, thickness)
           
           # Text background
           (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
           cv2.rectangle(frame, (x, y-25), (x+tw, y), color, -1)
           
           # Draw text
           cv2.putText(frame, label, (x, y-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 1)
           
           # Area information
           cv2.putText(frame, f"{area}px", (x, y2+15),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
       
       # Draw mission status
       if self.mission_complete:
           cv2.putText(frame, "MISSION COMPLETE: FIRE LOCATED", (50, 50), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

   def run_scanning_mode(self):
       """Handle scanning behavior with timed movements"""
       current_time = time.time()
       elapsed = current_time - self.last_state_change
       
       if self.scan_state == "TURNING":
           if elapsed >= SCAN_TURN_DURATION:
               self.send_command("stop")
               self.scan_state = "WAITING"
               self.last_state_change = current_time
               print("Scan: Stopped for detection")
           else:
               # Continue turning
               self.send_command("right")
               
       elif self.scan_state == "WAITING":
           if elapsed >= SCAN_WAIT_DURATION:
               self.send_command("right")
               self.scan_state = "TURNING"
               self.last_state_change = current_time
               print("Scan: Turning right")

   def run_precision_mode(self):
       """Center the detected fire with micro-movements"""
       if self.fire_center_x is None or self.frame_width == 0:
           self.precision_direction = None
           return False
           
       center_band_min = self.frame_width * CENTER_BAND_MIN
       center_band_max = self.frame_width * CENTER_BAND_MAX
       current_time = time.time()
       
       # 1. Check if fire is centered
       if center_band_min <= self.fire_center_x <= center_band_max:
           print("Fire centered! Starting approach.")
           self.send_command("stop")
           return True
           
       # 2. Determine direction if not set
       if self.precision_direction is None:
           self.precision_direction = "LEFT" if self.fire_center_x < center_band_min else "RIGHT"
           print(f"Precision: Turning {self.precision_direction}")
           self.last_turn_start = current_time
           self.send_command("left" if self.precision_direction == "LEFT" else "right")
           return False
       
       # 3. Check if we need to stop turning
       if current_time - self.last_turn_start >= PRECISION_TURN_DURATION:
           self.send_command("stop")
           # Reset direction to re-evaluate in next frame
           self.precision_direction = None
           print("Precision: Stopped after micro-turn")
           
       return False

   def run_approach_mode(self):
       """Move forward for 1 second then stop permanently"""
       current_time = time.time()
       
       # Start the approach timer
       if self.approach_start == 0:
           self.approach_start = current_time
           self.send_command("forward")
           print("Approach: Moving forward for 1 second")
           return False
       
       # Check if approach duration completed
       if current_time - self.approach_start >= APPROACH_DURATION:
           self.send_command("stop")
           self.mission_complete = True
           self.approach_mode = False
           print("Approach complete! Mission finished.")
           return True
       
       return False

   def send_notification(self):
       """Send Windows notification when mission is complete"""
       if not self.notification_sent:
           print("FIRE DETECTED AND LOCATED - MISSION COMPLETE")
           # Windows beep sound
           winsound.Beep(1000, 1000)  # Frequency 1000Hz, duration 1000ms
           self.notification_sent = True

   def run(self):
       try:
           while True:
               ret, frame = self.cap.read()
               if not ret:
                   print("Reconnecting...")
                   self.cap.release()
                   self.cap = cv2.VideoCapture(ESP32_STREAM_URL)
                   time.sleep(1)
                   continue

               # Store frame dimensions
               if frame is not None and frame.size > 0:
                   self.frame_width = frame.shape[1]
               
               # Skip processing if mission complete
               if not self.mission_complete:
                   # Process frame and detect fire
                   mask, regions = self.process_frame(frame)
                   self.update_regions(regions)
                   fire_detected = self.detect_fire()
                       
                   # Draw detection results
                   self.draw_stable_boxes(frame)
                   
                   # Display frames
                   cv2.imshow("Fire Detection", frame)
                   if mask is not None:
                       cv2.imshow("Analysis Mask", mask)
                   
                   # State machine logic
                   if self.approach_mode:
                       if self.run_approach_mode():
                           self.send_notification()
                   elif self.precision_mode:
                       centered = self.run_precision_mode()
                       if centered:
                           # Switch to approach mode
                           self.precision_mode = False
                           self.approach_mode = True
                       elif not fire_detected and (time.time() - self.last_detection_time > 0.5):
                           # Lost fire during precision mode
                           print("Fire lost! Returning to scanning.")
                           self.precision_mode = False
                           self.scanning = True
                           self.scan_state = "TURNING"
                           self.last_state_change = time.time()
                           self.send_command("right")
                   elif self.scanning:
                       self.run_scanning_mode()
                       # Check if fire detected to switch modes
                       if fire_detected:
                           print(f"Fire detected! Center: {self.fire_center_x:.1f}")
                           self.scanning = False
                           self.precision_mode = True
                           self.precision_direction = None  # Reset for new approach
                           self.send_command("stop")
               else:
                   # Mission complete - just display frame
                   self.draw_stable_boxes(frame)
                   cv2.imshow("Fire Detection", frame)
                   self.send_notification()
               
               # Manual controls
               key = cv2.waitKey(1) & 0xFF
               if key == 27:  # ESC
                   break
               elif key == ord('w'):
                   self.send_command("forward")
                   # Reset automation
                   self.scanning = False
                   self.precision_mode = False
                   self.approach_mode = False
               elif key == ord('s'):
                   self.send_command("backward")
                   self.scanning = False
                   self.precision_mode = False
                   self.approach_mode = False
               elif key == ord('a'):
                   self.send_command("left")
                   self.scanning = False
                   self.precision_mode = False
                   self.approach_mode = False
               elif key == ord('d'):
                   self.send_command("right")
                   self.scanning = False
                   self.precision_mode = False
                   self.approach_mode = False
               elif key == ord(' '):
                   self.send_command("stop")
                   # Reset automation
                   self.scanning = True
                   self.precision_mode = False
                   self.approach_mode = False
                   self.scan_state = "TURNING"
                   self.last_state_change = time.time()
               elif key == ord('r'):  # Resume scanning
                   self.scanning = True
                   self.precision_mode = False
                   self.approach_mode = False
                   self.mission_complete = False
                   self.notification_sent = False
                   self.scan_state = "TURNING"
                   self.last_state_change = time.time()
                   self.send_command("right")

       finally:
           self.send_command("stop")
           self.cap.release()
           cv2.destroyAllWindows()

if __name__ == "__main__":
   detector = EnhancedFireDetector()
   detector.run()

Youtube
Tagi
esp32 kamera python opencv robot