思路
1、前端无法直接播放RTSP推流过来的视频,所以需要用ffmpeg进行转码。 2、ffmpeg只能通过TCP或者HTTP协议推流,还不支持ws协议,所以需要服务端先接收ffmpeg的推流,再通过WebSocket转发给前端。 大致流程图。 代码
效果图。
需要依赖Springboot + netty+ffmpeg-platform
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>ffmpeg-platform</artifactId>
<version>4.3.1-1.5.4</version>
</dependency>
netty部分代码 1、简简单单一个Server服务端。
package com.kang.rtsp.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.cors.CorsConfig;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Netty server that accepts HTTP and WebSocket connections from browser clients
 * and hands every accepted channel to the shared {@link RtspHandler}.
 *
 * <p>The listening port is read from the {@code netty.port} property.
 */
@Component
@Slf4j
public class NettyServer {
    @Autowired
    private RtspHandler rtspHandler;

    @Value("${netty.port}")
    private Integer port;

    /**
     * Binds the server to the configured port and blocks the calling thread until the
     * server channel is closed or the thread is interrupted. Both event loop groups are
     * always shut down before this method returns.
     */
    public void start() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(200);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin()
                                    .allowNullOrigin()
                                    .allowCredentials()
                                    .build();
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpResponseEncoder())
                                    .addLast(new HttpRequestDecoder())
                                    .addLast(new ChunkedWriteHandler())
                                    // Aggregate request parts so the handler receives a FullHttpRequest.
                                    .addLast(new HttpObjectAggregator(64 * 1024))
                                    .addLast(new CorsHandler(corsConfig))
                                    .addLast(rtspHandler);
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // DEFAULT is the constant inherited from the ByteBufAllocator interface.
                    .option(ChannelOption.ALLOCATOR, PreferredDirectByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_RCVBUF, 128 * 1024)
                    .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                            new WriteBufferWaterMark(1024 * 1024 / 2, 1024 * 1024));
            ChannelFuture sync = bootstrap.bind(port).sync();
            log.info("netty启动成功监听端口号{}", port);
            // Block here until the server channel is closed.
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("netty服务器线程被中断", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
2、重点是rtspHandler的编写。因为要同时支持http协议和ws协议,所以需要自己进行判断。 1、读取消息时先判断是不是HTTP请求,如果是请求先判断路径后缀,后面可以扩展为通过后缀参数来获取RTSP的链接参数。 2、判断是什么请求:如果是http请求就直接通过http进行播放,如果是ws请求就需要进行协议升级。 3、WebSocketServerHandshakerFactory(getWebSocketLocation(req), "null", true, 5 * 1024 * 1024); 第二个参数(子协议)很坑:原本我填的是null,但是前端报错,说它传的是null而后端没有给出对应的回应;看上去都是null,其实前端传过来的是字符串"null",我试着把参数改成字符串"null"就好了。 4、将通道交给Web服务端(TestController)保存起来,netty的路就只能陪你走到这了,后面直接把流数据往通道里面发就完事了。
package com.kang.rtsp.netty;
import com.kang.rtsp.controller.TestController;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@Sharable
public class RtspHandler extends SimpleChannelInboundHandler<Object> {
@Autowired
private TestController testController;
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
FullHttpRequest req = (FullHttpRequest) msg;
QueryStringDecoder decoder = new QueryStringDecoder(req.uri());
if (!"/live".equals(decoder.path())) {
System.err.println("uri有误");
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
log.info("HTTP请求");
sendFlvReqHeader(ctx);
}else {
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), "null", true, 5 * 1024 * 1024);
handshaker = factory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
testController.setWsClients(ctx);
}
}
} else if (msg instanceof WebSocketFrame) {
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
System.err.println("发送消息走完");
}
private String getWebSocketLocation(FullHttpRequest request) {
String location = request.headers().get(HttpHeaderNames.HOST) + request.uri();
return "ws://" + location;
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
System.err.println("请求地址有错误");
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
Unpooled.copiedBuffer("请求地址有误: " + status + "\r\n", CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
frame.retain(1);
ctx.channel().writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()));
return;
}
if (frame instanceof BinaryWebSocketFrame) {
return;
}
}
private void sendFlvReqHeader(ChannelHandlerContext ctx) {
HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
rsp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
.set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes")
.set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache")
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, "zhang");
ctx.writeAndFlush(rsp);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("web客户端被链接"+ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
}
系统启动初始化的一些方法
1、我们要优雅地启动netty服务器,并提前初始化ffmpeg,获取它的可执行文件路径
package com.kang.rtsp.init;
import com.kang.rtsp.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacpp.Loader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Startup hooks: resolves the bundled ffmpeg executable while the bean is initialised and
 * starts the Netty server once the Spring application is up.
 */
