IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> jetson python gstreamer appsink appsrc -> 正文阅读

[Python知识库]jetson python gstreamer appsink appsrc

1.简单demo,拉流rtsp,使用omxh264解码

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib

Gst.init(None)
# Gst.Pipeline
pipeline = Gst.parse_launch("rtspsrc location=rtsp://admin:admin12345@10.0.15.52:554/h264/ch1/main/av_stream ! rtph264depay ! h264parse ! omxh264dec ! nvvidconv ! video/x-raw ! appsink name=sink")
pipeline.set_state(Gst.State.PLAYING)

mainloop = GLib.MainLoop()
mainloop.run()

2.读取appsink数据

import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import cv2
import numpy

Gst.init(None)


image_arr = None

def gst_to_opencv(sample):
? ? buf = sample.get_buffer()
? ? caps = sample.get_caps()
? ? print(caps.get_structure(0).get_value('format'))
? ? print(caps.get_structure(0).get_value('height'))
? ? print(caps.get_structure(0).get_value('width'))

? ? print(buf.get_size())

? ? arr = numpy.ndarray(
? ? ? ? (caps.get_structure(0).get_value('height'),
? ? ? ? ?caps.get_structure(0).get_value('width'),
? ? ? ? ?3),
? ? ? ? buffer=buf.extract_dup(0, buf.get_size()),
? ? ? ? dtype=numpy.uint8)
? ? return arr

def new_buffer(sink, data):
? ? global image_arr
? ? sample = sink.emit("pull-sample")
? ? # buf = sample.get_buffer()
? ? # print "Timestamp: ", buf.pts
? ? arr = gst_to_opencv(sample)
? ? image_arr = arr
? ? return Gst.FlowReturn.OK

# Create the elements
source = Gst.ElementFactory.make("videotestsrc", "source")
convert = Gst.ElementFactory.make("videoconvert", "convert")
sink = Gst.ElementFactory.make("appsink", "sink")

# Create the empty pipeline
pipeline = Gst.Pipeline.new("test-pipeline")

if not source or not sink or not pipeline:
? ? print("Not all elements could be created.")
? ? exit(-1)


sink.set_property("emit-signals", True)
# sink.set_property("max-buffers", 2)
# # sink.set_property("drop", True)
# # sink.set_property("sync", False)

caps = Gst.caps_from_string("video/x-raw, format=(string)BGR")

sink.set_property("caps", caps)


sink.connect("new-sample", new_buffer, sink)

# Build the pipeline
pipeline.add(source)
pipeline.add(convert)
pipeline.add(sink)
if not Gst.Element.link(source, convert):
? ? print("Elements could not be linked.")
? ? exit(-1)

if not Gst.Element.link(convert, sink):
? ? print("Elements could not be linked.")
? ? exit(-1)

# Modify the source's properties
# HD1080 25p
#source.set_property("mode", 7)
# SDI
#source.set_property("connection", 0)

# Start playing
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
? ? print("Unable to set the pipeline to the playing state.")
? ? exit(-1)

# Wait until error or EOS
bus = pipeline.get_bus()


# Parse message
while True:
? ? message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
? ? # print "image_arr: ", image_arr
? ? if image_arr is not None:
? ? ? ? #cv2.imshow("appsink image arr", image_arr)
? ? ? ? cv2.waitKey(1)
? ? if message:
? ? ? ? if message.type == Gst.MessageType.ERROR:
? ? ? ? ? ? err, debug = message.parse_error()
? ? ? ? ? ? print(("Error received from element %s: %s" % (
? ? ? ? ? ? ? ? message.src.get_name(), err)))
? ? ? ? ? ? print(("Debugging information: %s" % debug))
? ? ? ? ? ? break
? ? ? ? elif message.type == Gst.MessageType.EOS:
? ? ? ? ? ? print("End-Of-Stream reached.")
? ? ? ? ? ? break
? ? ? ? elif message.type == Gst.MessageType.STATE_CHANGED:
? ? ? ? ? ? if isinstance(message.src, Gst.Pipeline):
? ? ? ? ? ? ? ? old_state, new_state, pending_state = message.parse_state_changed()
? ? ? ? ? ? ? ? print(("Pipeline state changed from %s to %s." %
? ? ? ? ? ? ? ? ? ? ? ?(old_state.value_nick, new_state.value_nick)))
? ? ? ? else:
? ? ? ? ? ? print("Unexpected message received.")

