프로젝트형 IoT 서비스 개발 4회차/3. 게이트웨이 디바이스 제어

[Day60] 2022-04-22(금) 라즈베리파이9 - 카메라, 이벤트 처리 - 김서연 강사님

powerstone 2022. 4. 22. 18:32
728x90

[1] 카메라

  1. 카메라 세팅

https://powerstone829.tistory.com/76

 

[Day52] 2022-04-12(화) 라즈베리파이 준비6 - Camera 연결 - 김서연 강사님

[1] Camera 연결 1. 보드와 연결 2. bullseye 버전 앱 사용 (신버전) - 명령어: ~$ libcamera-jpeg -o pic.jpg - bullseye에서는 Picamear 라이브러리와 raspistill 등 구버전 기능을 지원하지 않기 때문에, bu..

powerstone829.tistory.com

  2. Legacy 카메라 사용 확인

    1) 사진 촬영

      - 명령어 ~$ raspistill -o 파일명(.jpg) -t 타이머시간(ms)

      - -vf 옵션: 수직 반전

      - -vh 옵션: 수평 반전

    2) 동영상 촬영

      - 명령어 ~$ raspivid -o 파일명(.h264) -t 촬영시간(ms)

      - 동영상 촬영 파일 확인 : Legacy 카메라 disable 시키고 VNC로 접속하여 확인

      

  3. Python에서 카메라 활용

    1) Picamera 모듈 설치 (라즈베리파이에만 사용 가능)

      - 명령어 ~$ sudo apt-get install python-picamera

    2) 사진 촬영 예제1

      - picamera 모듈PiCamera 클래스 import

# pycamera_test1.py
from picamera import PiCamera
from time import sleep

# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview()  # 미리보기 화면 시작
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg")    # 사진 촬영(매개변수로 파일명 또는 경로+파일명)
camera.stop_preview()

    3) 사진 촬영 예제2

      - 사진 5장 연속으로 촬영

# pycamera_test2.py
from picamera import PiCamera
from time import sleep

camera = PiCamera()
camera.start_preview()

for i in range(1, 6):
    sleep(2)
    camera.capture("/home/pi/mywork/picamera/image"+str(i)+".jpg")
    
camera.stop_preview()

    4) 사진 촬영 예제3

# pycamera_test3.py
from picamera import PiCamera, Color
from time import sleep

# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview()  # 미리보기 화면 시작

# 사진에 text 넣기
camera.annotate_text = "raspberry PI"
camera.annotate_text_size = 50
camera.annotate_background = Color("blue")
camera.annotate_foreground = Color("yellow")

# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg")    # 사진 촬영(매개변수로 파일명 또는 경로)
camera.stop_preview()

    5) 사진 촬영 예제4

# pycamera_test4.py
from picamera import PiCamera
from time import sleep

# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
# 해상도 적용 - 파이 카메라로 최대 해상도는 25XX, 19XX
camera.resolution = (2592, 1944)
camera.framerate = 15
camera.start_preview()  # 미리보기 화면 시작
camera.rotation = 180   # 사진 회전
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg")    # 사진 촬영(매개변수로 파일명 또는 경로)
camera.stop_preview()

    6) 동영상 촬영 예제

# pycamera_test1.py
from picamera import PiCamera
from time import sleep

# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview()  # 미리보기 화면 시작
# 동영상 저장 시작
camera.start_recording("/home/pi/mywork/picamera/video.h264")
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.stop_recording()
camera.stop_preview()

  4. 이미지 파일을 MQTT 통신을 통해 전송

    - 카메라로 촬영한 이미지 파일을 mqtt 통신을 통해서 전송

    - PC는 subscriber

      - broker가 보내주는 바이너리데이터(이미지 파일)의 내용을 받아서 파일로 저장

    - 라즈베리파이는 publisher

      - 라즈베리파이 카메라로 영상을 촬영

        2초에 한 번씩 촬영해서 5개 파일 전송

      - 생성된 이미지 파일을 읽어서 바이트배열로 publish

    1) Raspberry Pi - Publisher

# pycamera_raspberry_exam.py
from picamera import PiCamera
import time
from paho.mqtt import publish

camera = PiCamera()
camera.start_preview()

for i in range(1, 6):
    time.sleep(2)
    camera.capture("/home/pi/mywork/picamera/mqttimage%d.jpg" % i)
    f = open("/home/pi/mywork/picamera/mqttimage%d.jpg" % i, "rb")
    # fArray = bytearray(f.read())  # 그냥 보내도 보내짐..
    # print(fArray)
    publish.single("iot/cam", f.read(), hostname="XXX.XXX.XXX.XXX")
    f.close()
    
camera.stop_preview()

    2) PC - Subscriber

# pycamera_pc_exam.py
from paho.mqtt import client

i = 0
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("연결 완료")
        client.subscribe("iot/cam")
    else:
        print("연결 실패")

