Python OpenCV RTSP 串流影片
介紹如何在 Python 中使用 OpenCV 搭配多行程(multiprocessing)的方式開啟、顯示、處理與儲存 RTSP 串流影片。
測試用 RTSP 串流網路上有一些測試用的 RTSP 串流伺服器,可用於程式的開發與測試: 安裝 OpenCV安裝 Python 的 OpenCV 模組: - # 安裝 Python 的 OpenCV 模組
- pip3 install opencv-python
複製代碼
開啟與顯示 RTSP 串流影像OpenCV 中的 VideoCapture() 函數可以用來開啟各種不同的影片來源,包含影片檔案、網路攝影機、RTSP 串流等,以下是使用 OpenCV 開啟 RTSP 串流影片的範例。 - import cv2
- if __name__ == '__main__':
- # 開啟 RTSP 串流
- vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
- # 建立視窗
- cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)
- while True:
- # 從 RTSP 串流讀取一張影像
- ret, image = vidCap.read()
- if ret:
- # 顯示影像
- cv2.imshow('image_display', image)
- cv2.waitKey(10)
- else:
- # 若沒有影像跳出迴圈
- break
- # 釋放資源
- vidCap.release()
- # 關閉所有 OpenCV 視窗
- cv2.destroyAllWindows()
複製代碼
上面這段 Python 指令稿執行之後,就會開啟一個 OpenCV 的視窗,顯示 RTSP 串流的影像:
Python OpenCV RTSP 串流影片
OpenCV 視窗顯示 RTSP 串流影像多行程版本若在 CPU 處理速度比較慢的機器上,或是遇到影格率(frame rate)較高的影片,可能會出現單一執行緒無法及時處理的問題,這時候就可以考慮使用多行程(multiprocessing)的方式,以一個行程專門收取 RTSP 串流影像,然後將收進來的影像透過佇列(queue)交給另外一個行程來進行顯示,這樣就可以改善整體程式的處理效能,比較不會發生程式處理速度跟不上影片速度的問題。 - from multiprocessing import Process, Queue
- import cv2
- def image_display(taskqueue):
- # 建立視窗
- cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)
- while True:
- # 從工作佇列取得影像
- image = taskqueue.get()
- # 若沒有影像則終止迴圈
- if image is None: break
- # 顯示影像
- cv2.imshow('image_display', image)
- cv2.waitKey(10)
- if __name__ == '__main__':
- # 開啟 RTSP 串流
- vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
- # 建立工作佇列
- taskqueue = Queue()
- # 建立並執行工作行程
- proc = Process(target=image_display, args=(taskqueue,))
- proc.start()
- while True:
- # 從 RTSP 串流讀取一張影像
- ret, image = vidCap.read()
- if ret:
- # 將影像放入工作佇列
- taskqueue.put(image)
- else:
- # 若沒有影像跳出迴圈
- break
- # 傳入 None 終止工作行程
- taskqueue.put(None)
- # 等待工作行程結束
- proc.join()
- # 釋放資源
- vidCap.release()
- # 關閉所有 OpenCV 視窗
- cv2.destroyAllWindows()
複製代碼
儲存 RTSP 串流影像以下是使用 OpenCV 讀取 RTSP 串流影像之後,將其儲存成影片檔案的範例,在儲存影片時維持原影片的解析度與影格率,而編碼則採用 mp4v,靠著計算影格數的方式錄製 10 秒鐘的 RTSP 串流影像,儲存為 output.mp4:
- from multiprocessing import Process, Queue
- import cv2
- def image_save(taskqueue, width, height, fps):
- # 指定影片編碼
- #fourcc = cv2.VideoWriter_fourcc(*'XVID')
- #fourcc = cv2.VideoWriter_fourcc(*'H264')
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
- # 建立 VideoWriter 物件
- writer = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))
- while True:
- # 從工作佇列取得影像
- image = taskqueue.get()
- # 若沒有影像則終止迴圈
- if image is None: break
- # 儲存影像
- writer.write(image)
- # 釋放資源
- writer.release()
- if __name__ == '__main__':
- # 開啟 RTSP 串流
- vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
- # 取得影像的尺寸大小
- width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
- height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
- # 取得影格率
- fps = vidCap.get(cv2.CAP_PROP_FPS)
- # 建立工作佇列
- taskqueue = Queue()
- # 建立並執行工作行程
- proc = Process(target=image_save, args=(taskqueue, width, height, fps))
- proc.start()
- # 計數器
- frame_counter = 0
- # 總錄製幀數(10 秒鐘)
- total_frames = fps * 10
- while frame_counter < total_frames:
- # 從 RTSP 串流讀取一張影像
- ret, image = vidCap.read()
- if ret:
- # 將影像放入工作佇列
- taskqueue.put(image)
- frame_counter += 1
- else:
- # 若沒有影像跳出迴圈
- break
- # 傳入 None 終止工作行程
- taskqueue.put(None)
- # 等待工作行程結束
- proc.join()
- # 釋放資源
- vidCap.release()
複製代碼
儲存長時間串流影片若需要儲存長時間的 RTSP 串流影片,可以利用以下範例,將串流切成固定長度的影片,依照編號或時間戳記來命名儲存的影片檔案,方便後續處理。 - from multiprocessing import Process, Queue
- import cv2
- from datetime import datetime
- def image_save(taskqueue, width, height, fps, frames_per_file):
- # 指定影片編碼
- #fourcc = cv2.VideoWriter_fourcc(*'XVID')
- #fourcc = cv2.VideoWriter_fourcc(*'H264')
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
- writer = None
- while True:
- # 從工作佇列取得影像
- image, frame_counter = taskqueue.get()
- # 若沒有影像則終止迴圈
- if image is None: break
- if frame_counter % frames_per_file == 0:
- if writer: writer.release()
- # 建立 VideoWriter 物件(以數字編號)
- # index = int(frame_counter // frames_per_file)
- # writer = cv2.VideoWriter(f'output-{index}.mp4', fourcc, fps, (width, height))
- # 建立 VideoWriter 物件(以時間命名)
- now = datetime.now()
- timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
- writer = cv2.VideoWriter(f'output-{timestamp}.mp4', fourcc, fps, (width, height))
- # 儲存影像
- writer.write(image)
- # 釋放資源
- writer.release()
- if __name__ == '__main__':
- # 開啟 RTSP 串流
- vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
- # 取得影像的尺寸大小
- width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
- height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
- # 取得影格率
- fps = vidCap.get(cv2.CAP_PROP_FPS)
- # 建立工作佇列
- taskqueue = Queue()
- # 計數器
- frame_counter = 0
- # 總錄製幀數(30 秒鐘)
- total_frames = fps * 30
- # 每個檔案的幀數(10 秒鐘)
- frames_per_file = fps * 10
- # 建立並執行工作行程
- proc = Process(target=image_save, args=(taskqueue, width, height, fps, frames_per_file))
- proc.start()
- while frame_counter < total_frames:
- # 從 RTSP 串流讀取一張影像
- ret, image = vidCap.read()
- if ret:
- # 將影像放入工作佇列
- taskqueue.put((image, frame_counter))
- frame_counter += 1
- else:
- # 若沒有影像跳出迴圈
- break
- # 傳入 None 終止工作行程
- taskqueue.put((None, None))
- # 等待工作行程結束
- proc.join()
- # 釋放資源
- vidCap.release()
複製代碼
這裡為了方便起見,檔案名稱用的時間戳記是放在工作行程中產生的,所以時間可能會有一些誤差,如果需要非常精確的時間,就要將時間戳記改為接收串流時同時取得時間。
錄製機器作動影片這裡我的應用場景是將一台網路攝影機架設在工廠內,鏡頭對準特定的機台,然後我希望透過 RTSP 串流影片監看機台的狀況,當機台有動作時自動將作動過程的影片錄製下來,以利後續的 AI 分析,以下是我在開發過程中所用的基本架構範例。 - from multiprocessing import Process, Queue
- import cv2
- from datetime import datetime
- import numpy as np
- # 開發模式
- DEV_MODE = False
- def printLogMsg(msg):
- now = datetime.now()
- timestamp = now.strftime("%Y/%m/%d %H:%M:%S")
- print("[{}] {}".format(timestamp, msg))
- def image_save(taskqueue, width, height, fps):
- # 指定影片編碼
- fourcc = cv2.VideoWriter_fourcc(*'XVID')
- #fourcc = cv2.VideoWriter_fourcc(*'H264')
- #fourcc = cv2.VideoWriter_fourcc(*'mp4v')
- writer = None
- while True:
- # 從工作佇列取得影像
- image, frameCounter = taskqueue.get()
- # 若沒有影像則終止迴圈
- if image is None: break
- if frameCounter == 0:
- if writer: writer.release()
- # 建立 VideoWriter 物件(以時間命名)
- now = datetime.now()
- timestamp = now.strftime("%Y%m%d-%H%M%S")
- writer = cv2.VideoWriter(f'test-output-{timestamp}.avi', fourcc, fps, (width, height))
- # 儲存影像
- writer.write(image)
- # 釋放資源
- if writer: writer.release()
- def captureRTSP(src):
- # 開啟 RTSP 串流
- vidCap = cv2.VideoCapture(src)
- # 取得影像的尺寸大小
- imgWidth = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
- imgHeight = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
- printLogMsg(f"Size: {imgWidth} x {imgHeight}")
- # 取得影格率
- fps = vidCap.get(cv2.CAP_PROP_FPS)
- printLogMsg(f"FPS: {fps}")
- # 建立工作佇列
- taskqueue = Queue()
- # 目標影像 FPS
- desired_fps = 15
- # 建立並執行工作行程
- proc = Process(target=image_save, args=(taskqueue, imgWidth, imgHeight, desired_fps))
- proc.start()
- avgImage = None
- avgImageFloat = None
- # 計數器
- frameCounter = 0
- while vidCap.isOpened():
- # 從 RTSP 串流讀取一張影像
- ret, image = vidCap.read()
- if ret:
- # 忽略黑白影像(紅外線)
- if np.array_equal(image[:,:,0], image[:,:,1]) and np.array_equal(image[:,:,0], image[:,:,2]):
- avgImage = None
- avgImageFloat = None
- if frameCounter:
- printLogMsg(f"Stop recording (frame counter = {frameCounter})")
- frameCounter = 0
- continue
- # 取出中央部分影像,作為變動依據
- cropImage = image[(imgHeight//4):(imgHeight//4*3), (imgWidth//8*3):(imgWidth//8*5)]
- centerImage = cv2.resize(cropImage, (imgHeight//4, imgWidth//8))
- centerArea = imgHeight//4 * imgWidth//8
- # 初始化平均影像
- if avgImage is None:
- avgImage = cv2.blur(centerImage, (4, 4))
- avgImageFloat = np.float32(avgImage)
- printLogMsg("avgImage initialization")
- continue
- # 模糊處理
- blurImage = cv2.blur(centerImage, (4, 4))
- # 計算目前影格與平均影像的差異值
- diffImage = cv2.absdiff(avgImage, blurImage)
- # 將圖片轉為灰階
- grayImage = cv2.cvtColor(diffImage, cv2.COLOR_BGR2GRAY)
- # 篩選出變動程度大於門檻值的區域
- ret, binImage = cv2.threshold(grayImage, 5, 255, cv2.THRESH_BINARY)
- # 使用型態轉換函數去除雜訊
- kernel = np.ones((5, 5), np.uint8)
- binImage = cv2.morphologyEx(binImage, cv2.MORPH_OPEN, kernel, iterations=2)
- binImage = cv2.morphologyEx(binImage, cv2.MORPH_CLOSE, kernel, iterations=2)
- # 產生等高線
- countours, _ = cv2.findContours(binImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- detected = False
- for c in countours:
- if cv2.contourArea(c) > centerArea / 5:
- detected = True
- if DEV_MODE:
- # 計算等高線的外框範圍
- (x, y, w, h) = cv2.boundingRect(c)
- # 畫出外框
- cv2.rectangle(centerImage, (x, y), (x + w, y + h), (0, 255, 0), 2)
- else:
- break
- if DEV_MODE:
- # 顯示偵測結果影像
- cv2.imshow('image', centerImage)
- cv2.imshow('avgImage', avgImage)
- cv2.imshow('blurImage', blurImage)
- if cv2.waitKey(1) & 0xFF == ord('q'):
- # 傳入 None 終止工作行程
- taskqueue.put((None, None))
- break
- # 更新平均影像
- cv2.accumulateWeighted(blurImage, avgImageFloat, 0.1)
- avgImage = cv2.convertScaleAbs(avgImageFloat)
- # 將影像放入工作佇列
- if detected:
- if DEV_MODE == False:
- taskqueue.put((image, frameCounter))
- if frameCounter == 0:
- printLogMsg("Start recording")
- frameCounter += 1
- else:
- if frameCounter:
- printLogMsg(f"Stop recording (frame counter = {frameCounter})")
- frameCounter = 0
- continue
- else:
- # 若沒有影像跳出迴圈
- printLogMsg("no frame")
- break
- # 傳入 None 終止工作行程
- taskqueue.put((None, None))
- # 等待工作行程結束
- proc.join()
- # 釋放資源
- vidCap.release()
- # 關閉所有 OpenCV 視窗
- cv2.destroyAllWindows()
- if __name__ == '__main__':
- captureRTSP('rtsp://ipcam.stream:8554/bars')
複製代碼
在現場的網路攝影機是 24 小時全天運作的,但是機器作動的頻率很低,大部分的時間都是處於閒置狀態,而在機器閒置的時間,現場的燈也都是關閉的。正常開燈的時候網路攝影機使用一般的攝像頭,對應的影格率(FPS)為 15,但當網路攝影機遇到關燈的狀態時,會自動遷換為紅外線攝像頭,影像雖然還是 RGB 格式,但是色彩會轉為灰階,而此時對應的影格率(FPS)就會降為 10。 由於機器只會在開燈的時候作動,所以我在程式中加上判斷影像色彩的條件,若整張影像的 RGB 三個 channels 都完全有相同的值,就判定為紅外線影像,將其直接忽略,減少不必要的計算。 另外為了方便開發與測試,我們靠著 DEV_MODE 變數設定開發模式,當程式處於開發模式的時候,會開啟監看視窗,顯示幾種關鍵的影像內容,同時開發模式也不會儲存任何影像。 在儲存影片檔案時,會自動將獨立的影片片段分開儲存,檔案名稱則自動以開始錄製的時間來命名,影片的解析度、影格率都維持跟來源影片相同。
https://officeguide.cc/python-opencv-open-display-save-rtsp-stream-tutorial-examples/ 參考資料
|