Wykrywanie urządzeń w otoczeniu komunikujących się protokołem BLE za pomocą esp32

Typ_projektu
microPython
Zdjecie główne
Esp32 i komunikacja BLE
Krótki opis projektu

Projekt opiera się na wykrywaniu urządzeń Bluetooth Low Energy (BLE) za pomocą ESP32. System skanuje otoczenie w celu identyfikacji urządzeń BLE, zapisując informacje o ich adresach MAC, nazwach, sygnale RSSI oraz danych reklamowych. Urządzenia są prezentowane w czytelnej liście, można je zapisać do pliku lub wyświetlić jego zawartość. Projekt umożliwia też zarządzanie plikami zapisu

Niezbędne elementy

[przykładowe - prosimy o edycję]

1. Płytka esp32

2. Urządzenie komunikujące się przez protokół BLE(opcjonalnie telefon z aplikacją:LightBlue)

 

Sprzęt

ESP32

Kompilator Thonny

Opis projektu

Projekt służy do wykrywania urządzeń Bluetooth Low Energy (BLE) przy użyciu ESP32. Wykorzystuje protokół BLE do skanowania otoczenia w celu identyfikacji pobliskich urządzeń. Oto szczegółowy opis wszystkich funkcjonalności zaimplementowanych w kodzie:

Klasa BLEScan

ner

Obsługuje skanowanie urządzeń BLE, przechowywanie ich danych i interpretację wyników.

  1. Inicjalizacja (__init__):

    • Aktywuje moduł BLE w ESP32.
    • Rejestruje funkcję obsługi zdarzeń BLE.
  2. Obsługa zdarzeń (_irq):

    • IRQ_SCAN_RESULT: Przechwytuje wyniki skanowania, zapisuje adres MAC, siłę sygnału (RSSI), dane reklamowe i odpowiedzi na skanowanie.
    • IRQ_SCAN_DONE: Ustawia flagę zakończenia skanowania.
  3. Skanowanie (scan):

    • Rozpoczyna aktywne skanowanie z ustawionymi parametrami czasu trwania, interwału i okna skanowania.
    • Zbiera dane o urządzeniach, takie jak nazwy, RSSI i adresy MAC.
    • Dane są sortowane i zwracane w postaci listy.
  4. Dekodowanie nazw urządzeń (_decode_name):

    • Wyszukuje nazwy w danych reklamowych i odpowiedziach na skanowanie.
    • Obsługuje pełne oraz skrócone nazwy urządzeń.

Funkcje Pomocnicze

  1. Wyświetlanie urządzeń (display_devices):

    • Prezentuje listę wykrytych urządzeń w czytelnej formie (adres MAC, nazwa, RSSI).
  2. Zapisywanie urządzeń do pliku (save_devices_to_file):

    • Zapisuje dane urządzeń (adres, nazwa, RSSI) do pliku tekstowego.
  3. Usuwanie plików (delete_file i delete_selected_file):

    • delete_file: Usuwa domyślny plik z zapisanymi urządzeniami (devices.txt).
    • delete_selected_file: Wyświetla listę plików w katalogu i umożliwia użytkownikowi wybranie pliku do usunięcia.
  4. Otwieranie plików (open_file):

    • Wyświetla listę plików w katalogu.
    • Otwiera wybrany plik i wyświetla jego zawartość.

Menu główne (main)

Menu projektu

Główna pętla programu umożliwia użytkownikowi wybór jednej z opcji:

  1. Wyświetlenie wykrytych urządzeń: Skanuje otoczenie, sortuje urządzenia według RSSI i wyświetla wyniki.
  2. Zapisanie urządzeń do pliku: Zapisuje listę urządzeń do pliku tekstowego.
  3. Usunięcie pliku: Usuwa domyślny plik z zapisanymi urządzeniami.
  4. Otwarcie pliku: Wyświetla zawartość wybranego pliku z listy.
  5. Usunięcie wybranego pliku: Pozwala usunąć dowolny plik w katalogu.

