找回密碼
 註冊
搜索
查看: 103|回復: 0

[教學] Python 使用 OpenCV 開啟、顯示、處理與儲存 RTSP 串流影片教學與範例

[複製鏈接]
發表於 昨天 15:56 | 顯示全部樓層 |閱讀模式
Push to Facebook

Python OpenCV RTSP 串流影片

Python OpenCV  RTSP 串流影片
介紹如何在 Python 中使用 OpenCV 搭配多行程(multiprocessing)的方式開啟、顯示、處理與儲存 RTSP 串流影片。

測試用 RTSP 串流
網路上有一些測試用的 RTSP 串流伺服器,可用於程式的開發與測試:
安裝 OpenCV
安裝 Python 的 OpenCV 模組:
  1. # 安裝 Python 的 OpenCV 模組
  2. pip3 install opencv-python
複製代碼

開啟與顯示 RTSP 串流影像
OpenCV 中的 VideoCapture() 函數可以用來開啟各種不同的影片來源,包含影片檔案、網路攝影機、RTSP 串流等,以下是使用 OpenCV 開啟 RTSP 串流影片的範例。
  1. import cv2

  2. if __name__ == '__main__':

  3.     # 開啟 RTSP 串流
  4.     vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

  5.     # 建立視窗
  6.     cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)

  7.     while True:
  8.         # 從 RTSP 串流讀取一張影像
  9.         ret, image = vidCap.read()

  10.         if ret:
  11.             # 顯示影像
  12.             cv2.imshow('image_display', image)
  13.             cv2.waitKey(10)
  14.         else:
  15.             # 若沒有影像跳出迴圈
  16.             break

  17.     # 釋放資源
  18.     vidCap.release()

  19.     # 關閉所有 OpenCV 視窗
  20.     cv2.destroyAllWindows()
複製代碼

上面這段 Python 指令稿執行之後,就會開啟一個 OpenCV 的視窗,顯示 RTSP 串流的影像:

Python OpenCV RTSP 串流影片

Python OpenCV  RTSP 串流影片

OpenCV 視窗顯示 RTSP 串流影像多行程版本
若在 CPU 處理速度比較慢的機器上,或是遇到影格率(frame rate)較高的影片,可能會出現單一執行緒無法及時處理的問題,這時候就可以考慮使用多行程(multiprocessing)的方式,以一個行程專門收取 RTSP 串流影像,然後將收進來的影像透過佇列(queue)交給另外一個行程來進行顯示,這樣就可以改善整體程式的處理效能,比較不會發生程式處理速度跟不上影片速度的問題。
  1. from multiprocessing import Process, Queue
  2. import cv2

  3. def image_display(taskqueue):
  4.     # 建立視窗
  5.     cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)

  6.     while True:
  7.         # 從工作佇列取得影像
  8.         image = taskqueue.get()

  9.         # 若沒有影像則終止迴圈
  10.         if image is None: break

  11.         # 顯示影像
  12.         cv2.imshow('image_display', image)
  13.         cv2.waitKey(10)

  14. if __name__ == '__main__':

  15.     # 開啟 RTSP 串流
  16.     vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

  17.     # 建立工作佇列
  18.     taskqueue = Queue()

  19.     # 建立並執行工作行程
  20.     proc = Process(target=image_display, args=(taskqueue,))
  21.     proc.start()

  22.     while True:
  23.         # 從 RTSP 串流讀取一張影像
  24.         ret, image = vidCap.read()

  25.         if ret:
  26.             # 將影像放入工作佇列
  27.             taskqueue.put(image)
  28.         else:
  29.             # 若沒有影像跳出迴圈
  30.             break

  31.     # 傳入 None 終止工作行程
  32.     taskqueue.put(None)

  33.     # 等待工作行程結束
  34.     proc.join()

  35.     # 釋放資源
  36.     vidCap.release()

  37.     # 關閉所有 OpenCV 視窗
  38.     cv2.destroyAllWindows()
複製代碼


