Skip to content

Commit

Permalink
Dynamically clean up unused sessions in HLS
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Nov 1, 2024
1 parent 96188e1 commit 484cd35
Show file tree
Hide file tree
Showing 16 changed files with 53 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/projects/base/common_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ static ov::String StringFromPublisherType(const PublisherType &type)
case PublisherType::Thumbnail:
return "Thumbnail";
case PublisherType::Hls:
return "TS";
return "HLSv3";
}

return "Unknown";
Expand Down
2 changes: 1 addition & 1 deletion src/projects/base/info/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace info
return _id;
}

ov::String Stream::GetUri()
ov::String Stream::GetUri() const
{
// #vhost name#appname/stream name
ov::String vhost_app_name = _app_info != nullptr ? _app_info->GetVHostAppName().CStr() : "Unknown";
Expand Down
2 changes: 1 addition & 1 deletion src/projects/base/info/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace info
info::stream_id_t GetId() const;

// Get Stream Resource ID in ovenmediaengine (vhost#app/stream)
ov::String GetUri();
ov::String GetUri() const;

void SetMsid(uint32_t);
uint32_t GetMsid();
Expand Down
14 changes: 12 additions & 2 deletions src/projects/modules/http/server/http_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ namespace http
{
_interceptor = interceptor;
}

// If there is no interceptor in _need_to_close_interceptors, add it
if (interceptor != nullptr)
{
auto found = std::find(_need_to_close_interceptors.begin(), _need_to_close_interceptors.end(), interceptor);
if (found == _need_to_close_interceptors.end())
{
_need_to_close_interceptors.push_back(interceptor);
}
}
}

// Find interceptor from server
Expand All @@ -241,9 +251,9 @@ namespace http
return;
}

if (_interceptor != nullptr)
for (auto &interceptor : _need_to_close_interceptors)
{
_interceptor->OnClosed(GetSharedPtr(), reason);
interceptor->OnClosed(GetSharedPtr(), reason);
}

if (_http_transaction != nullptr)
Expand Down
1 change: 1 addition & 0 deletions src/projects/modules/http/server/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ namespace http
std::shared_ptr<prot::ws::Frame> _websocket_frame = nullptr;

std::shared_ptr<RequestInterceptor> _interceptor = nullptr;
std::vector<std::shared_ptr<RequestInterceptor>> _need_to_close_interceptors;

std::recursive_mutex _close_mutex;
bool _closed = false;
Expand Down
2 changes: 1 addition & 1 deletion src/projects/publishers/hls/hls_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ class HlsApplication : public pub::Application
bool DeleteStream(const std::shared_ptr<info::Stream> &info) override;

http::CorsManager _cors_manager;
bool _origin_mode = false;
bool _origin_mode = true;
};
6 changes: 2 additions & 4 deletions src/projects/publishers/hls/hls_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ std::shared_ptr<TsHttpInterceptor> HlsPublisher::CreateInterceptor()
try
{
// If this connection has been used by another session in the past, it is reused.
session = std::any_cast<std::shared_ptr<HlsSession>>(connection->GetUserData(stream->GetUri()));
session = std::any_cast<std::shared_ptr<HlsSession>>(connection->GetUserData(stream->GetStreamId()));
}
catch (const std::bad_any_cast &e)
{
Expand Down Expand Up @@ -506,7 +506,7 @@ std::shared_ptr<TsHttpInterceptor> HlsPublisher::CreateInterceptor()
}

// It will be used in CloseHandler
connection->AddUserData(stream->GetUri(), session);
connection->AddUserData(stream->GetStreamId(), session);
session->UpdateLastRequest(connection->GetId());

// Cors Setting
Expand Down Expand Up @@ -555,8 +555,6 @@ std::shared_ptr<TsHttpInterceptor> HlsPublisher::CreateInterceptor()
stream->RemoveSession(session->GetId());
}
}

session->Stop();
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/projects/publishers/hls/hls_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class HlsPublisher : public pub::Publisher
}
const char *GetPublisherName() const override
{
return "HLS Publisher";
return "HLSv3 Publisher";
}

bool OnCreateHost(const info::Host &host_info) override;
Expand Down
2 changes: 2 additions & 0 deletions src/projects/publishers/hls/hls_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ HlsSession::HlsSession(const info::Session &session_info,
}

logtd("TsSession::TsSession (%d)", session_info.GetId());
MonitorInstance->OnSessionConnected(*stream, PublisherType::Hls);
_number_of_players = 1;
}

HlsSession::~HlsSession()
Expand Down
5 changes: 5 additions & 0 deletions src/projects/publishers/hls/hls_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ HlsStream::~HlsStream()
logtd("TsStream(%s/%s) has been terminated finally", GetApplicationName(), GetName().CStr());
}

ov::String HlsStream::GetStreamId() const
{
return ov::String::FormatString("hlsv3/%s", GetUri().CStr());
}

