Skip to content

Commit

Permalink
Eliminate some race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
Bjarne Juul Pasgaard committed Oct 23, 2023
1 parent c1074b0 commit 87bce95
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 85 deletions.
171 changes: 89 additions & 82 deletions fineftp-server/src/ftp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace fineftp
, command_strand_ (io_service)
, data_type_binary_ (false)
, data_acceptor_ (io_service)
, data_buffer_strand_ (io_service)
, file_rw_strand_ (io_service)
, data_socket_strand_ (io_service)
, ftp_working_directory_("/")
{
}
Expand All @@ -42,18 +41,20 @@ namespace fineftp
#endif // !NDEBUG

{
// Properly close command socket
// TODO: Protect command socket with a mutex, as it may be accessed by multiple threads
// Properly close command socket.
// When the FtpSession is being destroyed, there are no std::shared_ptr's referring to
// it and hence no possibility of race conditions on command_socket_.
asio::error_code ec;
command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
command_socket_.close(ec);
}

// When the FtpSession is being destroyed, there are no std::shared_ptr's referring to
// it and hence no possibility of race conditions on data_socket_weak_ptr_.
auto data_socket = data_socket_weakptr_.lock();
if (data_socket)
{
// Properly close data socket
// TODO: Protect data socket with a mutex, as it may be accessed by multiple threads
asio::error_code ec;
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
data_socket->close(ec);
Expand All @@ -68,8 +69,8 @@ namespace fineftp
command_socket_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) std::cerr << "Unable to set socket option tcp::no_delay: " << ec.message() << std::endl;

command_strand_.post([me = shared_from_this()]() { me->readFtpCommand(); });
sendFtpMessage(FtpMessage(FtpReplyCode::SERVICE_READY_FOR_NEW_USER, "Welcome to fineFTP Server"));
readFtpCommand();
}

asio::ip::tcp::socket& FtpSession::getSocket()
Expand All @@ -89,14 +90,14 @@ namespace fineftp
void FtpSession::sendRawFtpMessage(const std::string& raw_message)
{
command_strand_.post([me = shared_from_this(), raw_message]()
{
const bool write_in_progress = !me->command_output_queue_.empty();
me->command_output_queue_.push_back(raw_message);
if (!write_in_progress)
{
me->startSendingMessages();
}
});
{
const bool write_in_progress = !me->command_output_queue_.empty();
me->command_output_queue_.push_back(raw_message);
if (!write_in_progress)
{
me->startSendingMessages();
}
});
}

void FtpSession::startSendingMessages()
Expand Down Expand Up @@ -150,14 +151,15 @@ namespace fineftp
me->data_acceptor_.close(ec_);
}

me->data_socket_strand_.post([me]()
{
auto data_socket = me->data_socket_weakptr_.lock();
if (data_socket)
{
{
asio::error_code ec_;
data_socket->close(ec_);
}
}
});

