[Day60] 2022-04-22(금) 라즈베리파이9 - 카메라, 이벤트 처리 - 김서연 강사님
[1] 카메라
1. 카메라 세팅
https://powerstone829.tistory.com/76
[Day52] 2022-04-12(화) 라즈베리파이 준비6 - Camera 연결 - 김서연 강사님
[1] Camera 연결 1. 보드와 연결 2. bullseye 버전 앱 사용 (신버전) - 명령어: ~$ libcamera-jpeg -o pic.jpg - bullseye에서는 Picamear 라이브러리와 raspistill 등 구버전 기능을 지원하지 않기 때문에, bu..
powerstone829.tistory.com
2. Legacy 카메라 사용 확인
1) 사진 촬영
- 명령어 ~$ raspistill -o 파일명(.jpg) -t 타이머시간(ms)
- -vf 옵션: 수직 반전
- -vh 옵션: 수평 반전
2) 동영상 촬영
- 명령어 ~$ raspivid -o 파일명(.h264) -t 촬영시간(ms)
- 동영상 촬영 파일 확인 : Legacy 카메라 disable 시키고 VNC로 접속하여 확인
3. Python에서 카메라 활용
1) Picamera 모듈 설치 (라즈베리파이에만 사용 가능)
- 명령어 ~$ sudo apt-get install python-picamera
2) 사진 촬영 예제1
- picamera 모듈의 PiCamera 클래스 import
# pycamera_test1.py
from picamera import PiCamera
from time import sleep
# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview() # 미리보기 화면 시작
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg") # 사진 촬영(매개변수로 파일명 또는 경로+파일명)
camera.stop_preview()
3) 사진 촬영 예제2
- 사진 5장 연속으로 촬영
# pycamera_test2.py
from picamera import PiCamera
from time import sleep
camera = PiCamera()
camera.start_preview()
for i in range(1, 6):
sleep(2)
camera.capture("/home/pi/mywork/picamera/image"+str(i)+".jpg")
camera.stop_preview()
4) 사진 촬영 예제3
# pycamera_test3.py
from picamera import PiCamera, Color
from time import sleep
# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview() # 미리보기 화면 시작
# 사진에 text 넣기
camera.annotate_text = "raspberry PI"
camera.annotate_text_size = 50
camera.annotate_background = Color("blue")
camera.annotate_foreground = Color("yellow")
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg") # 사진 촬영(매개변수로 파일명 또는 경로)
camera.stop_preview()
5) 사진 촬영 예제4
# pycamera_test4.py
from picamera import PiCamera
from time import sleep
# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
# 해상도 적용 - 파이 카메라로 최대 해상도는 25XX, 19XX
camera.resolution = (2592, 1944)
camera.framerate = 15
camera.start_preview() # 미리보기 화면 시작
camera.rotation = 180 # 사진 회전
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.capture("/home/pi/mywork/picamera/image.jpg") # 사진 촬영(매개변수로 파일명 또는 경로)
camera.stop_preview()
6) 동영상 촬영 예제
# pycamera_test1.py
from picamera import PiCamera
from time import sleep
# 사진 찍기
camera = PiCamera() # Picamera 객체 생성
camera.start_preview() # 미리보기 화면 시작
# 동영상 저장 시작
camera.start_recording("/home/pi/mywork/picamera/video.h264")
# 카메라의 센서가 빛의 수준을 감지할 시간이 있어야 하므로 이미지를 캡쳐하기 전에 최소 2초는 sleep
sleep(10)
camera.stop_recording()
camera.stop_preview()
4. 이미지 파일을 MQTT 통신을 통해 전송
- 카메라로 촬영한 이미지 파일을 mqtt 통신을 통해서 전송
- PC는 subscriber
- broker가 보내주는 바이너리데이터(이미지 파일)의 내용을 받아서 파일로 저장
- 라즈베리파이는 publisher
- 라즈베리파이 카메라로 영상을 촬영
2초에 한 번씩 촬영해서 5개 파일 전송
- 생성된 이미지 파일을 읽어서 바이트배열로 publish
1) Raspberry Pi - Publisher
# pycamera_raspberry_exam.py
from picamera import PiCamera
import time
from paho.mqtt import publish
camera = PiCamera()
camera.start_preview()
for i in range(1, 6):
time.sleep(2)
camera.capture("/home/pi/mywork/picamera/mqttimage%d.jpg" % i)
f = open("/home/pi/mywork/picamera/mqttimage%d.jpg" % i, "rb")
# fArray = bytearray(f.read()) # 그냥 보내도 보내짐..
# print(fArray)
publish.single("iot/cam", f.read(), hostname="XXX.XXX.XXX.XXX")
f.close()
camera.stop_preview()
2) PC - Subscriber
# pycamera_pc_exam.py
from paho.mqtt import client
i = 0
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("연결 완료")
client.subscribe("iot/cam")
else:
print("연결 실패")
def on_message(client, userdata, msg):
global i
i += 1
f = open("./picamera/mqttimage%d.jpg" % i, "wb")
f.write(msg.payload)
# print(msg.payload)
f.close()
mqttClient = client.Client()
mqttClient.on_connect = on_connect
mqttClient.on_message = on_message
mqttClient.connect("XXX.XXX.XXX.XXX", 1883, 60)
mqttClient.loop_forever()
[2] 이벤트 처리
- 특정 이벤트가 발생했을 때, 원하는 코드가 실행되도록 구현하는 방법
- threading 모듈의 Event 클래스 import
- signal 모듈 import
1. 이벤트 객체 생성
- 객체명 = Event()
- flag 변수를 갖고 있다.
- 객체명.set( ) : flag 변수를 True 로 설정
- 객체명.clear( ) : flag 변수를 False 로 설정
- 객체명.wait( ) : flag 변수가 True면 바로 리턴되고, False라면 True가 될 때까지 대기
- 객체명.is_set( ) : flag 변수의 현재 값을 boolean 타입으로 반환 ( isSet( ) 도 동일 )
2. signal 함수에 들어갈 callback 함수 정의
- def 함수명(signum, frame)
- callback 함수는 매개변수가 정해져 있으므로, 두 개의 매개변수를 반드시 지정(변수명은 마음대로)
3. 원하는 이벤트에 callback 함수 연결
- signal.signal(이벤트 변수, callback함수)
- 이벤트 변수 : 아래 사이트에 다양하게 나와 있다. 예제에서는 그 중 signal.SIGINT (키보드 인터럽트) 사용하였다.
- callback 함수 : 2.에서 정의한 함수명을 입력
https://docs.python.org/3/library/signal.html
signal — Set handlers for asynchronous events — Python 3.10.4 documentation
signal — Set handlers for asynchronous events This module provides mechanisms to use signal handlers in Python. General rules The signal.signal() function allows defining custom handlers to be executed when a signal is received. A small number of default
docs.python.org
4. 이벤트 처리 예제
# mymqtt.py
import paho.mqtt.client as mqtt
from mysensor import PirSensor
import RPi.GPIO as gpio
from threading import Thread, Event
from led import LED
import signal
# Event 객체 - 쓰레드 간의 간단한 통신을 위해서 사용되는 객체 - 키보드 인터럽트 시그널을 감지하고 미리 등록한 이벤트가 발생했을 때 처리할 함수가 실행되도록 구현
# 1. 이벤트 객체를 만들기
# 내부적으로 flag 변수를 갖고 있다.
# is_set : 내부플래그가 True로 설정되었으면, True 반환
# set : 내부 플래그를 True로 설정
# 2. 이벤트가 발생되면 실행할 callback 함수 정의
# 3. 키보드인터럽트 시그널을 리스닝하고 있다가 이벤트가 발생하면 반응할 수 있도록 등록 - signal 모듈의 signal 함수가 담당
class MqttWorker:
def __init__(self):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# 1. 이벤트 객체 생성
self.exit_event = Event()
self.led = LED()
self.pir = PirSensor(self.client)
self.pir.start()
# 2. 키보드 인터럽트 이벤트가 발생되면 호출할 콜백함수
def signal_handler(self, signum, frame):
print("signal_handler+++++++++++++++++++++++++")
self.exit_event.set() # 이벤트 객체가 갖고 있는 플래그변수가 True로 세팅
self.led.clean()
# 현재 이벤트 발생을 체크하고 다른 작업을 수행하기 위한 코드 - 보통 별도의 메소드에서 처리
if self.exit_event.is_set():
print("이벤트 객체의 플래그 변수가 True로 바뀜")
exit(0)
def mymqtt_connect(self):
try:
print("브로커 연결 시작하기")
self.client.connect("XXX.XXX.XXX.XXX", 1883, 60)
# 3. 키보드 인터럽트 시그널(SIGINT)을 리스닝하고 있다가 키보드 인터럽트가 발생되면 등록한 함수(self.signal_handler)를 호출
signal.signal(signal.SIGINT, self.signal_handler)
mythreadobj = Thread(target=self.client.loop_forever)
mythreadobj.start()
except KeyboardInterrupt:
pass
finally:
print("종료")
def on_connect(self, client, userdata, flags, rc):
print("connect..."+str(rc))
if rc == 0:
self.client.subscribe("iot/led")
else:
print("연결 실패.....")
def on_message(self, client, userdata, message):
myval = message.payload.decode("utf-8")
print(message.topic+"-----"+myval)
if myval == "led_on":
self.led.led_on()
elif myval == "led_off":
self.led.led_off()
- 끝 -