bool HlsStream::Start()
{
if (GetState() != State::CREATED)
Expand Down
2 changes: 2 additions & 0 deletions src/projects/publishers/hls/hls_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class HlsStream : public pub::Stream, public mpegts::PackagerSink
std::tuple<RequestResult, std::shared_ptr<const ov::Data>> GetMediaPlaylistData(const ov::String &variant_name, bool rewind);
std::tuple<RequestResult, std::shared_ptr<const ov::Data>> GetSegmentData(const ov::String &variant_name, uint32_t number);

ov::String GetStreamId() const;

private:
bool Start() override;
bool Stop() override;
Expand Down
6 changes: 2 additions & 4 deletions src/projects/publishers/llhls/llhls_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ std::shared_ptr<LLHlsHttpInterceptor> LLHlsPublisher::CreateInterceptor()
try
{
// If this connection has been used by another session in the past, it is reused.
session = std::any_cast<std::shared_ptr<LLHlsSession>>(connection->GetUserData(stream->GetUri()));
session = std::any_cast<std::shared_ptr<LLHlsSession>>(connection->GetUserData(stream->GetStreamId()));
}
catch (const std::bad_any_cast &e)
{
Expand Down Expand Up @@ -501,7 +501,7 @@ std::shared_ptr<LLHlsHttpInterceptor> LLHlsPublisher::CreateInterceptor()
}

// It will be used in CloseHandler
connection->AddUserData(stream->GetUri(), session);
connection->AddUserData(stream->GetStreamId(), session);
session->UpdateLastRequest(connection->GetId());

// Cors Setting
Expand Down Expand Up @@ -545,8 +545,6 @@ std::shared_ptr<LLHlsHttpInterceptor> LLHlsPublisher::CreateInterceptor()
stream->RemoveSession(session->GetId());
}
}

session->Stop();
}
}
});
Expand Down
20 changes: 15 additions & 5 deletions src/projects/publishers/llhls/llhls_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ LLHlsSession::LLHlsSession(const info::Session &session_info,
_session_key = ov::Random::GenerateString(8);
}

if (_origin_mode == true)
{
MonitorInstance->OnSessionConnected(*stream, PublisherType::LLHls);
_number_of_players = 1;
}

logtd("LLHlsSession::LLHlsSession (%d)", session_info.GetId());
}

Expand Down Expand Up @@ -85,6 +91,8 @@ bool LLHlsSession::Start()

bool LLHlsSession::Stop()
{
logtd("LLHlsSession(%u) : Pending request size(%d)", GetId(), _pending_requests.size());

return Session::Stop();
}

Expand Down Expand Up @@ -187,8 +195,6 @@ void LLHlsSession::OnMessageReceived(const std::any &message)
return;
}

logtd("LLHlsSession::OnMessageReceived(%u) - %s", GetId(), exchange->ToString().CStr());

auto request = exchange->GetRequest();
auto request_uri = request->GetParsedUri();

Expand Down Expand Up @@ -472,8 +478,11 @@ void LLHlsSession::ResponsePlaylist(const std::shared_ptr<http::svr::HttpExchang

response->AppendData(playlist);

MonitorInstance->OnSessionConnected(*GetStream(), PublisherType::LLHls);
_number_of_players += 1;
if (_origin_mode == false)
{
MonitorInstance->OnSessionConnected(*GetStream(), PublisherType::LLHls);
_number_of_players += 1;
}
}
else if (result == LLHlsStream::RequestResult::Accepted && holdIfAccepted == true)
{
Expand Down Expand Up @@ -574,7 +583,7 @@ void LLHlsSession::ResponseChunklist(const std::shared_ptr<http::svr::HttpExchan
response->AppendData(chunklist);

// If a client uses previously cached llhls.m3u8 and requests chunklist
if (_number_of_players == 0)
if (_origin_mode == false && _number_of_players == 0)
{
MonitorInstance->OnSessionConnected(*GetStream(), PublisherType::LLHls);
_number_of_players += 1;
Expand Down Expand Up @@ -788,6 +797,7 @@ void LLHlsSession::OnPlaylistUpdated(const int32_t &track_id, const int64_t &msn
{
logtd("LLHlsSession::OnPlaylistUpdated track_id: %d, msn: %lld, part: %lld", track_id, msn, part);
// Find the pending request

auto it = _pending_requests.begin();
while (it != _pending_requests.end())
{
Expand Down
1 change: 0 additions & 1 deletion src/projects/publishers/llhls/llhls_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ class LLHlsSession : public pub::Session

bool AddPendingRequest(const std::shared_ptr<http::svr::HttpExchange> &exchange, const RequestType &type, const ov::String &file_name, const int32_t &track_id, const int64_t &segment_number, const int64_t &partial_number, const bool &skip, const bool &legacy, const bool &rewind);

// Session runs on a single thread, so it doesn't need mutex
std::list<PendingRequest> _pending_requests;

// ID list of connections requesting this session
Expand Down
5 changes: 5 additions & 0 deletions src/projects/publishers/llhls/llhls_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ LLHlsStream::~LLHlsStream()
logtd("LLHlsStream(%s/%s) has been terminated finally", GetApplicationName(), GetName().CStr());
}

ov::String LLHlsStream::GetStreamId() const
{
return ov::String::FormatString("llhls/%s", GetUri().CStr());
}

bool LLHlsStream::Start()
{
if (GetState() != State::CREATED)
Expand Down
2 changes: 2 additions & 0 deletions src/projects/publishers/llhls/llhls_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class LLHlsStream : public pub::Stream, public bmff::FMp4StorageObserver
explicit LLHlsStream(const std::shared_ptr<pub::Application> application, const info::Stream &info, uint32_t worker_count);
~LLHlsStream() final;

ov::String GetStreamId() const;

void SendVideoFrame(const std::shared_ptr<MediaPacket> &media_packet) override;
void SendAudioFrame(const std::shared_ptr<MediaPacket> &media_packet) override;
void SendDataFrame(const std::shared_ptr<MediaPacket> &media_packet) override;
Expand Down

0 comments on commit 484cd35

Please sign in to comment.