场景:
- 解析华为M2391相机RTSP协议中的H264数据流
- 单独存储视频流中的时间戳
代码:
"""
A demo python code that ..
1) Connects to an IP cam with RTSP
2) Draws RTP/NAL/H264 packets from the camera
3) Writes them to a file that can be read with any stock video player (say, mplayer, vlc & other ffmpeg based video-players)
Done for educative/demonstrative purposes, not for efficiency..!
written 2015 by Sampsa Riikonen.
"""
import math
import re
import socket
import sys
import time
import bitstring
import logging
# Module-wide logger. Both the logger and its only handler let DEBUG records
# through; the handler writes to stdout and prefixes each message with time,
# thread name, file name, function name, line number and level.
logger = logging.getLogger('rtp_h264_save')
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    '%(asctime)s - %(threadName)s -%(filename)s - %(funcName)s - %(lineno)s - '
    '%(levelname)s - %(message)s'
)
handler.setFormatter(formatter)

logger.addHandler(handler)
# File organized as follows:
# 1) Strings manipulation routines
# 2) RTP stream handling routine
# 3) Main program
# *** (1) First, some string searching/manipulation for handling the rtsp strings ***
def getPorts(searchst, st):
    """Extract the port numbers of a ``<name>=<a>-<b>`` field from an RTSP string.

    Args:
        searchst: Field name to look for, e.g. ``"client_port"``. It is
            inserted into a regular expression as-is (not escaped).
        st: RTSP text to search, e.g. the reply to a SETUP request.

    Returns:
        A list with every run of digits of the first match converted to
        ``int``, e.g. ``[1000, 1001]`` for ``"client_port=1000-1001"``.

    Raises:
        IndexError: if ``st`` contains no ``<name>=<a>-<b>`` field.
    """
    # Raw strings: "\d" inside a plain literal is an invalid escape sequence.
    field = re.search(searchst + r"=\d*-\d*", st)
    if field is None:
        raise IndexError("no '{}=<port>-<port>' field found".format(searchst))
    return [int(num) for num in re.findall(r"\d+", field.group(0))]
def getLength(st):
    """Return the value of the first ``Content-Length`` header in an RTSP string.

    Args:
        st: RTSP text to search. The header is matched case-sensitively as
            ``"Content-Length: "`` followed by digits.

    Returns:
        The header value as ``int``, e.g. ``614`` for ``"Content-Length: 614"``.

    Raises:
        IndexError: if the header is missing or carries no digits.
    """
    # Raw strings: "\d" inside a plain literal is an invalid escape sequence.
    header = re.search(r"Content-Length: \d*", st)
    if header is None:
        raise IndexError("no 'Content-Length: <n>' header found")
    digits = re.findall(r"\d+", header.group(0))
    return int(digits[0])
def printrec(recst):
    """Write an RTSP message to the debug log, one CRLF-separated line per record."""
    lines = recst.split('\r\n')
    index = 0
    while index < len(lines):
        logger.debug(lines[index])
        index += 1
def sessionid(recst):
    """Return the session id announced by the ``Session:`` header of an RTSP reply.

    Args:
        recst: Decoded RTSP reply with CRLF line endings.

    Returns:
        The id of the first ``Session:`` header, with any ``;timeout=...``
        style parameters removed. It is returned as ``int`` when the token is
        a plain decimal number that survives an ``int`` -> ``str`` round trip,
        and as the original ``str`` otherwise (alphanumeric ids, leading
        zeros), so that ``str(result)`` always reproduces what the server
        sent. ``None`` is returned when the reply has no ``Session:`` header.
    """
    for rec in recst.split('\r\n'):
        fields = rec.split()
        # Blank lines (every RTSP reply ends with one) split into an empty
        # list, and a bare "Session:" has no value: skip both instead of
        # indexing into them.
        if len(fields) < 2 or fields[0] != "Session:":
            continue
        token = fields[1].split(";")[0].strip()
        try:
            number = int(token)
        except ValueError:
            # Session ids are not required to be numeric.
            return token
        # int() would silently drop leading zeros; keep the text in that case
        # so the id echoed back to the server is unchanged.
        return number if str(number) == token else token
    return None
def setsesid(recst, idn):
    """Fill the session id into an RTSP request template.

    Every occurrence of the placeholder ``SESID`` in ``recst`` is replaced by
    the string form of ``idn``; the resulting string is returned.
    """
    session_text = str(idn)
    filled = recst.replace("SESID", session_text)
    return filled
