Arduino를 LLM으로 제어하기

개요

Ollama를 사용한 로컬 LLM과 Python을 통해 아두이노를 제어할 수 있다. LLM이 사용자의 입력을 분석하고, 그 결과를 시리얼 통신으로 아두이노에 전달하여 하드웨어를 제어한다.

시스템 구조

사용자 입력 → Python (Ollama API) → LLM 분석 → 시리얼 통신 → Arduino
  1. Python 스크립트: 사용자 입력을 받아 Ollama API를 호출
  2. Ollama LLM: 로컬에서 실행되는 대규모 언어 모델
  3. 시리얼 통신: Python과 아두이노 간 데이터 전송
  4. Arduino: 하드웨어 제어

사전 준비

1. Ollama 설치 및 모델 다운로드

# Ollama 설치 (https://ollama.ai)
# 모델 다운로드
ollama pull llama3.2:latest

2. Python 라이브러리 설치

pip install requests pyserial

3. Arduino 시리얼 통신 준비

아두이노는 시리얼 통신을 통해 명령을 받을 수 있도록 설정되어 있어야 한다.

Python 코드

기본 구조

import requests
import json
import serial
import time

# Ollama 설정
OLLAMA_URL = "http://localhost:11434/api/chat"
MODEL = "llama3.2:latest"

# 시리얼 통신 설정
SERIAL_PORT = "/dev/ttyACM0"  # Linux/Mac
# SERIAL_PORT = "COM3"        # Windows
BAUD = 9600

# 시스템 프롬프트: LLM이 아두이노 제어 명령을 생성하도록 지시
SYSTEM_PROMPT = """사용자의 요청을 분석하여 아두이노 제어 명령을 생성하세요.
명령 형식: LED_ON, LED_OFF, MOTOR_START, MOTOR_STOP 등
JSON 형식으로 응답하세요: {"command": "명령어", "reply": "사용자에게 보여줄 응답"}
"""

def call_ollama(user_input: str) -> dict:
    """Ollama API를 호출하여 응답을 받는다."""
    payload = {
        "model": MODEL,
        "messages": [
            # role 옵션: "system" (시스템 프롬프트), "user" (사용자 입력), "assistant" (LLM 응답)
            {"role": "system", "content": SYSTEM_PROMPT},  # 시스템 프롬프트: LLM의 역할과 동작 방식 정의
            {"role": "user", "content": user_input}         # 사용자 입력
        ],
        "stream": False,      # False: 전체 응답을 한 번에 받음, True: 실시간 스트리밍
        "format": "json"      # JSON 형식으로 응답 받기 (파싱 용이)
    }

    response = requests.post(OLLAMA_URL, json=payload, timeout=30)
    response.raise_for_status()

    content = response.json()["message"]["content"]
    return json.loads(content)

def send_to_arduino(ser: serial.Serial, command: str):
    """아두이노에 명령을 전송한다."""
    ser.write((command + "\n").encode("utf-8"))
    ser.flush()

def main():
    # 시리얼 포트 열기
    ser = serial.Serial(SERIAL_PORT, BAUD, timeout=1.0)
    time.sleep(1.5)  # 연결 대기

    print("시스템 준비 완료. 명령을 입력하세요.")

    while True:
        user_input = input("\n입력> ").strip()

        if user_input.lower() in ("quit", "exit"):
            break

        # LLM 호출
        result = call_ollama(user_input)
        command = result.get("command", "")
        reply = result.get("reply", "")

        # 아두이노에 명령 전송
        if command:
            send_to_arduino(ser, command)
            print(f"명령 전송: {command}")

        print(f"응답: {reply}")

    ser.close()

if __name__ == "__main__":
    main()

Arduino 코드 예시

아두이노는 시리얼 통신으로 명령을 받아 처리한다.

void setup() {
  Serial.begin(9600);
  pinMode(LED_BUILTIN, OUTPUT);
}

void loop() {
  if (Serial.available() > 0) {
    String command = Serial.readStringUntil('\n');
    command.trim();

    if (command == "LED_ON") {
      digitalWrite(LED_BUILTIN, HIGH);
      Serial.println("LED 켜짐");
    }
    else if (command == "LED_OFF") {
      digitalWrite(LED_BUILTIN, LOW);
      Serial.println("LED 꺼짐");
    }
    else {
      Serial.println("알 수 없는 명령");
    }
  }
}

동작 흐름

  1. 사용자 입력: "LED를 켜줘"
  2. LLM 분석: Python이 Ollama API를 호출하여 명령 분석
  3. 명령 생성: LLM이 {"command": "LED_ON", "reply": "LED를 켰습니다"} 생성
  4. 시리얼 전송: Python이 "LED_ON" 명령을 아두이노에 전송
  5. 하드웨어 제어: 아두이노가 LED를 켜고 응답 전송

고급 기능

대화 기록 유지

LLM이 이전 대화를 기억하도록 대화 기록을 유지할 수 있다.

conversation_history = []

def call_ollama_with_history(user_input: str) -> dict:
    messages = [
        # role 옵션: "system" (시스템 프롬프트), "user" (사용자 입력), "assistant" (LLM 응답)
        {"role": "system", "content": SYSTEM_PROMPT}  # 시스템 프롬프트: LLM의 역할과 동작 방식 정의
    ]

    # 대화 기록 추가
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_input})  # 사용자 입력 추가

    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": False,      # False: 전체 응답을 한 번에 받음, True: 실시간 스트리밍
        "format": "json"      # JSON 형식으로 응답 받기 (파싱 용이)
    }

    response = requests.post(OLLAMA_URL, json=payload, timeout=30)
    result = json.loads(response.json()["message"]["content"])

    # 대화 기록 업데이트
    conversation_history.append({"role": "user", "content": user_input})  # 사용자 입력 저장
    conversation_history.append({"role": "assistant", "content": result.get("reply", "")})  # LLM 응답 저장

    # 최대 20개 대화만 유지
    if len(conversation_history) > 20:
        conversation_history = conversation_history[-20:]

    return result

에러 처리

def safe_call_ollama(user_input: str) -> dict:
    try:
        return call_ollama(user_input)
    except requests.exceptions.RequestException as e:
        print(f"Ollama 연결 오류: {e}")
        return {"command": "", "reply": "연결 오류가 발생했습니다."}
    except json.JSONDecodeError:
        print("JSON 파싱 오류")
        return {"command": "", "reply": "응답을 처리할 수 없습니다."}

주의사항

  1. 시리얼 포트 확인: 시스템에 따라 시리얼 포트 경로가 다를 수 있다
    • Linux/Mac: /dev/ttyACM0, /dev/ttyUSB0
    • Windows: COM3, COM4
  2. 보드레이트 일치: Python과 Arduino의 보드레이트가 일치해야 한다
  3. 타임아웃 설정: 시리얼 통신 타임아웃을 적절히 설정
  4. 명령 형식: LLM이 생성하는 명령 형식을 일관되게 유지

활용 예시

  • 음성 인식 연동: 음성을 텍스트로 변환 후 LLM 분석
  • 센서 데이터 기반 제어: 센서 값을 LLM에 전달하여 상황에 맞는 제어
  • 자연어 명령: "조금 더 밝게", "천천히 움직여" 등 자연어 명령 처리

참고 자료