Skip to content

Commit

Permalink
Started implementing proper-close for sockets. The main use-case (sim…
Browse files Browse the repository at this point in the history
…ply close the ftp server without having a running data transfer) should already lead to proper results. However, there will be some memory leaks.
  • Loading branch information
FlorianReimold committed Oct 24, 2024
1 parent 6588420 commit 066fdaa
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 52 deletions.
40 changes: 24 additions & 16 deletions fineftp-server/src/ftp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,38 @@ namespace fineftp
}

FtpSession::~FtpSession()
{
stop();
completion_handler_();
}

void FtpSession::start()
{
asio::error_code ec;
command_socket_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) std::cerr << "Unable to set socket option tcp::no_delay: " << ec.message() << std::endl;

command_strand_.post([me = shared_from_this()]() { me->readFtpCommand(); });
sendFtpMessage(FtpMessage(FtpReplyCode::SERVICE_READY_FOR_NEW_USER, "Welcome to fineFTP Server"));
}

// TODO: I need to check if the command socket AND data socket is always shutdown and closed.
void FtpSession::stop()
{
#ifndef NDEBUG
// TODO: Have a "is stopped" variable, so that this log message is not printed every time
std::cout << "Ftp Session shutting down" << std::endl;
#endif // !NDEBUG

// TODO: protect the two sockets with mutexes, as it is now possible to call stop() from another thread!!!

{
// Properly close command socket.
// When the FtpSession is being destroyed, there are no std::shared_ptr's referring to
// it and hence no possibility of race conditions on command_socket_.
asio::error_code ec;
command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
command_socket_.close(ec);
command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter
command_socket_.close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter
}

// When the FtpSession is being destroyed, there are no std::shared_ptr's referring to
Expand All @@ -74,21 +94,9 @@ namespace fineftp
{
// Properly close data socket
asio::error_code ec;
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
data_socket->close(ec);
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter
data_socket->close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter
}

completion_handler_();
}

void FtpSession::start()
{
asio::error_code ec;
command_socket_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) std::cerr << "Unable to set socket option tcp::no_delay: " << ec.message() << std::endl;

command_strand_.post([me = shared_from_this()]() { me->readFtpCommand(); });
sendFtpMessage(FtpMessage(FtpReplyCode::SERVICE_READY_FOR_NEW_USER, "Welcome to fineFTP Server"));
}

asio::ip::tcp::socket& FtpSession::getSocket()
Expand Down
1 change: 1 addition & 0 deletions fineftp-server/src/ftp_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace fineftp
~FtpSession();

void start();
void stop();

asio::ip::tcp::socket& getSocket();

Expand Down
103 changes: 70 additions & 33 deletions fineftp-server/src/server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace fineftp
: port_ (port)
, address_ (address)
, acceptor_ (io_service_)
, open_connection_count_(0)
{}

FtpServerImpl::~FtpServerImpl()
Expand All @@ -40,8 +39,6 @@ namespace fineftp

bool FtpServerImpl::start(size_t thread_count)
{
auto ftp_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() { open_connection_count_--; });

// set up the acceptor to listen on the tcp port
asio::error_code make_address_ec;
const asio::ip::tcp::endpoint endpoint(asio::ip::make_address(address_, make_address_ec), port_);
Expand All @@ -52,6 +49,8 @@ namespace fineftp
}

{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);

asio::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec)
Expand All @@ -62,6 +61,8 @@ namespace fineftp
}

{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);

asio::error_code ec;
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
Expand All @@ -72,6 +73,8 @@ namespace fineftp
}

{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);

asio::error_code ec;
acceptor_.bind(endpoint, ec);
if (ec)
Expand All @@ -82,6 +85,8 @@ namespace fineftp
}

{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);

asio::error_code ec;
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
if (ec)
Expand All @@ -92,16 +97,13 @@ namespace fineftp
}

#ifndef NDEBUG
std::cout << "FTP Server created." << std::endl << "Listening at address " << acceptor_.local_endpoint().address() << " on port " << acceptor_.local_endpoint().port() << ":" << std::endl;
{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);
std::cout << "FTP Server created." << std::endl << "Listening at address " << acceptor_.local_endpoint().address() << " on port " << acceptor_.local_endpoint().port() << ":" << std::endl;
}
#endif // NDEBUG

acceptor_.async_accept(ftp_session->getSocket()
, [this, ftp_session](auto ec)
{
open_connection_count_++;

acceptFtpSession(ftp_session, ec);
});
waitForNextFtpSession();