@Slf4j
@Component
public class InitServer implements CommandLineRunner {
    private static final String EXE_SUFFIX = ".exe";

    @Autowired
    private NettyServer nettyServer;

    /** Starts the Netty server; {@link NettyServer#start()} blocks the calling thread. */
    @Override
    public void run(String... args) throws Exception {
        log.info("启动netty服务器");
        nettyServer.start();
    }

    /**
     * Loads the ffmpeg executable via JavaCPP and publishes its path in the {@code ffmpeg}
     * system property. A trailing {@code .exe} is stripped, as before; a path without that
     * suffix is now used unchanged instead of failing with an index error.
     *
     * @throws IllegalStateException if the loader does not return a path
     */
    @PostConstruct
    public void loadFFmpeg() {
        log.info("正在初始化资源,请稍等...");
        String ffmpeg = Loader.load(org.bytedeco.ffmpeg.ffmpeg.class);
        if (ffmpeg == null) {
            throw new IllegalStateException("ffmpeg executable could not be loaded");
        }
        int stemLength = ffmpeg.length() - EXE_SUFFIX.length();
        // regionMatches returns false for a negative offset, so short paths are handled too.
        boolean hasExeSuffix = ffmpeg.regionMatches(true, stemLength, EXE_SUFFIX, 0, EXE_SUFFIX.length());
        String path = hasExeSuffix ? ffmpeg.substring(0, stemLength) : ffmpeg;
        System.setProperty("ffmpeg", path);
        log.info(System.getProperty("ffmpeg"));
        log.info("初始化成功");
    }
}
ffmpeg启动并初始化流数据
package com.kang.rtsp.ffmpeg;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
/**
 * Launches an ffmpeg child process that transcodes a source (RTSP URL or video file) to
 * MPEG-TS / mpeg1video and pushes the result to a target URL.
 */
@Component
public class ConversionVideo implements ApplicationRunner {
    /** The currently running ffmpeg process, or {@code null} if none has been started. */
    public Process process;