Testowanie z aplikacją LightBlue

aplikacja LightBlue

Do testowania użyto aplikacji LightBlue, która pozwala symulować urządzenia BLE. Dzięki temu można było zweryfikować poprawność działania modułu, w tym interpretację danych reklamowych i odpowiedzi na skanowanie.

Zastosowanie

Kod umożliwia identyfikację urządzeń BLE w różnych środowiskach, zapis danych do analizy oraz zarządzanie wynikami. Przydatny w projektach związanych z monitorowaniem otoczenia BLE, IoT, czy lokalizacją urządzeń.

Zdjęcia
esp + thonny
Znajdywanie urządzeń
kod programu
# Stałe dla zdarzeń BLE
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)

# Typy reklam
_ADV_TYPE_SCAN_RSP = const(0x04)

class BLEScanner:
   def __init__(self):
       self._ble = ubluetooth.BLE()
       self._ble.active(True)
       self._ble.irq(self._irq)
       self.devices = {}
       self.scan_complete = False

   def _irq(self, event, data):
       if event == _IRQ_SCAN_RESULT:
           addr_type, addr, adv_type, rssi, adv_data = data
           addr_str = ':'.join('{:02X}'.format(b) for b in addr)
           # Konwertuj adv_data do bajtów
           adv_data = bytes(adv_data)
           # Jeśli urządzenie nie jest jeszcze na liście, dodaj je
           if addr_str not in self.devices:
               self.devices[addr_str] = {'address': addr_str, 'name': None, 'rssi': rssi, 'adv_data': b'', 'scan_resp': b''}
           device = self.devices[addr_str]
           device['rssi'] = rssi  # Aktualizuj RSSI
           if adv_type == _ADV_TYPE_SCAN_RSP:
               # To są dane odpowiedzi na skanowanie
               device['scan_resp'] = adv_data
           else:
               # To są dane reklamowe
               device['adv_data'] = adv_data
           # Próbuj odczytać nazwę z danych reklamowych lub odpowiedzi na skanowanie
           name = self._decode_name(device['adv_data']) or self._decode_name(device['scan_resp'])
           if name:
               device['name'] = name
       elif event == _IRQ_SCAN_DONE:
           # Skanowanie zakończone
           self.scan_complete = True

   def scan(self, duration_ms=30000):
       self.devices = {}
       self.scan_complete = False
       # Ustawienie aktywnego skanowania (active=True)
       # Dostosowanie parametrów interval i window
       scan_interval_us = 50000  # 50 ms
       scan_window_us = 50000    # 50 ms
       self._ble.gap_scan(duration_ms, scan_interval_us, scan_window_us, True)
       while not self.scan_complete:
           time.sleep_ms(100)
       # Konwersja urządzeń do listy
       return list(self.devices.values())

   def _decode_name(self, adv_data):
       # Parsowanie danych reklamowych w celu znalezienia nazwy
       i = 0
       while i + 1 < len(adv_data):
           length = adv_data[i]
           if length == 0 or i + length >= len(adv_data):
               break
           field_type = adv_data[i+1]
           if field_type == 0x09:  # Pełna nazwa urządzenia
               name_bytes = adv_data[i+2:i+1+length]
               try:
                   name = name_bytes.decode('utf-8')
               except UnicodeDecodeError:
                   name = name_bytes.decode('utf-8', 'ignore')
               return name
           elif field_type == 0x08:  # Skrócona nazwa urządzenia
               name_bytes = adv_data[i+2:i+1+length]
               try:
                   name = name_bytes.decode('utf-8')
               except UnicodeDecodeError:
                   name = name_bytes.decode('utf-8', 'ignore')
               return name
           i += length + 1
       return None

def display_devices(devices):
   print("=== Lista Urządzeń ===")
   for idx, device in enumerate(devices):
       name = device['name'] if device['name'] else 'Nieznane'
       print("{}. {} | {} | RSSI: {}".format(idx+1, device['address'], name, device['rssi']))
   print("======================")

