| |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| -> Python知识库 -> jetson python gstreamer appsink appsrc -> 正文阅读 |
|
|
[Python知识库]jetson python gstreamer appsink appsrc |
|
1.简单demo,拉流rtsp,使用omxh264解码 import gi Gst.init(None) mainloop = GLib.MainLoop() 2.读取appsink数据 import sys Gst.init(None)
def gst_to_opencv(sample): ? ? print(buf.get_size()) ? ? arr = numpy.ndarray( def new_buffer(sink, data): # Create the elements # Create the empty pipeline if not source or not sink or not pipeline:
caps = Gst.caps_from_string("video/x-raw, format=(string)BGR") sink.set_property("caps", caps)
# Build the pipeline if not Gst.Element.link(convert, sink): # Modify the source's properties # Start playing # Wait until error or EOS
# Free resources 3.使用appsrc 推流 import gi Gst.init(None) pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=30/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123") def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer: while True ????arr?= numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8) ? ? time.sleep(0.03) 4.拉流rtsp,推流到rtmp import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import cv2
import numpy
Gst.init(None)
# Gst.Pipeline
image_arr = None
#push
pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=25/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123")
app_src = pipeline_src.get_by_name('src')
ret = pipeline_src.set_state(Gst.State.PLAYING)
def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer:
"""Converts numpy array to Gst.Buffer"""
return Gst.Buffer.new_wrapped(array.tobytes())
def gst_to_opencv(sample):
buf = sample.get_buffer()
caps = sample.get_caps()
print(caps.get_structure(0).get_value('format'))
print(caps.get_structure(0).get_value('height'))
print(caps.get_structure(0).get_value('width'))
print(buf.get_size())
arr = numpy.ndarray(
(caps.get_structure(0).get_value('height'),
caps.get_structure(0).get_value('width'),
4),
buffer=buf.extract_dup(0, buf.get_size()),
dtype=numpy.uint8)
#arr1 = numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8)
app_src.emit("push-buffer", ndarray_to_gst_buffer(arr))
return arr
def new_buffer(sink, data):
global image_arr
sample = sink.emit("pull-sample")
# buf = sample.get_buffer()
# print "Timestamp: ", buf.pts
arr = gst_to_opencv(sample)
image_arr = arr
return Gst.FlowReturn.OK
pipeline = Gst.parse_launch("rtspsrc location=rtsp://admin:admin12345@10.0.15.52:554/h264/ch1/main/av_stream ! rtph264depay ! h264parse ! omxh264dec ! nvvidconv ! video/x-raw,width=1280,height=720,format=BGRx ! appsink name=sink")
sink = pipeline.get_by_name('sink')
sink.set_property("emit-signals", True)
sink.connect("new-sample", new_buffer, sink)
# Start playing
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set the pipeline to the playing state.")
exit(-1)
# Wait until error or EOS
bus = pipeline.get_bus()
# Parse message
while True:
message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
# print "image_arr: ", image_arr
if image_arr is not None:
#cv2.imshow("appsink image arr", image_arr)
cv2.waitKey(1)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(("Error received from element %s: %s" % (
message.src.get_name(), err)))
print(("Debugging information: %s" % debug))
break
elif message.type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif message.type == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print(("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick)))
else:
print("Unexpected message received.")
# Free resources
pipeline.set_state(Gst.State.NULL)
#app_src.emit("end-of-stream")
部分参考文章 1.https://gist.github.com/willpatera/7984486 2.https://gist.github.com/cbenhagen/76b24573fa63e7492fb6 3.Appsrc with numpy input in Python - #8 by gautampt6ul - DeepStream SDK - NVIDIA Developer Forums |
|
|
|
|
| 上一篇文章 下一篇文章 查看所有文章 |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| 360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年11日历 | -2025/11/29 10:53:36- |
|
| 网站联系: qq:121756557 email:121756557@qq.com IT数码 |