
Projekt przedstawia autonomicznego robota wykrywającego ogień za pomocą kamery ESP32-CAM i analizy obrazu w HSV. Robot samodzielnie skanuje otoczenie, lokalizuje płomień, precyzyjnie się ustawia, podchodzi do źródła ognia i kończy misję. System działa na podstawie przetwarzania strumienia wideo w OpenCV, wykrywa kontury płomienia, stabilizuje detekcję przez analizę wielu klatek i wysyła komendy ruchu przez Wi-Fi.
-
ESP32-CAM – kamera z Wi-Fi
-
Robot mobilny / pojazd z napędem – np. platforma na kołach z silnikami DC lub serwami
-
Moduł sterujący silnikami
-
Zasilanie
-
Komputer (PC/laptop)
Opis ogólny projektu
Celem projektu było stworzenie autonomicznego robota, który samodzielnie lokalizuje ogień za pomocą kamery i przetwarza obraz w czasie rzeczywistym. System działa na podstawie obrazu przesyłanego przez moduł ESP32-CAM, analizowanego w Pythonie przy użyciu biblioteki OpenCV. Robot wykonuje szereg zautomatyzowanych działań: skanuje przestrzeń, lokalizuje płomień, precyzyjnie się ustawia, podjeżdża w jego kierunku i kończy misję. Całość stanowi przykład zintegrowanego zastosowania wizji komputerowej, mikrokontrolera z kamerą, sieci Wi-Fi oraz algorytmicznego sterowania.
Główna funkcja programu – schemat działania
Program realizuje autonomiczny algorytm wykrywania płomienia i reagowania na niego w następujących krokach:
-
Skanowanie obszaru – robot obraca się w prawo, wykonując pauzy na analizę obrazu.
-
Lokalizacja ognia – obraz z kamery jest przetwarzany w czasie rzeczywistym w celu wykrycia płomienia.
-
Precyzyjne ustawienie się – po wykryciu ognia robot wykonuje mikro-ruchy w lewo lub prawo, aby wyśrodkować płomień.
-
Podejście do ognia – po ustawieniu się robot jedzie do przodu przez zdefiniowany czas.
-
Zakończenie misji – po zbliżeniu się do celu robot się zatrzymuje i informuje o zakończeniu zadania.
Jak to działa – krok po kroku
-
Odbieranie obrazu z kamery ESP32-CAM
Program odbiera strumień wideo (MJPEG) z adresu IP kamery:self.cap = cv2.VideoCapture(ESP32_STREAM_URL)
-
Wstępna analiza obrazu
Przetwarzanie każdej klatki obejmuje:-
Rozmycie obrazu (redukcja szumów):
blur = cv2.GaussianBlur(frame, (21, 21), 0)
-
Konwersję do przestrzeni HSV:
hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
-
Binaryzację – maskowanie kolorów ognia w ustalonym zakresie HSV:
mask = cv2.inRange(hsv, LOWER, UPPER)
-
-
Wykrywanie konturów płomienia
Na masce HSV wykrywane są kontury:contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
Kontury o powierzchni mniejszej niż
MIN_AREA
są ignorowane. Ogień musi być wystarczająco duży, by został uznany za istotny. -
Śledzenie aktywnych regionów ognia przez wiele klatek
Aby uniknąć fałszywych detekcji, system śledzi, czy wykryty region występuje przez kilka kolejnych klatek. Dopiero wtedy uznaje, że ogień rzeczywiście istnieje w scenie. -
Podejmowanie decyzji – trzy tryby działania robota
Program dynamicznie przełącza się między trybami:-
Skanowanie – robot obraca się w prawo z przerwami na analizę obrazu.
-
Precyzyjne ustawianie – robot wykonuje mikro-obrót w lewo/prawo, aż ogień znajdzie się w centralnej strefie obrazu.
-
Podejście – robot porusza się do przodu przez określony czas (
APPROACH_DURATION
), po czym się zatrzymuje.
-
-
Wysyłanie komend do robota przez Wi-Fi
Polecenia sterujące ruchem (np."forward"
,"left"
,"stop"
) są wysyłane jako żądania HTTP do ESP32, który je interpretuje i steruje silnikami. -
Wizualizacja i informacja zwrotna
Na ekranie komputera wyświetlany jest:-
Strumień wideo z kamery,
-
Maska HSV z obszarami ognia,
-
Prostokąty z podpisami: „Checking…”, „Fire detected”, „FIRE!”.
-
-
Powiadomienie o sukcesie
Po skutecznym wykryciu i podejściu do ognia uruchamiany jest dźwiękowy sygnał zakończenia misji.
Zastosowane algorytmy i techniki
-
HSV – wykrywanie ognia na podstawie koloru
Przestrzeń HSV lepiej niż RGB oddziela cechy kolorystyczne ognia. Zakres barw (odcienie pomarańczowo-czerwone) umożliwia łatwe binaryzowanie. -
Morfologia obrazu
Operacje morfologiczne (zamykanie) usuwają szumy, a kontury ognia są łączone w większe spójne obszary. -
Algorytm śledzenia ognia między klatkami
System przechowuje informacje o wykrytych obszarach przez wiele klatek. Jeśli ogień znika tylko na moment, detekcja nie jest przerywana. -
Decyzje sterujące na podstawie pozycji ognia
Pozycja środka wykrytego ognia w kadrze (lewa/prawa/centrum) służy do precyzyjnego ustawienia się względem celu. -
Stan wewnętrzny programu
System zachowuje dane o czasie ostatniego wykrycia, aktualnym trybie i kierunku ruchu, by podejmować właściwe decyzje na podstawie historii.
Trudności i wyzwania techniczne
Projekt wymagał dopracowania wielu parametrów, zwłaszcza w zakresie:
-
Doboru zakresów HSV dla płomienia – zależnie od oświetlenia i koloru otoczenia, ogień może być różnie interpretowany. Wymagało to testowania wielu kombinacji wartości
LOWER
iUPPER
. -
Precyzyjnego sterowania ruchem robota – mikro-ruchy oraz czas trwania poszczególnych komend musiały być dostrajane eksperymentalnie, by robot mógł dokładnie się ustawić.
-
Opóźnień wynikających z transmisji wideo i poleceń – ze względu na opóźnienia w przesyłaniu obrazu z ESP32-CAM (średnio kilkaset ms) oraz w przetwarzaniu, trudne było osiągnięcie całkowicie płynnej reakcji robota. Wymusiło to kompromisy w czasie podejmowania decyzji.



import cv2
import numpy as np
import time
import requests
import winsound # For Windows notification sound
# ======== Optimized Configuration ========
ESP32_STREAM_URL = "http://192.168.4.1:81/stream"
COMMAND_URL = "http://192.168.4.1/action?go="
MIN_AREA = 100
CONFIRMATION_AREA = 150
PERSISTENCE_FRAMES = 2
DETECTION_FPS = 12
SIZE_BUFFER = 10
CENTER_BAND_MIN = 0.35
CENTER_BAND_MAX = 0.75
SCAN_TURN_DURATION = 0.03
SCAN_WAIT_DURATION = 1.2
PRECISION_TURN_DURATION = 0.001
APPROACH_DURATION = 0.5 # 1 second forward movement after centering
# HSV color range adjustments
LOWER = np.array([2, 20, 250], dtype="uint8")
UPPER = np.array([20, 230, 255], dtype="uint8")
class EnhancedFireDetector:
def __init__(self):
self.cap = cv2.VideoCapture(ESP32_STREAM_URL)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.active_regions = {}
self.frame_count = 0
self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))
# State management
self.scanning = True
self.precision_mode = False
self.approach_mode = False
self.scan_state = "TURNING"
self.last_state_change = time.time()
self.fire_center_x = None
self.fire_area = None
self.frame_width = 0
self.precision_direction = None
self.last_command_time = 0
self.command_delay = 0.03
self.last_detection_time = 0
self.last_turn_start = 0
self.approach_start = 0
self.mission_complete = False
self.notification_sent = False
def send_command(self, command):
"""Send movement command to ESP32 with rate limiting"""
current_time = time.time()
if current_time - self.last_command_time < self.command_delay and command != "stop":
return
try:
response = requests.get(COMMAND_URL + command, timeout=0.1)
if response.status_code == 200:
print(f"Command sent: {command}")
else:
print(f"Command failed: {response.status_code}")
self.last_command_time = current_time
except Exception as e:
print(f"Command error: {e}")
def process_frame(self, frame):
self.frame_count += 1
# Enhanced processing pipeline
blur = cv2.GaussianBlur(frame, (21, 21), 0)
hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, LOWER, UPPER)
# Morphological improvements
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, self.kernel, iterations=2)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
current_regions = []
for cnt in contours:
area = cv2.contourArea(cnt)
if area > MIN_AREA:
x, y, w, h = cv2.boundingRect(cnt)
# Add size buffer to regions
x = max(0, x - SIZE_BUFFER//2)
y = max(0, y - SIZE_BUFFER//2)
w = min(frame.shape[1]-x, w + SIZE_BUFFER)
h = min(frame.shape[0]-y, h + SIZE_BUFFER)
current_regions.append((x, y, w, h))
return mask, current_regions
def update_regions(self, new_regions):
# Reset counter for this frame
current_ids = []
# Update existing regions with position smoothing
for region_id, region_data in list(self.active_regions.items()):
x, y, w, h, counter = region_data
best_match = None
min_distance = float('inf')
# Find closest matching new region
for new in new_regions:
nx, ny, nw, nh = new
# Calculate center points
cx = x + w/2
cy = y + h/2
ncx = nx + nw/2
ncy = ny + nh/2
distance = ((cx - ncx)**2 + (cy - ncy)**2)**0.5
# Match if centers are close and size similar
if distance < max(w, h)*0.5 and abs(w-nw) < w*0.5 and abs(h-nh) < h*0.5:
if distance < min_distance:
best_match = new
min_distance = distance
if best_match:
nx, ny, nw, nh = best_match
# Smooth position transition (60% historical, 40% new)
smoothed = (
int(x * 0.6 + nx * 0.4),
int(y * 0.6 + ny * 0.4),
int(w * 0.6 + nw * 0.4),
int(h * 0.6 + nh * 0.4),
counter + 1
)
self.active_regions[region_id] = smoothed
current_ids.append(region_id)
elif counter > 0:
# Decrement counter instead of immediate removal
self.active_regions[region_id] = (x, y, w, h, counter - 1)
current_ids.append(region_id)
# Add new regions
for new in new_regions:
nx, ny, nw, nh = new
matched = False
# Check if already tracked
for region_id in self.active_regions:
if region_id in current_ids:
continue
# Add new region with counter
region_id = self.frame_count
self.active_regions[region_id] = (nx, ny, nw, nh, 1)
current_ids.append(region_id)
# Remove stale regions
for region_id in list(self.active_regions.keys()):
if region_id not in current_ids:
del self.active_regions[region_id]
def detect_fire(self):
"""Detect largest fire region with persistence"""
self.fire_center_x = None
self.fire_area = None
max_area = 0
max_region = None
for region_id, region_data in self.active_regions.items():
x, y, w, h, counter = region_data
area = w * h
# Require persistence and minimum size
if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA:
if area > max_area:
max_area = area
center_x = x + w/2
max_region = (center_x, area)
if max_region:
self.fire_center_x, self.fire_area = max_region
self.last_detection_time = time.time()
return True
return False
def draw_stable_boxes(self, frame):
for region_id, region_data in self.active_regions.items():
x, y, w, h, counter = region_data
x2 = x + w
y2 = y + h
# Boundary checks
x = max(0, x)
y = max(0, y)
x2 = min(frame.shape[1], x2)
y2 = min(frame.shape[0], y2)
w = x2 - x
h = y2 - y
area = w * h
# Size-based confirmation
if counter >= PERSISTENCE_FRAMES and area >= CONFIRMATION_AREA:
color = (0, 0, 255) # Red - high confidence
label = "FIRE!"
thickness = 2
elif counter >= PERSISTENCE_FRAMES/2:
color = (0, 165, 255) # Orange - confirmed
label = "Fire detected"
thickness = 2
else:
color = (0, 255, 255) # Yellow - investigating
label = f"Checking... {counter}/{PERSISTENCE_FRAMES}"
thickness = 1
# Draw enhanced boxes
cv2.rectangle(frame, (x, y), (x2, y2), color, thickness)
# Text background
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
cv2.rectangle(frame, (x, y-25), (x+tw, y), color, -1)
# Draw text
cv2.putText(frame, label, (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 1)
# Area information
cv2.putText(frame, f"{area}px", (x, y2+15),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
# Draw mission status
if self.mission_complete:
cv2.putText(frame, "MISSION COMPLETE: FIRE LOCATED", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
def run_scanning_mode(self):
"""Handle scanning behavior with timed movements"""
current_time = time.time()
elapsed = current_time - self.last_state_change
if self.scan_state == "TURNING":
if elapsed >= SCAN_TURN_DURATION:
self.send_command("stop")
self.scan_state = "WAITING"
self.last_state_change = current_time
print("Scan: Stopped for detection")
else:
# Continue turning
self.send_command("right")
elif self.scan_state == "WAITING":
if elapsed >= SCAN_WAIT_DURATION:
self.send_command("right")
self.scan_state = "TURNING"
self.last_state_change = current_time
print("Scan: Turning right")
def run_precision_mode(self):
"""Center the detected fire with micro-movements"""
if self.fire_center_x is None or self.frame_width == 0:
self.precision_direction = None
return False
center_band_min = self.frame_width * CENTER_BAND_MIN
center_band_max = self.frame_width * CENTER_BAND_MAX
current_time = time.time()
# 1. Check if fire is centered
if center_band_min <= self.fire_center_x <= center_band_max:
print("Fire centered! Starting approach.")
self.send_command("stop")
return True
# 2. Determine direction if not set
if self.precision_direction is None:
self.precision_direction = "LEFT" if self.fire_center_x < center_band_min else "RIGHT"
print(f"Precision: Turning {self.precision_direction}")
self.last_turn_start = current_time
self.send_command("left" if self.precision_direction == "LEFT" else "right")
return False
# 3. Check if we need to stop turning
if current_time - self.last_turn_start >= PRECISION_TURN_DURATION:
self.send_command("stop")
# Reset direction to re-evaluate in next frame
self.precision_direction = None
print("Precision: Stopped after micro-turn")
return False
def run_approach_mode(self):
"""Move forward for 1 second then stop permanently"""
current_time = time.time()
# Start the approach timer
if self.approach_start == 0:
self.approach_start = current_time
self.send_command("forward")
print("Approach: Moving forward for 1 second")
return False
# Check if approach duration completed
if current_time - self.approach_start >= APPROACH_DURATION:
self.send_command("stop")
self.mission_complete = True
self.approach_mode = False
print("Approach complete! Mission finished.")
return True
return False
def send_notification(self):
"""Send Windows notification when mission is complete"""
if not self.notification_sent:
print("FIRE DETECTED AND LOCATED - MISSION COMPLETE")
# Windows beep sound
winsound.Beep(1000, 1000) # Frequency 1000Hz, duration 1000ms
self.notification_sent = True
def run(self):
try:
while True:
ret, frame = self.cap.read()
if not ret:
print("Reconnecting...")
self.cap.release()
self.cap = cv2.VideoCapture(ESP32_STREAM_URL)
time.sleep(1)
continue
# Store frame dimensions
if frame is not None and frame.size > 0:
self.frame_width = frame.shape[1]
# Skip processing if mission complete
if not self.mission_complete:
# Process frame and detect fire
mask, regions = self.process_frame(frame)
self.update_regions(regions)
fire_detected = self.detect_fire()
# Draw detection results
self.draw_stable_boxes(frame)
# Display frames
cv2.imshow("Fire Detection", frame)
if mask is not None:
cv2.imshow("Analysis Mask", mask)
# State machine logic
if self.approach_mode:
if self.run_approach_mode():
self.send_notification()
elif self.precision_mode:
centered = self.run_precision_mode()
if centered:
# Switch to approach mode
self.precision_mode = False
self.approach_mode = True
elif not fire_detected and (time.time() - self.last_detection_time > 0.5):
# Lost fire during precision mode
print("Fire lost! Returning to scanning.")
self.precision_mode = False
self.scanning = True
self.scan_state = "TURNING"
self.last_state_change = time.time()
self.send_command("right")
elif self.scanning:
self.run_scanning_mode()
# Check if fire detected to switch modes
if fire_detected:
print(f"Fire detected! Center: {self.fire_center_x:.1f}")
self.scanning = False
self.precision_mode = True
self.precision_direction = None # Reset for new approach
self.send_command("stop")
else:
# Mission complete - just display frame
self.draw_stable_boxes(frame)
cv2.imshow("Fire Detection", frame)
self.send_notification()
# Manual controls
key = cv2.waitKey(1) & 0xFF
if key == 27: # ESC
break
elif key == ord('w'):
self.send_command("forward")
# Reset automation
self.scanning = False
self.precision_mode = False
self.approach_mode = False
elif key == ord('s'):
self.send_command("backward")
self.scanning = False
self.precision_mode = False
self.approach_mode = False
elif key == ord('a'):
self.send_command("left")
self.scanning = False
self.precision_mode = False
self.approach_mode = False
elif key == ord('d'):
self.send_command("right")
self.scanning = False
self.precision_mode = False
self.approach_mode = False
elif key == ord(' '):
self.send_command("stop")
# Reset automation
self.scanning = True
self.precision_mode = False
self.approach_mode = False
self.scan_state = "TURNING"
self.last_state_change = time.time()
elif key == ord('r'): # Resume scanning
self.scanning = True
self.precision_mode = False
self.approach_mode = False
self.mission_complete = False
self.notification_sent = False
self.scan_state = "TURNING"
self.last_state_change = time.time()
self.send_command("right")
finally:
self.send_command("stop")
self.cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
detector = EnhancedFireDetector()
detector.run()