# ********* (2) The routine for handling the RTP stream ***********
def digestpacket(st):
    """Convert one RTP datagram into bytes that can be appended to an H.264 file.

    The routine
      (a) strips off the RTP header (fixed part, CSRC list, header extension),
      (b) puts the NAL start code in front of SPS/PPS units and in front of the
          first fragment of an FU-A series, whose NAL header it rebuilds,
      (c) returns the remaining FU-A fragments as bare payload, so that the
          results of consecutive calls concatenate into complete NAL units.

    Args:
        st: The raw datagram (``bytes``) as received from the RTP socket.

    Returns:
        A tuple ``(data, frame_ts)``. ``data`` is the ``bytes`` to append to
        the video file. ``frame_ts`` is ``None`` except for the first fragment
        of an FU-A series in a packet whose header extension holds at least
        two 32-bit words; it is then a string of digits computed from those
        two words (see the comment at the computation).

    Raises:
        ValueError: if the datagram is too short to hold the RTP header and a
            payload, or if its NAL unit type is not 6, 7, 8 or 28 (FU-A).
    """
    # The four bytes that must precede every NAL unit in the output file.
    startbytes = b"\x00\x00\x00\x01"

    if len(st) < 12:
        raise ValueError("RTP packet of {} bytes is shorter than the 12-byte header".format(len(st)))

    # Turn the whole packet into a string of bits. Very inefficient, but this
    # code is meant for demonstration.
    bt = bitstring.BitArray(bytes=st)

    # Fixed RTP header, see
    # https://en.wikipedia.org/wiki/Real-time_Transport_Protocol
    version = bt[0:2].uint  # version
    p = bt[2]  # P (padding flag)
    x = bt[3]  # X (header extension flag)
    cc = bt[4:8].uint  # CC (number of CSRC identifiers)
    m = bt[8]  # M (marker)
    pt = bt[9:16].uint  # PT (payload type)
    sn = bt[16:32].uint  # sequence number
    timestamp = bt[32:64].uint  # timestamp
    ssrc = bt[64:96].uint  # ssrc identifier
    lc = 12  # byte counter: twelve bytes have been read so far ..
    bc = 12 * 8  # .. bit counter: and that many bits
    logger.debug("version : {}, p : {}, x : {}, cc : {}, m : {}, pt : {}".format(version, p, x, cc, m, pt))
    logger.debug(f"sequence number : {sn} timestamp: {timestamp}" )
    logger.debug(f"sync. source identifier {ssrc}")
    # NOTE(review): the padding flag is only logged; padding bytes at the end
    # of the packet, if a sender uses them, are not removed from the payload.

    # CSRC identifiers, 32 bits (4 bytes) each
    cids = []
    for _ in range(cc):
        cids.append(bt[bc:bc + 32].uint)
        bc += 32
        lc += 4
    logger.debug(f"csrc identifiers: {cids}")

    # Stays None when the packet carries no (or a too short) header
    # extension, so the FU-A branch below always has a value to return.
    frame_ts = None
    if x:
        hid = bt[bc:bc + 16].uint
        bc += 16
        lc += 2
        hlen = bt[bc:bc + 16].uint  # extension length in 32-bit words
        bc += 16
        lc += 2
        logger.debug("ext. header id : {}, header len: {}".format( hid, hlen))
        hst = bt[bc:bc + 32 * hlen]
        bc += 32 * hlen
        lc += 4 * hlen
        if hlen >= 2:
            ts_s = hst[:32].uint
            ts_ms = hst[32:64].uint
            # First word plus second word scaled by 232.83e-12, minus
            # 2208988800, printed with six decimals and the decimal point
            # removed. Presumably the two words are an NTP timestamp
            # (seconds since 1900 and a 2**-32 fraction), which would make
            # the result microseconds since 1970 -- confirm against the
            # camera documentation.
            frame_ts = f'{(ts_s + ts_ms * 232.83 / math.pow(10, 12) - 2208988800):0<.6f}'.replace('.','')

    if len(st) <= lc:
        raise ValueError("RTP packet carries no payload after the header")

    # OK, now we enter the NAL packet, as described here:
    #
    # https://tools.ietf.org/html/rfc6184#section-1.3
    #
    # NAL unit header (RFC 6184, section 5.3):
    #
    #   +---------------+
    #   |0|1|2|3|4|5|6|7|
    #   +-+-+-+-+-+-+-+-+
    #   |F|NRI|  Type   |
    #   +---------------+
    #
    # RFC 6184, Table 3: allowed NAL unit types per packetization mode
    # (yes = allowed, no = disallowed, ig = ignore)
    #
    #   Payload Packet    Single NAL    Non-Interleaved    Interleaved
    #   Type    Type      Unit Mode           Mode             Mode
    #   -------------------------------------------------------------
    #   0      reserved      ig               ig               ig
    #   1-23   NAL unit     yes              yes               no
    #   24     STAP-A        no              yes               no
    #   25     STAP-B        no               no              yes
    #   26     MTAP16        no               no              yes
    #   27     MTAP24        no               no              yes
    #   28     FU-A          no              yes              yes
    #   29     FU-B          no               no              yes
    #   30-31  reserved      ig               ig               ig
    #
    # This was also very useful:
    # http://stackoverflow.com/questions/7665217/how-to-process-raw-udp-packets-so-that-they-can-be-decoded-by-a-decoder-filter-i
    #
    #   First byte:  [ 3 NAL UNIT BITS | 5 FRAGMENT TYPE BITS]
    #   Second byte: [ START BIT | RESERVED BIT | END BIT | 5 NAL UNIT BITS]
    #   Other bytes: [... VIDEO FRAGMENT DATA...]
    fb = bt[bc]  # i.e. "F"
    nri = bt[bc + 1:bc + 3].uint  # "NRI"
    nlu0 = bt[bc:bc + 3]  # "3 NAL UNIT BITS" (i.e. [F | NRI])
    typ = bt[bc + 3:bc + 8].uint  # "Type"
    logger.debug("F: {}, NRI: {}, Type : {}".format( fb, nri, typ))
    logger.debug("first three bits together : {}".format( nlu0))

    if typ == 7 or typ == 8:
        # SPS or PPS packet: they carry the meta-info about resolution, etc.
        # More reading for example here:
        # http://www.cardinalpeak.com/blog/the-h-264-sequence-parameter-set/
        if typ == 7:
            logger.debug(">>>>> SPS packet")
        else:
            logger.debug(">>>>> PPS packet")
        # The payload still starts with the "First byte", so the result is
        # start code + complete NAL unit.
        return startbytes + st[lc:], None

    if typ == 6:
        # NOTE(review): the payload is passed on without a start code in
        # front of it -- confirm this is intended for this NAL unit type.
        return st[lc:], None

    if typ != 28:
        # This code only handles "Type" = 28, i.e. "FU-A", beyond the single
        # NAL unit types treated above.
        raise ValueError("unknown frame type {}: only NAL unit types 6, 7, 8 and 28 (FU-A) are handled".format(typ))

    if len(st) < lc + 2:
        raise ValueError("FU-A packet is too short to hold the FU header")

    bc += 8
    lc += 1  # let's go to the "Second byte"
    # ********* WE ARE AT THE "Second byte" ************
    start = bt[bc]  # start bit
    end = bt[bc + 1]  # end bit
    nlu1 = bt[bc + 3:bc + 8]  # 5 nal unit bits
    lc += 1  # the fragment data begins right after the "Second byte"

    if start:
        # First fragment of a NAL unit: rebuild its header as
        # "[3 NAL UNIT BITS | 5 NAL UNIT BITS]" and put the start code in
        # front. A packet that (against the RFC) also has the end bit set is
        # treated the same way.
        logger.debug(">>> first fragment found")
        head = startbytes + (nlu0 + nlu1).bytes
        return head + st[lc:], frame_ts

    if end:
        logger.debug("<<<< last fragment found")
    # Intermediate or last fragment: just dump the "VIDEO FRAGMENT DATA".
    return st[lc:], None