return;
}
Expand Down Expand Up @@ -250,10 +252,9 @@ namespace fineftp
if (last_command_ == "QUIT")
{
// Close command socket
command_strand_.wrap([me = shared_from_this()]()
command_strand_.post([me = shared_from_this()]()
{
// Properly close command socket
// TODO: Protect command socket with a mutex, as it may be accessed by multiple threads
asio::error_code ec;
me->command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
me->command_socket_.close(ec);
Expand Down Expand Up @@ -1145,17 +1146,19 @@ namespace fineftp
void FtpSession::sendDirectoryListing(const std::map<std::string, Filesystem::FileStatus>& directory_content)
{
auto data_socket = std::make_shared<asio::ip::tcp::socket>(io_service_);
data_socket_weakptr_ = data_socket;

data_acceptor_.async_accept(*data_socket
, [data_socket, directory_content, me = shared_from_this()](auto ec)
, data_socket_strand_.wrap([data_socket, directory_content, me = shared_from_this()](auto ec)
{
if (ec)
{
me->sendFtpMessage(FtpReplyCode::TRANSFER_ABORTED, "Data transfer aborted: " + ec.message());
return;
}
// TODO: close acceptor after connect?

me->data_socket_weakptr_ = data_socket;

// TODO: close acceptor after connect?
// Create a Unix-like file list
std::stringstream stream; // NOLINT(misc-const-correctness) Reason: False detection, this cannot be made const
for (const auto& entry : directory_content)
Expand All @@ -1180,22 +1183,23 @@ namespace fineftp
// Send the string out
me->addDataToBufferAndSend(dir_listing_rawdata, data_socket);
me->addDataToBufferAndSend(std::shared_ptr<std::vector<char>>(), data_socket);// Nullpointer indicates end of transmission
});
}));
}

void FtpSession::sendNameList(const std::map<std::string, Filesystem::FileStatus>& directory_content)
{
auto data_socket = std::make_shared<asio::ip::tcp::socket>(io_service_);
data_socket_weakptr_ = data_socket;

data_acceptor_.async_accept(*data_socket
, [data_socket, directory_content, me = shared_from_this()](auto ec)
, data_socket_strand_.wrap([data_socket, directory_content, me = shared_from_this()](auto ec)
{
if (ec)
{
me->sendFtpMessage(FtpReplyCode::TRANSFER_ABORTED, "Data transfer aborted: " + ec.message());
return;
}

me->data_socket_weakptr_ = data_socket;

// Create a file list
std::stringstream stream; // NOLINT(misc-const-correctness) Reason: False detection, this cannot be made const
Expand All @@ -1214,16 +1218,15 @@ namespace fineftp
// Send the string out
me->addDataToBufferAndSend(dir_listing_rawdata, data_socket);
me->addDataToBufferAndSend(std::shared_ptr<std::vector<char>>(), data_socket);// Nullpointer indicates end of transmission
});
}));
}

void FtpSession::sendFile(const std::shared_ptr<ReadableFile>& file)
{
auto data_socket = std::make_shared<asio::ip::tcp::socket>(io_service_);
data_socket_weakptr_ = data_socket;

data_acceptor_.async_accept(*data_socket
, [data_socket, file, me = shared_from_this()](auto ec)
, data_socket_strand_.wrap([data_socket, file, me = shared_from_this()](auto ec)
{
if (ec)
{
Expand All @@ -1237,6 +1240,8 @@ namespace fineftp
}
else
{
me->data_socket_weakptr_ = data_socket;

// Send the file
asio::async_write(*data_socket
, asio::buffer(file->data(), file->size())
Expand All @@ -1252,12 +1257,12 @@ namespace fineftp
}
});
}
});
}));
}

