Poprzez open AI API komunikujemy się z chatGPT. Użytkownik w terminalu komputera (podłączonego do esp32) wpisuje komendę, a AI w odpowiedzi wysyła odpowiedni plik JSON, który następnie jest interpretowany i uaktywnia odpowiednią część kodu. Zaimplementowane opcje to: zapalanie/gaszenie dwóch diod, pomiar temperatury, czy też prymitywny termometr.
Niezbędne element:
-Esp32 wraz z kablem USB
-termometr (w moim przypadku dht11)
-pasek diod RGB Neopixel (+ rezystor jeżeli nie jest bazowo wlutowany w pasek)
-dioda zielona (wykorzystuje też niebieską wbudowaną w esp)
-rezystor 220ohm
-płytka stykowa
-kabelki
Komputer z możliwością programowania w mikropythonie.
Ogólny opis działania:
• Użytkownik w porcie szeregowym thonny’ego (przy esp podłączonym do komputera) wprowadza jedno z haseł: 'zapal/zgaś diodę/y' 'włącz/wyłącz termometr', 'podaj temperaturę', 'exit/quit'.
• Wprowadzony tekst jest wprowadzany do pliku JSON (który zawiera w sobie instrukcje do chataGPT).
• Json jest zamieniany na string i wysyłany do serwera openAI, gdzie generowana jest odpowiedź również jako string.
• Kod zamienia odpowiedź na plik JSON, a następnie ją interpretuje tak, aby aktywować odpowiednią funkcjonalność. Na przykład włączenie termometru, który luźno reprezentuje temperaturę w pomieszczeniu (mając tylko, 8 diod termometr rejestruje temperaturę tylko w przedziale 8 stopni, aby można było zauważyć zmianę przez np. chuchnięcie).
Jak włączyć openAI API (I co to właściwie jest)?
Application Programming Interface, w skrócie openAI API, prostym językiem mówiąc, jest to łącznik pomiędzy kodem (urządzeniem) użytkownika a serwerem AI API. !!!nie jest to po prostu chat GPT jaki każdy zna i posiadanie płatnego GPT4 nie przekłada się!!! W API wykupuje się „tokeny”, które są naliczane za wysłanie pojedynczej wiadomości, waga wiadomości różni się w zależności, od jakiego modelu używamy, wielkości wiadomości oraz jaki typ plików przesyłamy (wszystko jest ustalane w kodzie). Ja w tym projekcie wybrałem najtańszą opcję, wysyłałem tylko pliki tekstowe do najsłabszego modelu w ofercie (gpt3-turbo) i nie zużyłem nawet centa z początkowo wpłaconych 5 dolarów (minimalna wartość wpłacenia). (Na liczbach, 5 dolarów to około 2,5 mln tokenów, przez cały projekt, czyli wysłanie około 80 poleceń zużyłem niecałe 20 tys.).
Jak łączyć się z API
1) Zakładamy konto na https://platform.openai.com/docs/overview
2) Jeżeli wcześniej nie korzystaliśmy nawet z chatGPT, to przysługuje darmowa próbka i wystarczy wejść w dashboard (prawy górny róg)-> API keys (po lewej) -> create secret new key-> kopiujemy klucz do wklejenia go do kodu.
3) Jeżeli jednak nie mamy tokenów na koncie, to trzeba je dokupić poprzez docs-> search-> billing-> add to credit balance i wtedy już powinno działać.
Hardware: wszystkie elementy są wymienione powyżej, a schemat pokazuje, jak zostały podłączone. Warto zaznaczyć, że esp32 ma wbudowany moduł wifi.
Software: w kodzie każda sekcja została odpowiednio nazwana, ponumerowana oraz zakomentowana z wyjaśnieniami.
Z rzeczy zasługujących na szczególną uwagę:
-Komunikacja z chatgpt-Do openAI wysyłamy plik JSON zamieniony na stringa (za pomocą ujson.dumps(data)).ponieważ jest to prostsze i tańsze rozwiązanie niż wysyłanie plików. Rezultat też otrzymujemy w formie stringa i zamieniamy na plik JSON (za pomocą ujson.loads(anwser)).
-Wielowątkowość (multithreading)- Polega na tym, że poprzez linijkę „_thread.start_new_thread”, tworzymy nowy wątek (główny oraz wątek termometru). Pozornie mikrokontroler wykonuje wtedy dwie czynności jednocześnie, lecz w praktyce bardzo szybko przełącza się między nimi. Sterowanie działaniem drugiego wątku odbywa się za pomocą flagi run_thermometer, gdy jest False, pętla w thermometer_thread() się kończy i wątek się wyłącza.
Zastosowanie:
Prototypy IoT – baza do tworzenia bardziej rozbudowanych projektów (np. z czujnikami i wyświetlaniem stanu w chmurze).
Interaktywny asystent – wprowadzenie poleceń w stylu naturalnego języka, które są interpretowane przez ChatGPT i wykonywane przez ESP32.
############################################################################
# 1. IMPORTY i BIBLIOTEKI
############################################################################
import network
import urequests
import ujson
import time
from machine import Pin
import dht
import neopixel
# !!! Import modułu do wielowątkowości !!!
import _thread
############################################################################
# ZMIENNA GLOBALNA WLAN
############################################################################
wlan = None # Tu przechowujemy obiekt WLAN
############################################################################
# 2. KONFIGURACJA SIECI Wi-Fi i OPENAI
############################################################################
SSID = '...'
PASSWORD = ...'
API_KEY = 'sk-...'
############################################################################
# 3. DEFINICJA URZĄDZEŃ (CZUJNIK DHT11, DIODY, PASEK LED)
############################################################################
# Inicjalizacja czujnika DHT11 na pinie GPIO26.
sensor = dht.DHT11(Pin(26))
# Inicjalizacja zwykłych diod LED:
blue_led = Pin(2, Pin.OUT)
blue_led.value(0) # Ustawiamy początkowo na wyłączoną (0)
green_led = Pin(12, Pin.OUT)
green_led.value(0) # Początkowo wyłączona
# Konfiguracja paska NeoPixel (WS2812) z 8 diodami na pinie GPIO14.
LED_PIN = 14
NUM_LEDS = 8
strip = neopixel.NeoPixel(Pin(LED_PIN), NUM_LEDS)
# Zmienne globalne do przechowywania ostatniego odczytu z czujnika DHT11.
global_temperature = None
global_humidity = None
# Flagi i blokady do wielowątkowości:
run_thermometer = False # Gdy True, wątek termometru ma działać.
thermometer_thread_started = False # Sygnalizuje, czy wątek już został uruchomiony.
############################################################################
# 4. FUNKCJA DO POŁĄCZENIA Z SIECIĄ Wi-Fi
############################################################################
def connect_wifi():
"""
Inicjalizuje globalny obiekt wlan (jeśli jeszcze nie istnieje)
i łączy się z siecią Wi-Fi o podanym SSID i haśle.
"""
global wlan
if wlan is None:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Sprawdzamy, czy ESP jest już połączone z siecią; jeśli tak, kończymy.
if wlan.isconnected():
print("Already connected to WiFi")
return
wlan.disconnect()
# Próba połączenia z siecią Wi-Fi przy użyciu SSID i hasła.
wlan.connect(SSID, PASSWORD)
# Czekamy maksymalnie 10 razy, co 3 sekundy, aż połączenie się powiedzie.
retries = 0
while not wlan.isconnected() and retries < 10:
time.sleep(3)
print("Connecting to WiFi...")
retries += 1
# Po przekroczeniu liczby prób sprawdzamy, czy udało się połączyć.
if wlan.isconnected():
print("Connected to WiFi")
else:
print("Failed to connect to WiFi after several attempts.")
############################################################################
# 5. FUNKCJE POMOCNICZE DO PASKA LED
############################################################################
def wyczysc_pasek():
"""
Gasi wszystkie diody na pasku NeoPixel (ustawiając kolor czarny).
"""
for i in range(NUM_LEDS):
strip[i] = (0, 0, 0)
strip.write()
def ustaw_pasek_termometru(temp):
"""
Zmienia liczbę zapalonych diod w zależności od wartości temperatury (temp).
Minimalna temperatura to 14, a maksymalna to 14 + NUM_LEDS (22).
Diody 0..2 -> zielone
Diody 3..5 -> żółte
Diody 6..7 -> czerwone
"""
temp_dolna = 14
temp_gorna = temp_dolna + NUM_LEDS
if temp < temp_dolna:
temp = temp_dolna
elif temp > temp_gorna:
temp = temp_gorna
leds_to_light = temp - temp_dolna
if leds_to_light > NUM_LEDS:
leds_to_light = NUM_LEDS
for i in range(NUM_LEDS):
strip[i] = (0, 0, 0)
green = (0, 255, 0)
yellow = (255, 255, 0)
red = (255, 0, 0)
for led_index in range(leds_to_light):
if led_index <= 2:
strip[led_index] = green
elif led_index <= 5:
strip[led_index] = yellow
else:
strip[led_index] = red
strip.write()
print(f"[Thread] Temp: {temp}C => zapalamy {leds_to_light} diod (termometr)")
############################################################################
# 6. FUNKCJA WĄTKU TERMOMETRU
############################################################################
def thermometer_thread():
"""
Wątek uruchamiany w tle, cyklicznie (co 5 sekund) pobiera
temperaturę i wilgotność z czujnika DHT11, a następnie
aktualizuje pasek NeoPixel.
Działa dopóki flaga 'run_thermometer' jest True.
Po jej ustawieniu na False wątek przerywa pętlę, czyści pasek
i kończy działanie.
"""
global run_thermometer
global global_temperature
global global_humidity
print("[Thread] Uruchomiono wątek termometru.")
while run_thermometer:
sensor_failure = False
try:
sensor.measure()
t = sensor.temperature()
h = sensor.humidity()
if t is None or h is None:
sensor_failure = True
else:
global_temperature = t
global_humidity = h
except OSError as e:
sensor_failure = True
if not sensor_failure and global_temperature is not None:
ustaw_pasek_termometru(global_temperature)
else:
print("[Thread] Blad odczytu DHT11 albo None")
time.sleep(5)
print("[Thread] Wątek termometru zakończył działanie.")
wyczysc_pasek()
############################################################################
# 7. FUNKCJA "query_openai(prompt)"
############################################################################
def query_openai(prompt):
"""
Wysyła prompt do ChatGPT i oczekuje odpowiedzi w formacie JSON
z kluczem 'action', co decyduje o dalszych krokach w ESP32.
"""
url = "https://api.openai.com/v1/chat/completions"
system_instructions = (
"Jestes asystentem. "
"Jesli uzytkownik pisze cos podobnego do 'wlacz termometr', zwroc: {\"action\":\"termometr\"}. "
"Jesli pisze 'wylacz termometr', zwroc: {\"action\":\"stop_thermometer\"}. "
"Jesli pisze 'podaj temperature', zwroc: {\"action\":\"report_temperature\"}. "
"Jesli pisze 'zapal niebieska diode', zwroc: {\"action\":\"set_pin\",\"pin\":2,\"state\":\"HIGH\"}. "
"Jesli pisze 'zgas niebieska diode', zwroc: {\"action\":\"set_pin\",\"pin\":2,\"state\":\"LOW\"}. "
"Jesli pisze 'zapal zielona diode', zwroc: {\"action\":\"set_pin\",\"pin\":12,\"state\":\"HIGH\"}. "
"Jesli pisze 'zgas zielona diode', zwroc: {\"action\":\"set_pin\",\"pin\":12,\"state\":\"LOW\"}. "
"Jesli pisze 'zapal obydwie diody', zwroc: {\"action\":\"set_pins\",\"pins\":[2,12],\"state\":\"HIGH\"}. "
"Jesli pisze 'zgas obydwie diody', zwroc: {\"action\":\"set_pins\",\"pins\":[2,12],\"state\":\"LOW\"}. "
"Nic wiecej nie zwracaj."
)
data = {
"model": "gpt-3.5-turbo",
"messages": [
{"role": "system", "content": system_instructions},
{"role": "user", "content": prompt}
],
"max_tokens": 200
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {API_KEY}"
}
json_payload = ujson.dumps(data)
print("Wysylany JSON:", json_payload)
response = urequests.post(url, data=json_payload, headers=headers)
print("Status code:", response.status_code)
if response.status_code == 200:
result = response.json()
answer = result["choices"][0]["message"]["content"]
print("Response from OpenAI:")
print(answer)
try:
action_data = ujson.loads(answer)
action = action_data.get("action")
if action == "termometr":
start_thermometer_thread()
elif action == "stop_thermometer":
stop_thermometer_thread()
elif action == "report_temperature":
show_temperature()
elif action == "set_pin":
pin_number = action_data.get("pin")
state = action_data.get("state")
set_pin(pin_number, state)
elif action == "set_pins":
pins = action_data.get("pins", [])
state = action_data.get("state")
set_pins(pins, state)
else:
print("Nieznana akcja:", action)
except ValueError:
print("Odpowiedz nie jest poprawnym JSON.")
else:
print("Error:", response.status_code)
print(response.text)
response.close()
############################################################################
# 8. FUNKCJE STERUJĄCE DIODAMI
############################################################################
def set_pin(pin_number, state):
"""
Steruje jedną diodą GPIO2 (niebieska) lub GPIO12 (zielona)
w zależności od parametru state (HIGH/LOW).
"""
if pin_number == 2:
if state == "HIGH":
blue_led.value(1)
print("Niebieska dioda zapalona!")
elif state == "LOW":
blue_led.value(0)
print("Niebieska dioda zgaszona!")
else:
print("Nieznany stan diody:", state)
elif pin_number == 12:
if state == "HIGH":
green_led.value(1)
print("Zielona dioda zapalona!")
elif state == "LOW":
green_led.value(0)
print("Zielona dioda zgaszona!")
else:
print("Nieznany stan diody:", state)
def set_pins(pins, state):
"""
Sterowanie kilkoma pinami jednocześnie (np. [2, 12]) - zapalanie/gaszenie.
"""
for p in pins:
if p == 2:
blue_led.value(1 if state == "HIGH" else 0)
elif p == 12:
green_led.value(1 if state == "HIGH" else 0)
else:
print("Nieznany pin:", p)
if state == "HIGH":
print("Obydwie diody zapalone!")
else:
print("Obydwie diody zgaszone!")
############################################################################
# 9. FUNKCJA "show_temperature()" - WYPISANIE POMIARU
############################################################################
def show_temperature():
"""
Wypisuje ostatnio zmierzoną temperaturę z global_temperature,
jeśli dostępna. W przeciwnym razie sygnalizuje błąd.
"""
if global_temperature is not None:
print(f"Aktualna temperatura: {global_temperature}C")
else:
print("Brak poprawnego pomiaru temperatury (DHT11 nie dziala).")
############################################################################
# 10. OBSŁUGA WĄTKU TERMOMETRU - START/STOP
############################################################################
def start_thermometer_thread():
"""
Ustawia run_thermometer = True i uruchamia wątek, jeśli jeszcze nie wystartował
lub jeśli został wcześniej zatrzymany.
"""
global run_thermometer
global thermometer_thread_started
if run_thermometer:
print("[INFO] Wątek termometru już działa.")
return
run_thermometer = True
if not thermometer_thread_started:
_thread.start_new_thread(thermometer_thread, ())
thermometer_thread_started = True
print("[INFO] Uruchomiono tryb termometru w tle.")
def stop_thermometer_thread():
"""
Ustawia run_thermometer = False, wątek sam się zakończy,
a w jego logu zobaczymy info o zakończeniu.
"""
global run_thermometer
if run_thermometer:
run_thermometer = False
print("[INFO] Zatrzymywanie wątku termometru...")
else:
print("[INFO] Wątek termometru już jest zatrzymany.")
############################################################################
# 11. FUNKCJA "main()" - GŁÓWNA PETLA PROGRAMU
############################################################################
def main():
"""
Funkcja główna programu:
1. Łączy się z Wi-Fi (connect_wifi()).
2. Sprawdza, czy faktycznie uzyskano połączenie (wlan i wlan.isconnected()).
3. W pętli oczekuje na komendy użytkownika (wpisywane w REPL).
4. Dla wybranych komend (np. 'wlacz termometr', 'podaj temperature')
wysyła zapytania do ChatGPT i wykonuje akcje według zwróconego JSON.
5. Przy wyjściu ('exit'/'quit') zatrzymuje ewentualny wątek termometru.
"""
connect_wifi()
# Sprawdzamy, czy wlan jest zdefiniowane i czy jest połączenie:
global wlan
if wlan and wlan.isconnected():
while True:
user_prompt = input(
"\nWpisz polecenie (np. 'wlacz termometr', 'podaj temperature', 'exit/quit'): "
)
if user_prompt.strip().lower() in ["exit", "quit"]:
print("Koniec dzialania.")
stop_thermometer_thread()
break
sensor_failure = False
try:
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()
if temp is None or hum is None:
print("Odczyt DHT11 = None")
sensor_failure = True
else:
global global_temperature
global global_humidity
global_temperature = temp
global_humidity = hum
except OSError as e:
print("Blad odczytu DHT11:", e)
sensor_failure = True
if sensor_failure and "podaj temperature" in user_prompt.lower():
print("Nie mozna odpowiedziec - blad odczytu DHT11.")
continue
query_openai(user_prompt)
else:
print("Brak połączenia z Wi-Fi. Uruchom ponownie urządzenie lub sprawdź sieć.")
############################################################################
# 12. WYWOŁANIE MAIN()
############################################################################
main()