# *********** (3) THE MAIN PROGRAM STARTS HERE ****************
# Create a TCP socket for RTSP communication
# further reading:
# https://docs.python.org/2.7/howto/sockets.html
# # ************************ FOR QUICK-TESTING EDIT THIS AREA *********************************************************
# ip = "10.10.10.54" # IP address of your cam
# adr = "rtsp://admin:broadxt333@10.10.10.54/LiveMedia/ch1/Media2" # username, passwd, etc.
# clientports = [60784, 60785] # the client ports we are going to use for receiving video
# video_file = "stream.h264" # filename for dumping the stream
# timestamp_file = "stream.timestamp" # filename for dumping the stream
# rn = 5000 # receive this many packets
# # After running this program, you can try your file defined in fname with "vlc fname" or "mplayer fname" from the command line
# # you might also want to install h264bitstream to analyze your h264 file
# # *******************************************************************************************************************
#
# dest = "DESCRIBE " + adr + " RTSP/1.0\r\nCSeq: 2\r\nUser-Agent: python\r\nAccept: application/sdp\r\n\r\n"
# setu = "SETUP " + adr + "/trackID=1 RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: python\r\nTransport: RTP/AVP;unicast;client_port=" + str(
# clientports[0]) + "-" + str(clientports[1]) + "\r\n\r\n"
# play = "PLAY " + adr + " RTSP/1.0\r\nCSeq: 5\r\nUser-Agent: python\r\nSession: SESID\r\nRange: npt=0.000-\r\n\r\n"
def check_os_port_status(port, host='127.0.0.1'):
    """Check whether a TCP port is occupied; return True if it is.

    The port counts as occupied when a TCP connection to ``host:port`` is
    accepted within one second.

    Args:
        port: Port number to probe.
        host: Address to probe; defaults to the local machine.

    Returns:
        ``True`` if something accepted the connection. ``False`` if the
        connection attempt failed (refused, timed out, unreachable, ...) or
        if ``port`` lies outside the valid socket range 0-65535.

    NOTE(review): this is a TCP probe; it says nothing about whether a UDP
    socket is already bound to the same port number.
    """
    if not 0 <= port <= 65535:
        # Valid socket ports are 0-65535.
        return False
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_client:
            socket_client.settimeout(1)
            socket_client.connect((host, int(port)))
    except OSError:
        # Nobody answered: the port is not in use.
        return False
    # A listener accepted the connection: the port is in use.
    return True