    /**
     * Starts ffmpeg for the given source, stopping a previously started process first.
     *
     * @param id       input passed to ffmpeg's {@code -i} option (RTSP URL or file path)
     * @param fileName output target passed to ffmpeg as its last argument
     * @return {@code 0} if the process was started, {@code -1} otherwise
     */
    public Integer pushVideoAsRTSP(String id, String fileName) {
        int flag = -1;
        String ffmpegPath = System.getProperty("ffmpeg");
        if (ffmpegPath == null || id == null || fileName == null) {
            System.err.println("ffmpeg推流参数缺失");
            return flag;
        }
        try {
            if (process != null) {
                process.destroy();
                System.out.println(">>>>>>>>>>推流视频切换<<<<<<<<<<");
            }
            // Pass the arguments as a list: Runtime.exec(String) splits on whitespace and
            // does not understand quoting, so URLs/paths must not be concatenated into one string.
            ProcessBuilder builder = new ProcessBuilder(
                    ffmpegPath,
                    "-i", id,
                    "-q", "0",
                    "-f", "mpegts",
                    "-codec:v", "mpeg1video",
                    "-s", "800x600",
                    fileName);
            // Forward ffmpeg's output to this process's stdout; an undrained pipe would
            // eventually fill up and block the child.
            builder.redirectErrorStream(true);
            builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            System.out.println("ffmpeg推流命令:" + String.join(" ", builder.command()));
            process = builder.start();
            flag = 0;
        } catch (IOException | SecurityException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Starts the push once the application is ready. The call is made on this bean (not on a
     * fresh instance) so the started process is tracked in {@link #process}.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        pushVideoAsRTSP("你的rtsp地址或者你的视频地址都可以搞", "http://127.0.0.1:8080/receive");
    }
}
1、看看控制层代码,netty调用setWsClients,把通道信息保存在controller中。然后receive获取ffmpeg传过来的二进制流 2、调用sendVideo给socket通道发送消息。
package com.kang.rtsp.controller;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Receives the stream pushed by ffmpeg over HTTP and fans it out to every registered
 * WebSocket client.
 */
@Controller
public class TestController {
    /** Size of the buffer used to read the incoming request body. */
    private static final int READ_BUFFER_SIZE = 8 * 1024;

    /** Registered WebSocket clients keyed by their channel id. */
    private ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();

    /**
     * Reads the request body until end of stream and forwards every chunk to the clients.
     *
     * @param request the push request whose body carries the stream data
     * @return the constant {@code "1"} once the stream has ended or reading failed
     */
    @RequestMapping("/receive")
    @ResponseBody
    public String receive(HttpServletRequest request) {
        byte[] buffer = new byte[READ_BUFFER_SIZE];
        try {
            ServletInputStream inputStream = request.getInputStream();
            int read;
            // read() blocks until data arrives and returns -1 at end of stream;
            // available() never returns -1 and cannot be used to detect the end.
            while ((read = inputStream.read(buffer)) != -1) {
                if (read == 0) {
                    continue;
                }
                // Forward only the bytes actually read.
                byte[] chunk = new byte[read];
                System.arraycopy(buffer, 0, chunk, 0, read);
                sendVideo(chunk);
            }
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
        System.out.println("over");
        return "1";
    }

    /**
     * Sends the data as one binary WebSocket frame to every registered client. Closed
     * channels are unregistered; a channel that is only temporarily not writable skips this
     * chunk but stays registered.
     *
     * @param data the bytes to send; copied per client, so the array is not retained
     */
    public void sendVideo(byte[] data) {
        for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
            ChannelHandlerContext client = entry.getValue();
            try {
                if (!client.channel().isActive()) {
                    wsClients.remove(entry.getKey());
                } else if (client.channel().isWritable()) {
                    client.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
                }
                // else: outbound buffer is full; drop this chunk for the slow client.
            } catch (java.lang.Exception e) {
                wsClients.remove(entry.getKey());
                e.printStackTrace();
            }
        }
    }

    /** Registers a WebSocket client under its channel id. */
    public void setWsClients(ChannelHandlerContext ctx) {
        wsClients.put(ctx.channel().id().asLongText(), ctx);
    }
}
前端代码也给上
<!DOCTYPE html>
<!-- Minimal player page: renders the WebSocket video stream onto a canvas using JSMpeg. -->
<html>
<head>
</head>
<body>
<!-- Target canvas the player draws the decoded video into. -->
<canvas id="video"></canvas>
<!-- Provides the JSMpeg global used below; the file must be served next to this page. -->
<script type="text/javascript" src="jsmpeg.min.js"></script>
<script type="text/javascript">
var canvas = document.getElementById('video');
// "/live" is the only path the Netty handler accepts.
// NOTE(review): port 8866 presumably has to match the server's netty.port setting - confirm.
var url = 'ws://127.0.0.1:8866/live';
// Creating the player opens the WebSocket connection to the URL above.
var player = new JSMpeg.Player(url, {canvas: canvas});
</script>
</body>
</html>
jsmpeg.min.js地址,自己随便下下得了。 https://gitcode.net/mirrors/phoboslab/jsmpeg/-/raw/master/jsmpeg.min.js?inline=false
|