def save_devices_to_file(devices, filename='devices.txt'):
   try:
       with open(filename, 'w') as f:
           for device in devices:
               name = device['name'] if device['name'] else 'Nieznane'
               f.write("{},{},{}\n".format(device['address'], name, device['rssi']))
       print("Urządzenia zostały zapisane do pliku.")
   except Exception as e:
       print("Błąd zapisu pliku:", e)

def delete_file(filename='devices.txt'):
   try:
       if filename in os.listdir():
           os.remove(filename)
           print("Plik '{}' został usunięty.".format(filename))
       else:
           print("Plik '{}' nie istnieje.".format(filename))
   except Exception as e:
       print("Błąd usuwania pliku:", e)

def delete_selected_file():
   files = os.listdir()
   print("=== Lista Plików ===")
   for idx, f in enumerate(files):
       print(f"{idx + 1}. {f}")
   print("====================")
   selection = input("Wybierz numer pliku do usunięcia: ").strip()
   try:
       file_index = int(selection) - 1
       if 0 <= file_index < len(files):
           file_to_delete = files[file_index]
           confirm = input(f"Czy na pewno chcesz usunąć plik '{file_to_delete}'? (t/n): ").strip().lower()
           if confirm == 't':
               try:
                   os.remove(file_to_delete)
                   print(f"Plik '{file_to_delete}' został usunięty.")
               except Exception as e:
                   print("Błąd usuwania pliku:", e)
           else:
               print("Anulowano usuwanie pliku.")
       else:
           print("Nieprawidłowy numer pliku.")
   except ValueError:
       print("Proszę wprowadzić poprawny numer.")

def open_file():
   files = os.listdir()
   print("=== Lista Plików ===")
   for idx, f in enumerate(files):
       print(f"{idx + 1}. {f}")
   print("====================")
   selection = input("Wybierz numer pliku do otwarcia: ").strip()
   try:
       file_index = int(selection) - 1
       if 0 <= file_index < len(files):
           file_to_open = files[file_index]
           print(f"Wybrano plik: '{file_to_open}'")
           with open(file_to_open, 'r') as f:
               print("=== Zawartość Pliku ===")
               print(f.read())
               print("=======================")
       else:
           print("Nieprawidłowy numer pliku.")
   except ValueError:
       print("Proszę wprowadzić poprawny numer.")
   except Exception as e:
       print("Błąd otwarcia pliku:", e)

def main():
   scanner = BLEScanner()
   while True:
       print("\n=== Menu ===")
       print("1. Wyświetl urządzenia")
       print("2. Zapisz urządzenia do pliku")
       print("3. Usuń plik z urządzeniami")
       print("4. Otwórz plik z urządzeniami")
       print("5. Usuń wybrany plik")
       option = input("Wybierz opcję (1-5): ").strip()
       if option == '1':
           print("Skanowanie urządzeń...")
           devices = scanner.scan(duration_ms=30000)
           # Sortowanie urządzeń według RSSI malejąco
           devices.sort(key=lambda x: x['rssi'], reverse=True)
           display_devices(devices)
       elif option == '2':
           if hasattr(scanner, 'devices') and scanner.devices:
               devices = list(scanner.devices.values())
               save_devices_to_file(devices)
           else:
               print("Brak urządzeń do zapisania. Wybierz opcję 1, aby przeskanować urządzenia.")
       elif option == '3':
           delete_file()  # Usuwa domyślny plik 'devices.txt'
       elif option == '4':
           open_file()
       elif option == '5':
           delete_selected_file()
       else:
           print("Nieprawidłowa opcja. Spróbuj ponownie.")

if __name__ == '__main__':
   main()

ęzyk programowania (ostatnia ikona powyżej)

 

Tagi
#esp32 #BLE #IoT #micropython #Scanner