儲存 RTSP 串流影像
以下是使用 OpenCV 讀取 RTSP 串流影像之後,將其儲存成影片檔案的範例,在儲存影片時維持原影片的解析度與影格率,而編碼則採用 mp4v,靠著計算影格數的方式錄製 10 秒鐘的 RTSP 串流影像,儲存為 output.mp4:

  1. from multiprocessing import Process, Queue
  2. import cv2

  3. def image_save(taskqueue, width, height, fps):

  4.     # 指定影片編碼
  5.     #fourcc = cv2.VideoWriter_fourcc(*'XVID')
  6.     #fourcc = cv2.VideoWriter_fourcc(*'H264')
  7.     fourcc = cv2.VideoWriter_fourcc(*'mp4v')

  8.     # 建立 VideoWriter 物件
  9.     writer = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

  10.     while True:
  11.         # 從工作佇列取得影像
  12.         image = taskqueue.get()

  13.         # 若沒有影像則終止迴圈
  14.         if image is None: break

  15.         # 儲存影像
  16.         writer.write(image)

  17.     # 釋放資源
  18.     writer.release()

  19. if __name__ == '__main__':

  20.     # 開啟 RTSP 串流
  21.     vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

  22.     # 取得影像的尺寸大小
  23.     width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
  24.     height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))

  25.     # 取得影格率
  26.     fps = vidCap.get(cv2.CAP_PROP_FPS)

  27.     # 建立工作佇列
  28.     taskqueue = Queue()

  29.     # 建立並執行工作行程
  30.     proc = Process(target=image_save, args=(taskqueue, width, height, fps))
  31.     proc.start()

  32.     # 計數器
  33.     frame_counter = 0

  34.     # 總錄製幀數(10 秒鐘)
  35.     total_frames = fps * 10

  36.     while frame_counter < total_frames:
  37.         # 從 RTSP 串流讀取一張影像
  38.         ret, image = vidCap.read()

  39.         if ret:
  40.             # 將影像放入工作佇列
  41.             taskqueue.put(image)
  42.             frame_counter += 1
  43.         else:
  44.             # 若沒有影像跳出迴圈
  45.             break

  46.     # 傳入 None 終止工作行程
  47.     taskqueue.put(None)

  48.     # 等待工作行程結束
  49.     proc.join()

  50.     # 釋放資源
  51.     vidCap.release()
複製代碼


儲存長時間串流影片
若需要儲存長時間的 RTSP 串流影片,可以利用以下範例,將串流切成固定長度的影片,依照編號或時間戳記來命名儲存的影片檔案,方便後續處理。
  1. from multiprocessing import Process, Queue
  2. import cv2
  3. from datetime import datetime

  4. def image_save(taskqueue, width, height, fps, frames_per_file):

  5.     # 指定影片編碼
  6.     #fourcc = cv2.VideoWriter_fourcc(*'XVID')
  7.     #fourcc = cv2.VideoWriter_fourcc(*'H264')
  8.     fourcc = cv2.VideoWriter_fourcc(*'mp4v')

  9.     writer = None

  10.     while True:
  11.         # 從工作佇列取得影像
  12.         image, frame_counter = taskqueue.get()

  13.         # 若沒有影像則終止迴圈
  14.         if image is None: break

  15.         if frame_counter % frames_per_file == 0:

  16.             if writer: writer.release()

  17.             # 建立 VideoWriter 物件(以數字編號)
  18.             # index = int(frame_counter // frames_per_file)
  19.             # writer = cv2.VideoWriter(f'output-{index}.mp4', fourcc, fps, (width, height))

  20.             # 建立 VideoWriter 物件(以時間命名)
  21.             now = datetime.now()
  22.             timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
  23.             writer = cv2.VideoWriter(f'output-{timestamp}.mp4', fourcc, fps, (width, height))

  24.         # 儲存影像
  25.         writer.write(image)

  26.     # 釋放資源
  27.     writer.release()

  28. if __name__ == '__main__':

  29.     # 開啟 RTSP 串流
  30.     vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

  31.     # 取得影像的尺寸大小
  32.     width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
  33.     height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))

  34.     # 取得影格率
  35.     fps = vidCap.get(cv2.CAP_PROP_FPS)

  36.     # 建立工作佇列
  37.     taskqueue = Queue()

  38.     # 計數器
  39.     frame_counter = 0

  40.     # 總錄製幀數(30 秒鐘)
  41.     total_frames = fps * 30

  42.     # 每個檔案的幀數(10 秒鐘)
  43.     frames_per_file = fps * 10

  44.     # 建立並執行工作行程
  45.     proc = Process(target=image_save, args=(taskqueue, width, height, fps, frames_per_file))
  46.     proc.start()

  47.     while frame_counter < total_frames:
  48.         # 從 RTSP 串流讀取一張影像
  49.         ret, image = vidCap.read()

  50.         if ret:
  51.             # 將影像放入工作佇列
  52.             taskqueue.put((image, frame_counter))
  53.             frame_counter += 1
  54.         else:
  55.             # 若沒有影像跳出迴圈
  56.             break

  57.     # 傳入 None 終止工作行程
  58.     taskqueue.put((None, None))

  59.     # 等待工作行程結束
  60.     proc.join()

  61.     # 釋放資源
  62.     vidCap.release()
