【脚本】在 Jetson Nano 上使用摄像头分段录像并开启端口推流
可设置录像保留天数、分段时长、分辨率和码率。
#!/usr/bin/env python3
import sys
import os
import time
import datetime
import threading
import glob
import gi
import signal
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib, GObject
# ================= Configuration =================
# Every tunable of the recorder lives in this one dict; the rest of the
# script reads it by key.
CONFIG = {
    # Directory the recorded segments are written to.
    "save_dir": "/home/r/Videos/cctv",
    # Length of one recorded segment, in nanoseconds (60 s).
    "segment_duration_ns": 60 * 10**9,
    # Recordings older than this many days are deleted.
    "retention_days": 7,
    # When free disk space drops below this many bytes (50 MiB),
    # earlier recordings are deleted automatically.
    "free_space_threshold": 50 * 1024**2,
    # TCP port the live stream is served on.
    "stream_port": 5000,
    # Camera device node.
    "usb_device": "/dev/video0",
    # Capture resolution.
    "width": 1280,
    "height": 720,
    # Average encoder bitrate.
    "bitrate": 2_000_000,
    # Peak encoder bitrate.
    "peak-bitrate": 4_000_000,
}
# =================================================
class JetsonDVR:
    """Record a USB camera into fixed-length MP4 segments and serve a live stream.

    One GStreamer pipeline captures MJPEG from the configured V4L2 device,
    encodes it to H.265 and tees the result into two branches: a
    ``splitmuxsink`` that writes segments into ``CONFIG["save_dir"]`` and a
    ``tcpserversink`` that serves a Matroska stream on
    ``CONFIG["stream_port"]``. A daemon thread deletes recordings that are
    older than the retention period or that must go to free disk space.
    """

    # Longest the cleanup thread sleeps when no new segment wakes it up.
    CLEANUP_INTERVAL_SEC = 3600
    # Upper bound on how long shutdown waits for EOS to reach the sinks.
    EOS_TIMEOUT_SEC = 5

    def __init__(self):
        """Initialise GStreamer, start the cleanup thread and build the pipeline."""
        Gst.init(None)
        self.mainloop = GLib.MainLoop()
        os.makedirs(CONFIG["save_dir"], exist_ok=True)

        # Set by on_message() once the pipeline has reported EOS, so that
        # shutdown does not wait for a second EOS that will never arrive.
        self._eos_seen = False
        # Set whenever a new segment is opened; wakes the cleanup thread
        # early. Must exist before the thread starts.
        self._cleanup_wakeup = threading.Event()

        self.cleanup_thread = threading.Thread(target=self.cleanup_loop, daemon=True)
        self.cleanup_thread.start()

        self.pipeline = self.create_pipeline()

        # Deliver bus messages to on_message() from the GLib main loop.
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)

        # SIGINT and SIGTERM both trigger a graceful shutdown.
        signal.signal(signal.SIGINT, self.handle_sigint)
        signal.signal(signal.SIGTERM, self.handle_sigint)

    def create_pipeline(self):
        """Build the capture/encode/record/stream pipeline.

        Returns:
            The parsed pipeline. The process exits with status 1 if
            ``Gst.parse_launch`` rejects the pipeline description.
        """
        print(f"[*] 初始化管道... 设备: {CONFIG['usb_device']}")
        source = (
            f"v4l2src device={CONFIG['usb_device']} ! "
            f"image/jpeg, width={CONFIG['width']}, height={CONFIG['height']}, framerate=30/1 ! "
            "jpegdec ! "
            "nvvidconv ! "
            "video/x-raw(memory:NVMM), format=I420"
        )
        # H.265 encoder. An H.264 variant would swap in nvv4l2h264enc and
        # h264parse here.
        encoder = (
            " ! nvv4l2h265enc maxperf-enable=1 insert-sps-pps=1 "
            f"control-rate=1 bitrate={CONFIG['bitrate']} peak-bitrate={CONFIG['peak-bitrate']} ! "
            "video/x-h265,stream-format=byte-stream ! "
            "h265parse ! tee name=t"
        )
        # Recording branch: time-limited MP4 segments.
        record_branch = (
            f" t. ! queue ! "
            f"splitmuxsink name=sink "
            f"max-size-time={CONFIG['segment_duration_ns']} "
            f"muxer=mp4mux async-handling=true"
        )
        # Streaming branch: Matroska over a TCP server socket.
        stream_branch = (
            f" t. ! queue leaky=1 ! "
            f"matroskamux streamable=true ! "
            f"tcpserversink host=0.0.0.0 port={CONFIG['stream_port']} "
            f"recover-policy=keyframe sync-method=latest-keyframe"
        )
        pipeline_str = source + encoder + record_branch + stream_branch
        print("-" * 60)
        print("GStreamer Pipeline String:")
        print(pipeline_str)
        print("-" * 60)
        try:
            pipeline = Gst.parse_launch(pipeline_str)
        except Exception as e:
            print("[!] 管道构建失败:", e)
            sys.exit(1)

        # splitmuxsink asks for each segment's file name through this signal.
        sink = pipeline.get_by_name("sink")
        if sink:
            sink.connect("format-location", self.on_format_location)
        else:
            print("[!] 警告: 找不到 splitmuxsink")
        return pipeline

    def on_format_location(self, splitmux, fragment_id):
        """Return the path for the next segment, named after the local time.

        Also wakes the cleanup thread, so retention and free space are
        re-checked every time a segment is opened rather than only once per
        ``CLEANUP_INTERVAL_SEC``.
        """
        now = datetime.datetime.now()
        filename = now.strftime("%Y%m%d-%H%M%S.mp4")
        self._cleanup_wakeup.set()
        return os.path.join(CONFIG["save_dir"], filename)

    @staticmethod
    def _partition_by_age(recordings, now, retention_sec):
        """Split recordings into expired and kept paths, each oldest first.

        Args:
            recordings: Iterable of ``(mtime, path)`` pairs.
            now: Current time, on the same clock as the mtimes.
            retention_sec: Maximum age in seconds; strictly older is expired.

        Returns:
            A ``(expired, kept)`` tuple of path lists.
        """
        expired = []
        kept = []
        for mtime, path in sorted(recordings):
            if now - mtime > retention_sec:
                expired.append(path)
            else:
                kept.append(path)
        return expired, kept

    def _remove_recording(self, path):
        """Delete one recording and log it; return True if it was removed."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone, nothing to report.
            return False
        except OSError as e:
            print("[!] 清理线程错误:", e)
            return False
        print(f"[-] 删除过期录像: {os.path.basename(path)}")
        return True

    def _cleanup_once(self):
        """Run one cleanup pass over the recordings in the save directory.

        Recordings older than the retention period are deleted first. If
        free space is still below the threshold, the oldest remaining
        recordings are deleted until it is not. The newest recording is
        never deleted for space, because it is most likely the segment
        currently being written.
        """
        save_dir = CONFIG["save_dir"]
        retention_sec = CONFIG["retention_days"] * 24 * 3600
        threshold = CONFIG["free_space_threshold"]

        recordings = []
        for path in glob.glob(os.path.join(save_dir, "*.mp4")):
            if not os.path.isfile(path):
                continue
            try:
                recordings.append((os.path.getmtime(path), path))
            except OSError:
                # The file vanished between listing and stat; skip it.
                continue

        expired, kept = self._partition_by_age(recordings, time.time(), retention_sec)
        for path in expired:
            self._remove_recording(path)

        if threshold <= 0:
            return
        for path in kept[:-1]:
            stats = os.statvfs(save_dir)
            # f_bavail is counted in f_frsize units.
            if stats.f_frsize * stats.f_bavail >= threshold:
                break
            self._remove_recording(path)

    def cleanup_loop(self):
        """Run cleanup passes forever; this is the cleanup thread's target.

        Between passes the thread sleeps until a new segment is opened or
        ``CLEANUP_INTERVAL_SEC`` elapses, whichever comes first.
        """
        while True:
            try:
                self._cleanup_once()
            except Exception as e:
                # Thread boundary: report the error and keep the thread alive.
                print("[!] 清理线程错误:", e)
            self._cleanup_wakeup.wait(self.CLEANUP_INTERVAL_SEC)
            self._cleanup_wakeup.clear()

    def on_message(self, bus, message):
        """Handle bus messages: an error or end-of-stream stops the main loop."""
        mtype = message.type
        if mtype == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print("[!] Pipeline Error:", err)
            if debug:
                print("[!] Debug Info:", debug)
            self.quit()
        elif mtype == Gst.MessageType.EOS:
            print("[*] End of Stream")
            self._eos_seen = True
            self.quit()

    def handle_sigint(self, sig, frame):
        """Signal handler for SIGINT/SIGTERM: request a graceful stop."""
        print("\n[*] 接收到中断信号,正在停止...")
        self.quit()

    def quit(self):
        """Stop the main loop if it is running."""
        if self.mainloop.is_running():
            self.mainloop.quit()

    def run(self):
        """Start the pipeline and block until the main loop is stopped."""
        print("[*] 启动 DVR 系统...")
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.mainloop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop_pipeline()

    def stop_pipeline(self):
        """Finalise the current recording and shut the pipeline down.

        EOS is sent so the open MP4 file is finalised properly instead of
        being left corrupted. The method then waits on the bus until the
        pipeline reports EOS or an error, for at most ``EOS_TIMEOUT_SEC``
        seconds, before setting the pipeline to NULL.
        """
        print("[*] 发送 EOS 并停止管道...")
        if not self._eos_seen and self.pipeline.send_event(Gst.Event.new_eos()):
            # The main loop is no longer running, so poll the bus directly.
            self.pipeline.get_bus().timed_pop_filtered(
                self.EOS_TIMEOUT_SEC * Gst.SECOND,
                Gst.MessageType.EOS | Gst.MessageType.ERROR,
            )
        self.pipeline.set_state(Gst.State.NULL)
        print("[*] 已退出")
if __name__ == "__main__":
    # Script entry point: build the recorder and block in its main loop
    # until it is stopped.
    JetsonDVR().run()