def find_unused_port(start_port, host='127.0.0.1'):
    """Search upwards from a port for one that is not in use.

    ``start_port`` itself is assumed to be occupied and is never probed; the
    search begins at ``start_port + 1``.

    Args:
        start_port: Port number after which the search starts.
        host: Address handed to ``check_os_port_status`` for each probe.

    Returns:
        The first port above ``start_port`` that ``check_os_port_status``
        reports as not occupied.

    Raises:
        OSError: if no port up to 65535 is reported as free. (An unbounded
            search would otherwise hand back 65536, which is not a valid
            port number.)
    """
    for candidate in range(start_port + 1, 65536):
        if not check_os_port_status(candidate, host):
            return candidate
    raise OSError("no unused port found above {}".format(start_port))
class Rtp_H264():
    """Record the H.264 stream of an RTSP camera, plus its frame timestamps.

    Typical use: create an instance, call ``prepare_connect()`` to build the
    RTSP requests, then ``save_h264()`` to run the DESCRIBE / SETUP / PLAY
    exchange and dump the received RTP payload into a file.
    """

    # Client ports handed out so far; shared by all instances.
    clientports = [60784, 60785]
    # Number of instances created so far.
    instance_num = 0
    # Credentials and URL template used to build the RTSP address.
    account = 'admin'
    password = '123456'
    adr = 'rtsp://{}:{}@{}/LiveMedia/ch1/Media2'

    def __new__(cls, *args, **kwargs):
        """Create the instance and give it a running number (0, 1, 2, ...)."""
        instance = super().__new__(cls)
        instance.number = cls.instance_num
        cls.instance_num += 1
        return instance

    def __init__(self, ip='10.10.10.54'):
        """Remember the camera address and build the RTSP URL for it.

        Args:
            ip: IP address of the camera.
        """
        self.ip = ip
        self.rtsp_adr = self.adr.format(self.account, self.password, self.ip)

    def get_cur_clientports(self):
        """Choose the pair of client ports for this instance.

        The result is stored in ``self.cur_client_port``; newly found ports
        are also appended to the shared ``clientports`` list.

        NOTE(review): when the shared list already holds at least as many
        pairs as there are instances, the last pair in the list is used, no
        matter which instance added it -- confirm that two instances cannot
        end up with the same ports.
        """
        pairs, leftover = divmod(len(self.clientports), 2)
        if pairs < self.instance_num:
            # Fewer complete pairs than instances: look for two more ports
            # above the highest one known.
            self.clientports.sort()
            new_client_port_1 = find_unused_port(self.clientports[-1] + 1)
            new_client_port_2 = find_unused_port(new_client_port_1 + 1)
            self.cur_client_port = [new_client_port_1, new_client_port_2]
            self.clientports.extend(self.cur_client_port)
        elif leftover > 0:
            # Odd number of ports: complete the dangling last one to a pair.
            new_client_port_1 = find_unused_port(self.clientports[-1] + 1)
            self.cur_client_port = [self.clientports[-1], new_client_port_1]
            self.clientports.append(new_client_port_1)
        else:
            self.cur_client_port = self.clientports[-2:]
        logger.debug(f'当前端口号: {self.cur_client_port}')

    def prepare_connect(self):
        """Build the DESCRIBE, SETUP and PLAY requests for this camera.

        Stores them as ``self.dest`` (bytes), ``self.setu`` (bytes) and
        ``self.play`` (str; it still contains the ``SESID`` placeholder that
        ``save_h264`` fills in once the session id is known).
        """
        self.get_cur_clientports()
        first_port, second_port = self.cur_client_port[0], self.cur_client_port[1]
        dest = "DESCRIBE " + self.rtsp_adr + " RTSP/1.0\r\nCSeq: 2\r\nUser-Agent: python\r\nAccept: application/sdp\r\n\r\n"
        setu = ("SETUP " + self.rtsp_adr + "/trackID=1 RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: python\r\n"
                "Transport: RTP/AVP;unicast;client_port=" + str(first_port) + "-" + str(second_port) + "\r\n\r\n")
        play = "PLAY " + self.rtsp_adr + " RTSP/1.0\r\nCSeq: 5\r\nUser-Agent: python\r\nSession: SESID\r\nRange: npt=0.000-\r\n\r\n"
        self.setu = setu.encode()
        self.play = play
        self.dest = dest.encode()

    def save_h264(self, delay=1, video_file='stream.h264', timestamp_file='stream.timestamp'):
        """Run the RTSP exchange and record the stream for ``delay`` minutes.

        ``prepare_connect()`` must have been called before, because the
        requests it builds are sent here.

        Args:
            delay: Recording time in minutes.
            video_file: File the video data returned by ``digestpacket`` is
                written to (binary).
            timestamp_file: Text file that receives one line per timestamp
                returned by ``digestpacket``.

        Both sockets and both files are closed on every exit path, including
        when an exception (for example the receive timeout of the RTP socket)
        ends the recording early; the exception itself is passed on.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((self.ip, 554))  # RTSP should peek out from port 554

            logger.debug("\n*** SENDING DESCRIBE ***\n")
            s.sendall(self.dest)  # sendall: send() may transmit only a part
            recst = s.recv(4096)
            logger.debug("\n*** GOT ****\n")
            printrec(recst.decode())

            logger.debug("\n*** SENDING SETUP ***\n")
            s.sendall(self.setu)
            recst = s.recv(4096)
            logger.debug("\n*** GOT ****\n")
            setup_reply = recst.decode()
            printrec(setup_reply)
            idn = sessionid(setup_reply)
            serverports = getPorts("server_port", setup_reply)
            # The ports echoed by the camera are the ones that get bound.
            clientports = getPorts("client_port", setup_reply)
            logger.debug("****")
            logger.debug("ip : {},serverports : {}".format( self.ip, serverports))
            logger.debug("****")

            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s1:
                # The empty string binds the port on all local interfaces.
                s1.bind(("", clientports[0]))
                # Give up when no packet arrives for 5 s.
                # Further reading:
                # https://wiki.python.org/moin/UdpCommunication
                s1.settimeout(5)

                # Now our port is open for receiving video data. Give the
                # camera the PLAY command..
                logger.debug("\n*** SENDING PLAY ***\n")
                play = setsesid(self.play, idn)
                s.sendall(play.encode())
                recst = s.recv(4096)
                logger.debug("\n*** GOT ****\n")
                printrec(recst.decode())

                logger.debug("\n** STRIPPING RTP INFO AND DUMPING INTO FILE **\n")
                # f: video output file, t: timestamp output file
                with open(video_file, 'wb') as f, open(timestamp_file, 'w') as t:
                    end_ts = time.time() + delay * 60  # end of the recording time
                    while time.time() < end_ts:
                        recst = s1.recv(4096)
                        logger.debug(f"read {len(recst)} bytes")
                        st, ts = digestpacket(recst)
                        if ts:
                            t.write(str(ts)+'\n')
                        logger.debug(f"dumping {len(st)} bytes")
                        f.write(st)
                # TODO: before the sockets are closed, the "TEARDOWN" command
                # should be given via RTSP; this is not done yet.
if __name__ == '__main__':
    # Script entry point: record from the default camera address using the
    # default recording time and output file names of save_h264().
    recorder = Rtp_H264()
    recorder.prepare_connect()
    recorder.save_h264()
|