複製代碼

這裡為了方便起見,檔案名稱用的時間戳記是放在工作行程中產生的,所以時間可能會有一些誤差,如果需要非常精確的時間,就要將時間戳記改為接收串流時同時取得時間。


錄製機器作動影片
這裡我的應用場景是將一台網路攝影機架設在工廠內,鏡頭對準特定的機台,然後我希望透過 RTSP 串流影片監看機台的狀況,當機台有動作時自動將作動過程的影片錄製下來,以利後續的 AI 分析,以下是我在開發過程中所用的基本架構範例。
  1. from multiprocessing import Process, Queue
  2. import cv2
  3. from datetime import datetime
  4. import numpy as np

  5. # 開發模式
  6. DEV_MODE = False

  7. def printLogMsg(msg):
  8.     now = datetime.now()
  9.     timestamp = now.strftime("%Y/%m/%d %H:%M:%S")
  10.     print("[{}] {}".format(timestamp, msg))

  11. def image_save(taskqueue, width, height, fps):

  12.     # 指定影片編碼
  13.     fourcc = cv2.VideoWriter_fourcc(*'XVID')
  14.     #fourcc = cv2.VideoWriter_fourcc(*'H264')
  15.     #fourcc = cv2.VideoWriter_fourcc(*'mp4v')

  16.     writer = None

  17.     while True:
  18.         # 從工作佇列取得影像
  19.         image, frameCounter = taskqueue.get()

  20.         # 若沒有影像則終止迴圈
  21.         if image is None: break

  22.         if frameCounter == 0:

  23.             if writer: writer.release()

  24.             # 建立 VideoWriter 物件(以時間命名)
  25.             now = datetime.now()
  26.             timestamp = now.strftime("%Y%m%d-%H%M%S")
  27.             writer = cv2.VideoWriter(f'test-output-{timestamp}.avi', fourcc, fps, (width, height))

  28.         # 儲存影像
  29.         writer.write(image)

  30.     # 釋放資源
  31.     if writer: writer.release()

  32. def captureRTSP(src):
  33.     # 開啟 RTSP 串流
  34.     vidCap = cv2.VideoCapture(src)

  35.     # 取得影像的尺寸大小
  36.     imgWidth = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
  37.     imgHeight = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  38.     printLogMsg(f"Size: {imgWidth} x {imgHeight}")

  39.     # 取得影格率
  40.     fps = vidCap.get(cv2.CAP_PROP_FPS)
  41.     printLogMsg(f"FPS: {fps}")

  42.     # 建立工作佇列
  43.     taskqueue = Queue()

  44.     # 目標影像 FPS
  45.     desired_fps = 15

  46.     # 建立並執行工作行程
  47.     proc = Process(target=image_save, args=(taskqueue, imgWidth, imgHeight, desired_fps))
  48.     proc.start()

  49.     avgImage = None
  50.     avgImageFloat = None

  51.     # 計數器
  52.     frameCounter = 0

  53.     while vidCap.isOpened():
  54.         # 從 RTSP 串流讀取一張影像
  55.         ret, image = vidCap.read()

  56.         if ret:
  57.             # 忽略黑白影像(紅外線)
  58.             if np.array_equal(image[:,:,0], image[:,:,1]) and np.array_equal(image[:,:,0], image[:,:,2]):
  59.                 avgImage = None
  60.                 avgImageFloat = None
  61.                 if frameCounter:
  62.                     printLogMsg(f"Stop recording (frame counter = {frameCounter})")
  63.                 frameCounter = 0
  64.                 continue

  65.             # 取出中央部分影像,作為變動依據
  66.             cropImage = image[(imgHeight//4):(imgHeight//4*3), (imgWidth//8*3):(imgWidth//8*5)]
  67.             centerImage = cv2.resize(cropImage, (imgHeight//4, imgWidth//8))
  68.             centerArea = imgHeight//4 * imgWidth//8

  69.             # 初始化平均影像
  70.             if avgImage is None:
  71.                 avgImage = cv2.blur(centerImage, (4, 4))
  72.                 avgImageFloat = np.float32(avgImage)
  73.                 printLogMsg("avgImage initialization")
  74.                 continue

  75.             # 模糊處理
  76.             blurImage = cv2.blur(centerImage, (4, 4))

  77.             # 計算目前影格與平均影像的差異值
  78.             diffImage = cv2.absdiff(avgImage, blurImage)

  79.             # 將圖片轉為灰階
  80.             grayImage = cv2.cvtColor(diffImage, cv2.COLOR_BGR2GRAY)

  81.             # 篩選出變動程度大於門檻值的區域
  82.             ret, binImage = cv2.threshold(grayImage, 5, 255, cv2.THRESH_BINARY)

  83.             # 使用型態轉換函數去除雜訊
  84.             kernel = np.ones((5, 5), np.uint8)
  85.             binImage = cv2.morphologyEx(binImage, cv2.MORPH_OPEN, kernel, iterations=2)
  86.             binImage = cv2.morphologyEx(binImage, cv2.MORPH_CLOSE, kernel, iterations=2)

  87.             # 產生等高線
  88.             countours, _ = cv2.findContours(binImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

  89.             detected = False
  90.             for c in countours:
  91.                 if cv2.contourArea(c) > centerArea / 5:
  92.                     detected = True
  93.                     if DEV_MODE:
  94.                         # 計算等高線的外框範圍
  95.                         (x, y, w, h) = cv2.boundingRect(c)

  96.                         # 畫出外框
  97.                         cv2.rectangle(centerImage, (x, y), (x + w, y + h), (0, 255, 0), 2)
  98.                     else:
  99.                         break

  100.             if DEV_MODE:
  101.                 # 顯示偵測結果影像
  102.                 cv2.imshow('image', centerImage)
  103.                 cv2.imshow('avgImage', avgImage)
  104.                 cv2.imshow('blurImage', blurImage)
  105.                 if cv2.waitKey(1) & 0xFF == ord('q'):
  106.                     # 傳入 None 終止工作行程
  107.                     taskqueue.put((None, None))
  108.                     break

  109.             # 更新平均影像
  110.             cv2.accumulateWeighted(blurImage, avgImageFloat, 0.1)
  111.             avgImage = cv2.convertScaleAbs(avgImageFloat)

  112.             # 將影像放入工作佇列
  113.             if detected:
  114.                 if DEV_MODE == False:
  115.                     taskqueue.put((image, frameCounter))
  116.                 if frameCounter == 0:
  117.                     printLogMsg("Start recording")
  118.                 frameCounter += 1
  119.             else:
  120.                 if frameCounter:
  121.                     printLogMsg(f"Stop recording (frame counter = {frameCounter})")
  122.                 frameCounter = 0
  123.                 continue
  124.         else:
  125.             # 若沒有影像跳出迴圈
  126.             printLogMsg("no frame")
  127.             break

  128.     # 傳入 None 終止工作行程
  129.     taskqueue.put((None, None))

  130.     # 等待工作行程結束
  131.     proc.join()

  132.     # 釋放資源
  133.     vidCap.release()

  134.     # 關閉所有 OpenCV 視窗
  135.     cv2.destroyAllWindows()

  136. if __name__ == '__main__':
  137.     captureRTSP('rtsp://ipcam.stream:8554/bars')
複製代碼

在現場的網路攝影機是 24 小時全天運作的,但是機器作動的頻率很低,大部分的時間都是處於閒置狀態,而在機器閒置的時間,現場的燈也都是關閉的。正常開燈的時候網路攝影機使用一般的攝像頭,對應的影格率(FPS)為 15,但當網路攝影機遇到關燈的狀態時,會自動遷換為紅外線攝像頭,影像雖然還是 RGB 格式,但是色彩會轉為灰階,而此時對應的影格率(FPS)就會降為 10。
由於機器只會在開燈的時候作動,所以我在程式中加上判斷影像色彩的條件,若整張影像的 RGB 三個 channels 都完全有相同的值,就判定為紅外線影像,將其直接忽略,減少不必要的計算。
當取得串流影像之後,我們將畫面中央的部分取出來,並降低解析度(節省計算量),參考 Python 與 OpenCV 實作移動偵測程式教學,偵測影像中央是否有變動,當出現影像大幅度變動時,就將串流影片儲存下來。
另外為了方便開發與測試,我們靠著 DEV_MODE 變數設定開發模式,當程式處於開發模式的時候,會開啟監看視窗,顯示幾種關鍵的影像內容,同時開發模式也不會儲存任何影像。
在儲存影片檔案時,會自動將獨立的影片片段分開儲存,檔案名稱則自動以開始錄製的時間來命名,影片的解析度、影格率都維持跟來源影片相同。

https://officeguide.cc/python-opencv-open-display-save-rtsp-stream-tutorial-examples/
參考資料
您需要登錄後才可以回帖 登錄 | 註冊

本版積分規則

Archiver|手機版|小黑屋|TShopping

GMT+8, 2026-5-29 00:00 , Processed in 0.103716 second(s), 189 queries .

Powered by Discuz! X3.5

© 2001-2026 Discuz! Team.

快速回復 返回頂部 返回列表