| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> Python知识库 -> jetson python gstreamer appsink appsrc -> 正文阅读 |
|
[Python知识库]jetson python gstreamer appsink appsrc |
1.简单demo,拉流rtsp,使用omxh264解码 import gi Gst.init(None) mainloop = GLib.MainLoop() 2.读取appsink数据 import sys Gst.init(None)
def gst_to_opencv(sample): ? ? print(buf.get_size()) ? ? arr = numpy.ndarray( def new_buffer(sink, data): # Create the elements # Create the empty pipeline if not source or not sink or not pipeline:
caps = Gst.caps_from_string("video/x-raw, format=(string)BGR") sink.set_property("caps", caps)
# Build the pipeline if not Gst.Element.link(convert, sink): # Modify the source's properties # Start playing # Wait until error or EOS
# Free resources 3.使用appsrc 推流 import gi Gst.init(None) pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=30/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123") def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer: while True ????arr?= numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8) ? ? time.sleep(0.03) 4.拉流rtsp,推流到rtmp import gi gi.require_version('Gst', '1.0') from gi.repository import Gst, GObject, GLib import cv2 import numpy Gst.init(None) # Gst.Pipeline image_arr = None #push pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=25/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123") app_src = pipeline_src.get_by_name('src') ret = pipeline_src.set_state(Gst.State.PLAYING) def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer: """Converts numpy array to Gst.Buffer""" return Gst.Buffer.new_wrapped(array.tobytes()) def gst_to_opencv(sample): buf = sample.get_buffer() caps = sample.get_caps() print(caps.get_structure(0).get_value('format')) print(caps.get_structure(0).get_value('height')) print(caps.get_structure(0).get_value('width')) print(buf.get_size()) arr = numpy.ndarray( (caps.get_structure(0).get_value('height'), caps.get_structure(0).get_value('width'), 4), buffer=buf.extract_dup(0, buf.get_size()), dtype=numpy.uint8) #arr1 = numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8) app_src.emit("push-buffer", ndarray_to_gst_buffer(arr)) return arr def new_buffer(sink, data): global image_arr sample = sink.emit("pull-sample") # buf = sample.get_buffer() # print "Timestamp: ", buf.pts arr = gst_to_opencv(sample) image_arr = arr return Gst.FlowReturn.OK pipeline = Gst.parse_launch("rtspsrc location=rtsp://admin:admin12345@10.0.15.52:554/h264/ch1/main/av_stream ! rtph264depay ! h264parse ! omxh264dec ! nvvidconv ! video/x-raw,width=1280,height=720,format=BGRx ! appsink name=sink") sink = pipeline.get_by_name('sink') sink.set_property("emit-signals", True) sink.connect("new-sample", new_buffer, sink) # Start playing ret = pipeline.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: print("Unable to set the pipeline to the playing state.") exit(-1) # Wait until error or EOS bus = pipeline.get_bus() # Parse message while True: message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY) # print "image_arr: ", image_arr if image_arr is not None: #cv2.imshow("appsink image arr", image_arr) cv2.waitKey(1) if message: if message.type == Gst.MessageType.ERROR: err, debug = message.parse_error() print(("Error received from element %s: %s" % ( message.src.get_name(), err))) print(("Debugging information: %s" % debug)) break elif message.type == Gst.MessageType.EOS: print("End-Of-Stream reached.") break elif message.type == Gst.MessageType.STATE_CHANGED: if isinstance(message.src, Gst.Pipeline): old_state, new_state, pending_state = message.parse_state_changed() print(("Pipeline state changed from %s to %s." % (old_state.value_nick, new_state.value_nick))) else: print("Unexpected message received.") # Free resources pipeline.set_state(Gst.State.NULL) #app_src.emit("end-of-stream") 部分参考文章 1.https://gist.github.com/willpatera/7984486 2.https://gist.github.com/cbenhagen/76b24573fa63e7492fb6 3.Appsrc with numpy input in Python - #8 by gautampt6ul - DeepStream SDK - NVIDIA Developer Forums |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/1 12:23:42- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |