From 5124d089a7854da2b2d22c27278225c8cd70bf5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=E1=BB=93=20V=C4=83n=20To=C3=A0n?= Date: Tue, 18 Jun 2024 01:41:24 +0700 Subject: [PATCH] Fix: Add save on disk --- muxer.go | 10 ++++-- muxer_server.go | 64 +++++++++++++++++++++++++++++------ pkg/playlist/media.go | 20 ++++++++--- pkg/playlist/media_segment.go | 2 +- pkg/storage/factory_disk.go | 10 +++--- pkg/storage/factory_test.go | 2 +- pkg/storage/file_disk.go | 12 ++++--- 7 files changed, 93 insertions(+), 27 deletions(-) diff --git a/muxer.go b/muxer.go index 25eff9d..620e0ed 100644 --- a/muxer.go +++ b/muxer.go @@ -76,6 +76,10 @@ type Muxer struct { // than saving them on RAM, but allows to preserve RAM. Directory string + // Using Directory to save, and using ram to cache segments. + // This segment has save to disk when close. + IsPlayBack bool + // Deprecated: replaced with SegmentMinDuration SegmentDuration time.Duration // Deprecated: replaced with PartMinDuration @@ -124,7 +128,7 @@ func (m *Muxer) Start() error { default: if m.SegmentCount < 3 { - return fmt.Errorf("The minimum number of HLS segments is 3") + return fmt.Errorf("the minimum number of HLS segments is 3") } } @@ -151,7 +155,7 @@ func (m *Muxer) Start() error { } if m.Directory != "" { - m.storageFactory = storage.NewFactoryDisk(m.Directory) + m.storageFactory = storage.NewFactoryDisk(m.Directory, m.IsPlayBack) } else { m.storageFactory = storage.NewFactoryRAM() } @@ -162,6 +166,8 @@ func (m *Muxer) Start() error { videoTrack: m.VideoTrack, audioTrack: m.AudioTrack, prefix: m.prefix, + isPlayBack: m.IsPlayBack, + playList: "", } m.server.initialize() diff --git a/muxer_server.go b/muxer_server.go index 35c7044..ada0bee 100644 --- a/muxer_server.go +++ b/muxer_server.go @@ -5,6 +5,8 @@ import ( "math" "net/http" "net/url" + "os" + "path" "path/filepath" "strconv" "strings" @@ -294,6 +296,8 @@ func filterOutHLSParams(rawQuery string) string { } func generateMediaPlaylistFMP4( + playList string, + isPlayaBack bool, isDeltaUpdate bool, variant MuxerVariant, segments []muxerSegment, @@ -311,6 +315,8 @@ func generateMediaPlaylistFMP4( Version: 10, TargetDuration: targetDuration, MediaSequence: segmentDeleteCount, + Playlist: playList, + IsPlayBack: isPlayaBack, } if variant == MuxerVariantLowLatency { @@ -432,6 +438,8 @@ func generateMediaPlaylistFMP4( } func generateMediaPlaylist( + playList string, + isPlayBack bool, isDeltaUpdate bool, variant MuxerVariant, segments []muxerSegment, @@ -450,6 +458,8 @@ func generateMediaPlaylist( } return generateMediaPlaylistFMP4( + playList, + isPlayBack, isDeltaUpdate, variant, segments, @@ -479,6 +489,8 @@ type muxerServer struct { nextSegmentParts []*muxerPart nextPartID uint64 init []byte + playList string + isPlayBack bool } func (s *muxerServer) initialize() { @@ -560,7 +572,10 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) { case (s.variant != MuxerVariantMPEGTS && strings.HasSuffix(name, ".mp4")) || (s.variant == MuxerVariantMPEGTS && strings.HasSuffix(name, ".ts")): - s.handleSegmentOrPart(name, w) + + dir := path.Dir(r.RequestURI) + identifier := path.Base(dir) + s.handleSegmentOrPart(name, identifier, w) } } @@ -642,6 +657,8 @@ func (s *muxerServer) handleMediaPlaylist( } byts, err := generateMediaPlaylist( + s.playList, + s.isPlayBack, isDeltaUpdate, s.variant, s.segments, @@ -691,6 +708,8 @@ func (s *muxerServer) handleMediaPlaylist( } byts, err := generateMediaPlaylist( + s.playList, + s.isPlayBack, isDeltaUpdate, s.variant, s.segments, @@ -741,23 +760,39 @@ func (s *muxerServer) handleInitFile(w http.ResponseWriter) { w.Write(init) } -func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) { +func localReader(fpath string) (io.ReadCloser, error) { + r, err := os.Open(fpath) + if err != nil { + return nil, err + } + + return r, nil +} + +func (s *muxerServer) handleSegmentOrPart(fname string, key string, w http.ResponseWriter) { switch { case strings.HasPrefix(fname, s.prefix+"_"+"seg"): s.mutex.Lock() segment, ok := s.segmentsByName[fname] s.mutex.Unlock() + var rw io.ReadCloser if !ok { - return + r, err := localReader("./.videos/" + key + "/" + fname) + rw = r + if err != nil { + return + } + } else { + r, err := segment.reader() + rw = r + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } } - r, err := segment.reader() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer r.Close() + defer rw.Close() w.Header().Set("Cache-Control", "max-age="+segmentMaxAge) w.Header().Set( @@ -770,7 +805,7 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) { }(), ) w.WriteHeader(http.StatusOK) - io.Copy(w, r) + io.Copy(w, rw) case s.variant == MuxerVariantLowLatency && strings.HasPrefix(fname, s.prefix+"_"+"part"): s.mutex.Lock() @@ -852,6 +887,15 @@ func (s *muxerServer) publishSegmentInner(segment muxerSegment) error { toDelete := s.segments[0] if toDeleteSeg, ok := toDelete.(*muxerSegmentFMP4); ok { + + // Update Playlist + u := toDeleteSeg.name + plse := &playlist.MediaSegment{ + Duration: toDeleteSeg.getDuration(), + URI: u, + DateTime: &toDeleteSeg.startNTP, + } + s.playList += plse.Marshal() for _, part := range toDeleteSeg.parts { delete(s.partsByName, part.getName()) } diff --git a/pkg/playlist/media.go b/pkg/playlist/media.go index b3ee73b..7d986cc 100644 --- a/pkg/playlist/media.go +++ b/pkg/playlist/media.go @@ -80,7 +80,9 @@ type Media struct { Map *MediaMap // #EXT-X-SKIP - Skip *MediaSkip + Skip *MediaSkip + IsPlayBack bool + Playlist string // segments // at least one is required @@ -366,9 +368,13 @@ func (m Media) Marshal() ([]byte, error) { ret += m.PartInf.marshal() } - ret += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.MediaSequence), 10) + "\n" + start := strconv.FormatInt(int64(m.MediaSequence), 10) + if m.IsPlayBack { + start = "7" + } + ret += "#EXT-X-MEDIA-SEQUENCE:" + start + "\n" - if m.DiscontinuitySequence != nil { + if m.DiscontinuitySequence != nil && !m.IsPlayBack { ret += "#EXT-X-DISCONTINUITY-SEQUENCE:" + strconv.FormatInt(int64(m.MediaSequence), 10) + "\n" } @@ -380,12 +386,16 @@ func (m Media) Marshal() ([]byte, error) { ret += m.Map.marshal() } - if m.Skip != nil { + if m.Skip != nil && !m.IsPlayBack { ret += m.Skip.marshal() } + if m.IsPlayBack { + ret += m.Playlist + } + for _, seg := range m.Segments { - ret += seg.marshal() + ret += seg.Marshal() } for _, part := range m.Parts { diff --git a/pkg/playlist/media_segment.go b/pkg/playlist/media_segment.go index ddbcab0..f1b8370 100644 --- a/pkg/playlist/media_segment.go +++ b/pkg/playlist/media_segment.go @@ -47,7 +47,7 @@ func (s MediaSegment) validate() error { return nil } -func (s MediaSegment) marshal() string { +func (s MediaSegment) Marshal() string { ret := "" if s.DateTime != nil { diff --git a/pkg/storage/factory_disk.go b/pkg/storage/factory_disk.go index 24c6cc2..5f504f7 100644 --- a/pkg/storage/factory_disk.go +++ b/pkg/storage/factory_disk.go @@ -5,17 +5,19 @@ import ( ) type factoryDisk struct { - dirPath string + dirPath string + isPlayBack bool } // NewFactoryDisk allocates a disk-backed factory. -func NewFactoryDisk(dirPath string) Factory { +func NewFactoryDisk(dirPath string, isPlayBack bool) Factory { return &factoryDisk{ - dirPath: dirPath, + dirPath: dirPath, + isPlayBack: isPlayBack, } } // NewFile implements Factory. func (s *factoryDisk) NewFile(fileName string) (File, error) { - return newFileDisk(filepath.Join(s.dirPath, fileName)) + return newFileDisk(filepath.Join(s.dirPath, fileName), s.isPlayBack) } diff --git a/pkg/storage/factory_test.go b/pkg/storage/factory_test.go index 96c719b..b4a399f 100644 --- a/pkg/storage/factory_test.go +++ b/pkg/storage/factory_test.go @@ -25,7 +25,7 @@ func TestStorage(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - s = NewFactoryDisk(dir) + s = NewFactoryDisk(dir, false) } seg, err := s.NewFile("myseg.mp4") diff --git a/pkg/storage/file_disk.go b/pkg/storage/file_disk.go index 711c0e6..30a3b21 100644 --- a/pkg/storage/file_disk.go +++ b/pkg/storage/file_disk.go @@ -7,21 +7,23 @@ import ( ) type fileDisk struct { + noRemove bool fpath string f *os.File parts []*partDisk finalSize uint64 } -func newFileDisk(fpath string) (File, error) { +func newFileDisk(fpath string, noRemove bool) (File, error) { f, err := os.Create(fpath) if err != nil { return nil, err } return &fileDisk{ - fpath: fpath, - f: f, + noRemove: noRemove, + fpath: fpath, + f: f, }, nil } @@ -47,7 +49,9 @@ func (s *fileDisk) Finalize() { // Remove implements File. func (s *fileDisk) Remove() { - os.Remove(s.fpath) + if !s.noRemove { + os.Remove(s.fpath) + } } // NewPart implements File.