Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/align pts2 webrtc #150

Merged
merged 5 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions machinery/src/capture/gortsplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// called when a MULAW audio RTP packet arrives
if g.AudioG711Media != nil && g.AudioG711Forma != nil {
g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.AudioG711Media, rtppkt)
// decode timestamp
pts2, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand All @@ -427,8 +428,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: false,
Packet: rtppkt,
Data: op,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.AudioG711Index,
IsVideo: false,
IsAudio: true,
Expand All @@ -443,6 +445,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
g.Client.OnPacketRTP(g.AudioMPEG4Media, g.AudioMPEG4Forma, func(rtppkt *rtp.Packet) {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.AudioMPEG4Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt)
if !ok {
log.Log.Error("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand All @@ -466,8 +469,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: false,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.AudioG711Index,
IsVideo: false,
IsAudio: true,
Expand All @@ -480,6 +484,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// called when a video RTP packet arrives for H264
var filteredAU [][]byte
if g.VideoH264Media != nil && g.VideoH264Forma != nil {

dtsExtractor := h264.NewDTSExtractor2()

g.Client.OnPacketRTP(g.VideoH264Media, g.VideoH264Forma, func(rtppkt *rtp.Packet) {

// This will check if we need to stop the thread,
Expand All @@ -494,6 +501,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets

// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand Down Expand Up @@ -571,12 +579,20 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}

// Extract DTS from RTP packets
dts2, err := dtsExtractor.Extract(filteredAU, pts2)
if err != nil {
log.Log.Error("capture.golibrtsp.Start(): " + err.Error())
return
}

pkt := packets.Packet{
IsKeyFrame: idrPresent,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: dts2,
Idx: g.VideoH264Index,
IsVideo: true,
IsAudio: false,
Expand Down Expand Up @@ -639,6 +655,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets

// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand Down Expand Up @@ -702,8 +719,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: isRandomAccess,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.VideoH265Index,
IsVideo: true,
IsAudio: false,
Expand Down
15 changes: 9 additions & 6 deletions machinery/src/capture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) {

// Write the last packet
ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand Down Expand Up @@ -242,7 +242,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}

ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand All @@ -261,7 +261,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
recordingStatus = "started"

} else if start {
ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand Down Expand Up @@ -337,7 +337,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat

log.Log.Info("capture.main.HandleRecordStream(motiondetection): Start motion based recording ")

var lastDuration time.Duration
var lastDuration int64
var lastRecordingTime int64

//var cws *cacheWriterSeeker
Expand Down Expand Up @@ -444,8 +444,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
start = true
}
if start {

ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
Expand Down Expand Up @@ -695,3 +694,7 @@ func JpegImage(captureDevice *Capture, communication *models.Communication) imag
func convertPTS(v time.Duration) uint64 {
return uint64(v.Milliseconds())
}

func convertPTS2(v int64) uint64 {
return uint64(v) / 100
}
2 changes: 1 addition & 1 deletion machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
if subStreamEnabled && rtspSubClient != nil {
subQueue = packets.NewQueue()
communication.SubQueue = subQueue
subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.SetMaxGopCount(3) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.WriteHeader(videoSubStreams)
go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)

Expand Down
17 changes: 9 additions & 8 deletions machinery/src/packets/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
// Packet represents an RTP Packet
type Packet struct {
Packet *rtp.Packet
IsAudio bool // packet is audio
IsVideo bool // packet is video
IsKeyFrame bool // video packet is key frame
Idx int8 // stream index in container format
Codec string // codec name
CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame
Time time.Duration // packet decode time
Data []byte // packet data
IsAudio bool // packet is audio
IsVideo bool // packet is video
IsKeyFrame bool // video packet is key frame
Idx int8 // stream index in container format
Codec string // codec name
CompositionTime int64 // packet presentation time minus decode time for H264 B-Frame
Time int64 // packet decode time
TimeLegacy time.Duration
Data []byte // packet data
}
3 changes: 1 addition & 2 deletions machinery/src/packets/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package packets
import (
"io"
"sync"
"time"
)

// time
Expand Down Expand Up @@ -145,7 +144,7 @@ func (self *Queue) Oldest() *QueueCursor {
}

// Create cursor position at specific time in buffered packets.
func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
func (self *Queue) DelayedTime(dur int64) *QueueCursor {
cursor := self.newCursor()
cursor.init = func(buf *Buf, videoidx int) BufPos {
i := buf.Tail - 1
Expand Down
18 changes: 10 additions & 8 deletions machinery/src/webrtc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C

var cursorError error
var pkt packets.Packet
var previousTimeVideo time.Duration
var previousTimeAudio time.Duration
//var previousTimeVideo int64
//var previousTimeAudio int64

start := false
receivedKeyFrame := false
Expand Down Expand Up @@ -401,15 +401,16 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
if pkt.IsVideo {

// Calculate the difference
bufferDuration := pkt.Time - previousTimeVideo
previousTimeVideo = pkt.Time
//bufferDuration := pkt.Time - previousTimeVideo
//previousTimeVideo = pkt.Time

// Start at the first keyframe
if pkt.IsKeyFrame {
start = true
}
if start {
sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration}
//bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
if config.Capture.ForwardWebRTC == "true" {
// We will send the video to a remote peer
// TODO..
Expand All @@ -432,11 +433,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}

// Calculate the difference
bufferDuration := pkt.Time - previousTimeAudio
previousTimeAudio = pkt.Time
//bufferDuration := pkt.Time - previousTimeAudio
//previousTimeAudio = pkt.Time

// We will send the audio
sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration}
//bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
if err := audioTrack.WriteSample(sample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
}
Expand Down
Loading