def on_message(client, userdata, msg):
    global i
    i += 1
    f = open("./picamera/mqttimage%d.jpg" % i, "wb")
    f.write(msg.payload)
    # print(msg.payload)
    f.close()


mqttClient = client.Client()
mqttClient.on_connect = on_connect
mqttClient.on_message = on_message
mqttClient.connect("XXX.XXX.XXX.XXX", 1883, 60)
mqttClient.loop_forever()

 

[2] 이벤트 처리

  - 특정 이벤트가 발생했을 때, 원하는 코드가 실행되도록 구현하는 방법

  - threading 모듈의 Event 클래스 import

  - signal 모듈 import

  1. 이벤트 객체 생성

    - 객체명 = Event()

    - flag 변수를 갖고 있다.

    - 객체명.set( ) : flag 변수를 True 로 설정

    - 객체명.clear( ) : flag 변수를 False 로 설정

    - 객체명.wait( ) : flag 변수가 True면 바로 리턴되고, False라면 True가 될 때까지 대기

    - 객체명.is_set( ) : flag 변수의 현재 값을 boolean 타입으로 반환 ( isSet( ) 도 동일 )

  2. signal 함수에 들어갈 callback 함수 정의

    - def 함수명(signum, frame)

    - callback 함수는 매개변수가 정해져 있으므로, 두 개의 매개변수를 반드시 지정(변수명은 마음대로)

  3. 원하는 이벤트에 callback 함수 연결

    - signal.signal(이벤트 변수, callback함수)

    - 이벤트 변수 : 아래 사이트에 다양하게 나와 있다. 예제에서는 그 중 signal.SIGINT (키보드 인터럽트) 사용하였다.

    - callback 함수 : 2.에서 정의한 함수명을 입력

출처 - https://docs.python.org/3/library/signal.html

https://docs.python.org/3/library/signal.html

 

signal — Set handlers for asynchronous events — Python 3.10.4 documentation

signal — Set handlers for asynchronous events This module provides mechanisms to use signal handlers in Python. General rules The signal.signal() function allows defining custom handlers to be executed when a signal is received. A small number of default

docs.python.org

  4. 이벤트 처리 예제

# mymqtt.py
import paho.mqtt.client as mqtt
from mysensor import PirSensor
import RPi.GPIO as gpio
from threading import Thread, Event
from led import LED
import signal
# Event 객체 - 쓰레드 간의 간단한 통신을 위해서 사용되는 객체 - 키보드 인터럽트 시그널을 감지하고 미리 등록한 이벤트가 발생했을 때 처리할 함수가 실행되도록 구현
# 1. 이벤트 객체를 만들기
#    내부적으로 flag 변수를 갖고 있다.
#    is_set : 내부플래그가 True로 설정되었으면, True 반환
#    set : 내부 플래그를 True로 설정
# 2. 이벤트가 발생되면 실행할 callback 함수 정의
# 3. 키보드인터럽트 시그널을 리스닝하고 있다가 이벤트가 발생하면 반응할 수 있도록 등록 - signal 모듈의 signal 함수가 담당

class MqttWorker:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        # 1. 이벤트 객체 생성
        self.exit_event = Event()
        
        self.led = LED()
        self.pir = PirSensor(self.client)
        self.pir.start()
    
    # 2. 키보드 인터럽트 이벤트가 발생되면 호출할 콜백함수
    def signal_handler(self, signum, frame):
        print("signal_handler+++++++++++++++++++++++++")
        self.exit_event.set()   # 이벤트 객체가 갖고 있는 플래그변수가 True로 세팅
        self.led.clean()
        # 현재 이벤트 발생을 체크하고 다른 작업을 수행하기 위한 코드 - 보통 별도의 메소드에서 처리
        if self.exit_event.is_set():
            print("이벤트 객체의 플래그 변수가 True로 바뀜")
            exit(0)
        
    def mymqtt_connect(self):
        try:
            print("브로커 연결 시작하기")
            self.client.connect("XXX.XXX.XXX.XXX", 1883, 60)
            # 3. 키보드 인터럽트 시그널(SIGINT)을 리스닝하고 있다가 키보드 인터럽트가 발생되면 등록한 함수(self.signal_handler)를 호출
            signal.signal(signal.SIGINT, self.signal_handler)
            
            mythreadobj = Thread(target=self.client.loop_forever)
            mythreadobj.start()
        except KeyboardInterrupt:
            pass
        finally:
            print("종료")
            
    def on_connect(self, client, userdata, flags, rc):
        print("connect..."+str(rc))
        if rc == 0:
            self.client.subscribe("iot/led")
        else:
            print("연결 실패.....")
            
    def on_message(self, client, userdata, message):
        myval = message.payload.decode("utf-8")
        print(message.topic+"-----"+myval)
        if myval == "led_on":
            self.led.led_on()
        elif myval == "led_off":
            self.led.led_off()

 

- 끝 -  

728x90