for (size_t i = 0; i < thread_count; i++)
{
Expand All @@ -113,52 +115,87 @@ namespace fineftp

void FtpServerImpl::stop()
{
io_service_.stop();
for (std::thread& thread : thread_pool_)
// Prevent new sessions from being created
{
thread.join();
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);

// Close acceptor, if necessary
if (acceptor_.is_open())
{
asio::error_code ec;
acceptor_.close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the return value rom the ec parameter
}
}

// Stop all sessions
{
const std::lock_guard<std::mutex> session_list_lock(session_list_mutex_);
for(const auto& session_weak : session_list_)
{
const auto session = session_weak.lock();
if (session)
session->stop();
}
}

// Wait for the io_context to run out of work by joining all threads
{
for (std::thread& thread : thread_pool_)
{
thread.join();
}
thread_pool_.clear();
}
thread_pool_.clear();
}

void FtpServerImpl::acceptFtpSession(const std::shared_ptr<FtpSession>& ftp_session, asio::error_code const& error)
void FtpServerImpl::waitForNextFtpSession()
{
if (error)
// TODO: create proper shutdown callback as lambda

auto shutdown_callback = [this]() { };

auto new_ftp_session = std::make_shared<FtpSession>(io_service_, ftp_users_, shutdown_callback);

{
#ifndef NDEBUG
std::cerr << "Error handling connection: " << error.message() << std::endl;
#endif
return;
}
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);
acceptor_.async_accept(new_ftp_session->getSocket()
, [this, new_ftp_session](asio::error_code ec) // TODO: replace this with weak ptr to this
{
if (ec)
{
std::cerr << "Error accepting connection: " << ec.message() << std::endl;
return;
}

#ifndef NDEBUG
std::cout << "FTP Client connected: " << ftp_session->getSocket().remote_endpoint().address().to_string() << ":" << ftp_session->getSocket().remote_endpoint().port() << std::endl;
std::cout << "FTP Client connected: " << new_ftp_session->getSocket().remote_endpoint().address().to_string() << ":" << new_ftp_session->getSocket().remote_endpoint().port() << std::endl;
#endif
const std::lock_guard<std::mutex> session_list_lock(this->session_list_mutex_);
this->session_list_.push_back(new_ftp_session);

ftp_session->start();

auto new_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() { open_connection_count_--; });
new_ftp_session->start();

acceptor_.async_accept(new_session->getSocket()
, [this, new_session](auto ec)
{
open_connection_count_++;
acceptFtpSession(new_session, ec);
});
waitForNextFtpSession();
});
}
}

int FtpServerImpl::getOpenConnectionCount()
{
return open_connection_count_;
const std::lock_guard<std::mutex> session_list_lock(session_list_mutex_);
// TODO: 2024-10-23: Check if closed sessions can be in this list and if I need to iterate over this list to count the open ones, only
return session_list_.size();
}

uint16_t FtpServerImpl::getPort()
{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);
return acceptor_.local_endpoint().port();
}

std::string FtpServerImpl::getAddress()
{
const std::lock_guard<std::mutex> acceptor_lock(acceptor_mutex_);
return acceptor_.local_endpoint().address().to_string();
}
}
10 changes: 7 additions & 3 deletions fineftp-server/src/server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -46,7 +47,7 @@ namespace fineftp
std::string getAddress();

private:
void acceptFtpSession(const std::shared_ptr<FtpSession>& ftp_session, asio::error_code const& error);
void waitForNextFtpSession();

private:
UserDatabase ftp_users_;
Expand All @@ -56,8 +57,11 @@ namespace fineftp

std::vector<std::thread> thread_pool_;
asio::io_service io_service_;
asio::ip::tcp::acceptor acceptor_;

std::atomic<int> open_connection_count_;
mutable std::mutex acceptor_mutex_; //!< Mutex protecting the acceptor. That is necessary, as the user may stop the server (and therefore close the acceptor) from another thread.
asio::ip::tcp::acceptor acceptor_; //!< The acceptor waiting for new sessions

mutable std::mutex session_list_mutex_; //!< Mutex protecting the list of current sessions
std::vector<std::weak_ptr<FtpSession>> session_list_; //!< List of sessions. Only store weak_ptr, so the sessions can delete themselves. This list is used to stop sessions and count connections
};
}

0 comments on commit 066fdaa

Please sign in to comment.