# Free resources
pipeline.set_state(Gst.State.NULL)

3.使用appsrc 推流

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import cv2
import numpy

Gst.init(None)

pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=30/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123")
app_src = pipeline_src.get_by_name('src')
ret = pipeline_src.set_state(Gst.State.PLAYING)

def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer:
? ? """Converts numpy array to Gst.Buffer"""
? ? return Gst.Buffer.new_wrapped(array.tobytes())

while True

????arr?= numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8)
? ? app_src.emit("push-buffer", ndarray_to_gst_buffer(arr))

? ? time.sleep(0.03)

4.拉流rtsp,推流到rtmp

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import cv2
import numpy

Gst.init(None)
# Gst.Pipeline

image_arr = None


#push
pipeline_src = Gst.parse_launch("appsrc name=src ! video/x-raw,width=1280,height=720,format=BGRx,framerate=25/1 ! nvvidconv ! omxh264enc ! h264parse ! flvmux ! rtmpsink location=rtmp://10.0.15.64:20107/uav/123")
app_src = pipeline_src.get_by_name('src')
ret = pipeline_src.set_state(Gst.State.PLAYING)

def ndarray_to_gst_buffer(array: numpy.ndarray) -> Gst.Buffer:
    """Converts numpy array to Gst.Buffer"""
    return Gst.Buffer.new_wrapped(array.tobytes())



def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()
    print(caps.get_structure(0).get_value('format'))
    print(caps.get_structure(0).get_value('height'))
    print(caps.get_structure(0).get_value('width'))

    print(buf.get_size())

    arr = numpy.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         4),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=numpy.uint8)

    #arr1 = numpy.random.randint(low=0, high=255, size=(1280, 720, 4), dtype=numpy.uint8)
    app_src.emit("push-buffer", ndarray_to_gst_buffer(arr))
    return arr

def new_buffer(sink, data):
    global image_arr
    sample = sink.emit("pull-sample")
    # buf = sample.get_buffer()
    # print "Timestamp: ", buf.pts
    arr = gst_to_opencv(sample)
    image_arr = arr
    return Gst.FlowReturn.OK


pipeline = Gst.parse_launch("rtspsrc location=rtsp://admin:admin12345@10.0.15.52:554/h264/ch1/main/av_stream ! rtph264depay ! h264parse ! omxh264dec ! nvvidconv ! video/x-raw,width=1280,height=720,format=BGRx ! appsink name=sink")

sink = pipeline.get_by_name('sink')
sink.set_property("emit-signals", True)
sink.connect("new-sample", new_buffer, sink)

# Start playing
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
    print("Unable to set the pipeline to the playing state.")
    exit(-1)

# Wait until error or EOS
bus = pipeline.get_bus()


# Parse message
while True:
    message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
    # print "image_arr: ", image_arr
    if image_arr is not None:
        #cv2.imshow("appsink image arr", image_arr)
        cv2.waitKey(1)
    if message:
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(("Error received from element %s: %s" % (
                message.src.get_name(), err)))
            print(("Debugging information: %s" % debug))
            break
        elif message.type == Gst.MessageType.EOS:
            print("End-Of-Stream reached.")
            break
        elif message.type == Gst.MessageType.STATE_CHANGED:
            if isinstance(message.src, Gst.Pipeline):
                old_state, new_state, pending_state = message.parse_state_changed()
                print(("Pipeline state changed from %s to %s." %
                       (old_state.value_nick, new_state.value_nick)))
        else:
            print("Unexpected message received.")

# Free resources
pipeline.set_state(Gst.State.NULL)
#app_src.emit("end-of-stream")

部分参考文章

1.https://gist.github.com/willpatera/7984486

2.https://gist.github.com/cbenhagen/76b24573fa63e7492fb6

3.Appsrc with numpy input in Python - #8 by gautampt6ul - DeepStream SDK - NVIDIA Developer Forums

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-02-26 11:26:58  更:2022-02-26 11:29:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/1 12:23:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码