void FtpSession::addDataToBufferAndSend(const std::shared_ptr<std::vector<char>>& data, const std::shared_ptr<asio::ip::tcp::socket>& data_socket)
{
data_buffer_strand_.post([me = shared_from_this(), data, data_socket]()
data_socket_strand_.post([me = shared_from_this(), data, data_socket]()
{
const bool write_in_progress = (!me->data_buffer_.empty());

Expand All @@ -1272,50 +1277,48 @@ namespace fineftp

void FtpSession::writeDataToSocket(const std::shared_ptr<asio::ip::tcp::socket>& data_socket)
{
data_buffer_strand_.post(
[me = shared_from_this(), data_socket]()
{
auto data = me->data_buffer_.front();
data_socket_strand_.post(
[me = shared_from_this(), data_socket]()
{
auto data = me->data_buffer_.front();

if (data)
{
// Send out the buffer
asio::async_write(*data_socket
, asio::buffer(*data)
, me->data_buffer_strand_.wrap([me, data_socket, data](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
me->data_buffer_.pop_front();

if (ec)
{
std::cerr << "Data write error: " << ec.message() << std::endl;
return;
}

if (!me->data_buffer_.empty())
{
me->writeDataToSocket(data_socket);
}
}
));
}
else
{
// we got to the end of transmission
me->data_buffer_.pop_front();
if (data)
{
// Send out the buffer
asio::async_write(*data_socket
, asio::buffer(*data)
, me->data_socket_strand_.wrap([me, data, data_socket](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
me->data_buffer_.pop_front();

// Close Data Socket properly
// TODO: Protect data socket with mutex, as it may be accessed from multiple threads
{
asio::error_code ec;
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
data_socket->close(ec);
}
if (ec)
{
std::cerr << "Data write error: " << ec.message() << std::endl;
return;
}

me->sendFtpMessage(FtpReplyCode::CLOSING_DATA_CONNECTION, "Done");
}
}
);
if (!me->data_buffer_.empty())
{
me->writeDataToSocket(data_socket);
}
}
));
}
else
{
// we got to the end of transmission
me->data_buffer_.pop_front();

// Close Data Socket properly
{
asio::error_code ec;
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
data_socket->close(ec);
}

me->sendFtpMessage(FtpReplyCode::CLOSING_DATA_CONNECTION, "Done");
}
});
}

////////////////////////////////////////////////////////
Expand All @@ -1325,19 +1328,20 @@ namespace fineftp
void FtpSession::receiveFile(const std::shared_ptr<WriteableFile>& file)
{
auto data_socket = std::make_shared<asio::ip::tcp::socket>(io_service_);
data_socket_weakptr_ = data_socket;

data_acceptor_.async_accept(*data_socket
, [data_socket, file, me = shared_from_this()](auto ec)
, data_socket_strand_.wrap([data_socket, file, me = shared_from_this()](auto ec)
{
if (ec)
{
me->sendFtpMessage(FtpReplyCode::TRANSFER_ABORTED, "Data transfer aborted (" + std::to_string(ec.value()) + "): " + ec.message());
std::cerr << "Data transfer aborted: " << ec.message() << std::endl;
me->sendFtpMessage(FtpReplyCode::TRANSFER_ABORTED, "Data transfer aborted");
return;
}

me->data_socket_weakptr_ = data_socket;
me->receiveDataFromSocketAndWriteToFile(file, data_socket);
});
}));
}

void FtpSession::receiveDataFromSocketAndWriteToFile(const std::shared_ptr<WriteableFile>& file, const std::shared_ptr<asio::ip::tcp::socket>& data_socket)
Expand All @@ -1347,7 +1351,7 @@ namespace fineftp
asio::async_read(*data_socket
, asio::buffer(*buffer)
, asio::transfer_at_least(buffer->size())
, file_rw_strand_.wrap([me = shared_from_this(), file, data_socket, buffer](asio::error_code ec, std::size_t length)
, data_socket_strand_.wrap([me = shared_from_this(), file, data_socket, buffer](asio::error_code ec, std::size_t length)
{
buffer->resize(length);
if (ec)
Expand All @@ -1356,7 +1360,7 @@ namespace fineftp
{
me->writeDataToFile(buffer, file);
}
me->endDataReceiving(file);
me->endDataReceiving(file, data_socket);
return;
}
else if (length > 0)
Expand All @@ -1373,13 +1377,16 @@ namespace fineftp
file->write(data->data(), data->size());
}

void FtpSession::endDataReceiving(const std::shared_ptr<WriteableFile>& file)
void FtpSession::endDataReceiving(const std::shared_ptr<WriteableFile>& file, const std::shared_ptr<asio::ip::tcp::socket>& data_socket)
{
file_rw_strand_.post([me = shared_from_this(), file]()
{
file->close();
me->sendFtpMessage(FtpReplyCode::CLOSING_DATA_CONNECTION, "Done");
});
data_socket_strand_.post([me = shared_from_this(), file, data_socket]()
{
file->close();
me->sendFtpMessage(FtpReplyCode::CLOSING_DATA_CONNECTION, "Done");
asio::error_code ec;
data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
data_socket->close();
});
}

////////////////////////////////////////////////////////
Expand Down
6 changes: 3 additions & 3 deletions fineftp-server/src/ftp_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ namespace fineftp
, const std::shared_ptr<WriteableFile>& file
, const std::function<void(void)>& fetch_more = []() {return; });

void endDataReceiving(const std::shared_ptr<WriteableFile>& file);
void endDataReceiving(const std::shared_ptr<WriteableFile>& file
, const std::shared_ptr<asio::ip::tcp::socket>& data_socket);

////////////////////////////////////////////////////////
// Helpers
Expand Down Expand Up @@ -191,8 +192,7 @@ namespace fineftp
asio::ip::tcp::acceptor data_acceptor_;
std::weak_ptr<asio::ip::tcp::socket> data_socket_weakptr_;
std::deque<std::shared_ptr<std::vector<char>>> data_buffer_;
asio::io_service::strand data_buffer_strand_;
asio::io_service::strand file_rw_strand_;
asio::io_service::strand data_socket_strand_;

// Current state
std::string ftp_working_directory_;
Expand Down

0 comments on commit 87bce95

Please sign in to comment.