Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add datasets from parameters for Query and Update #1714

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
141 changes: 81 additions & 60 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include "util/MemorySize/MemorySize.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/ParseableDuration.h"
#include "util/TypeIdentity.h"
#include "util/TypeTraits.h"
#include "util/http/HttpUtils.h"
#include "util/http/websocket/MessageSender.h"

using namespace std::string_literals;
using namespace ad_utility::url_parser::sparqlOperation;

template <typename T>
using Awaitable = Server::Awaitable<T>;
Expand Down Expand Up @@ -169,28 +172,47 @@
// _____________________________________________________________________________
ad_utility::url_parser::ParsedRequest Server::parseHttpRequest(
const ad_utility::httpUtils::HttpRequest auto& request) {
using namespace ad_utility::use_type_identity;
// For an HTTP request, `request.target()` yields the HTTP Request-URI.
// This is a concatenation of the URL path and the query strings.
using namespace ad_utility::url_parser::sparqlOperation;
auto parsedUrl = ad_utility::url_parser::parseRequestTarget(request.target());
ad_utility::url_parser::ParsedRequest parsedRequest{
std::move(parsedUrl.path_), std::move(parsedUrl.parameters_), None{}};

// Some valid requests (e.g. QLever's custom commands like retrieving index
// statistics) don't have a query. So an empty operation is not necessarily an
// error.
auto setOperationIfSpecifiedInParams =
[&parsedRequest]<typename Operation>(string_view paramName) {
auto operation = ad_utility::url_parser::getParameterCheckAtMostOnce(
parsedRequest.parameters_, paramName);
if (operation.has_value()) {
parsedRequest.operation_ = Operation{operation.value()};
parsedRequest.parameters_.erase(paramName);
}
};
auto setOperationIfSpecifiedInParams = [&parsedRequest]<typename Operation>(
TI<Operation>,
string_view paramName) {
auto operation = ad_utility::url_parser::getParameterCheckAtMostOnce(
parsedRequest.parameters_, paramName);
if (operation.has_value()) {
parsedRequest.operation_ = Operation{operation.value(), {}};
parsedRequest.parameters_.erase(paramName);
}
};
auto addToDatasetClausesIfOperationIs = [&parsedRequest]<typename Operation>(
TI<Operation>,
const std::string& key,
bool isNamed) {
if (Operation* op = std::get_if<Operation>(&parsedRequest.operation_)) {
ad_utility::appendVector(op->datasetClauses_,
ad_utility::url_parser::parseDatasetClausesFrom(
parsedRequest.parameters_, key, isNamed));
}
};
auto addDatasetClauses = [&addToDatasetClausesIfOperationIs] {
addToDatasetClausesIfOperationIs(ti<Query>, "default-graph-uri", false);
addToDatasetClausesIfOperationIs(ti<Query>, "named-graph-uri", true);
addToDatasetClausesIfOperationIs(ti<Update>, "using-graph-uri", false);
addToDatasetClausesIfOperationIs(ti<Update>, "using-named-graph-uri", true);
};

if (request.method() == http::verb::get) {
setOperationIfSpecifiedInParams.template operator()<Query>("query");
setOperationIfSpecifiedInParams(ti<Query>, "query");
addDatasetClauses();

if (parsedRequest.parameters_.contains("update")) {
throw std::runtime_error("SPARQL Update is not allowed as GET request.");
}
Expand Down Expand Up @@ -258,17 +280,19 @@
throw std::runtime_error(
R"(Request must only contain one of "query" and "update".)");
}
setOperationIfSpecifiedInParams.template operator()<Query>("query");
setOperationIfSpecifiedInParams.template operator()<Update>("update");

setOperationIfSpecifiedInParams(ti<Query>, "query");
setOperationIfSpecifiedInParams(ti<Update>, "update");
addDatasetClauses();
return parsedRequest;
}
if (contentType.starts_with(contentTypeSparqlQuery)) {
parsedRequest.operation_ = Query{request.body()};
parsedRequest.operation_ = Query{request.body(), {}};
addDatasetClauses();
return parsedRequest;
}
if (contentType.starts_with(contentTypeSparqlUpdate)) {
parsedRequest.operation_ = Update{request.body()};
parsedRequest.operation_ = Update{request.body(), {}};
addDatasetClauses();
return parsedRequest;
}
throw std::runtime_error(absl::StrCat(
Expand Down Expand Up @@ -337,16 +361,6 @@
const auto parsedHttpRequest = parseHttpRequest(request);
const auto& parameters = parsedHttpRequest.parameters_;

auto checkParameterNotPresent = [&parameters](
const std::string& parameterName) {
if (parameters.contains(parameterName)) {
throw NotSupportedException(absl::StrCat(
parameterName, " parameter is currently not supported by QLever."));
}
};
checkParameterNotPresent("default-graph-uri");
checkParameterNotPresent("named-graph-uri");

// We always want to call `Server::checkParameter` with the same first
// parameter.
auto checkParameter = std::bind_front(&ad_utility::url_parser::checkParameter,
Expand Down Expand Up @@ -476,14 +490,13 @@

auto visitQuery = [&checkParameter, &accessTokenOk, &request, &send,
&parameters, &requestTimer,
this](ad_utility::url_parser::sparqlOperation::Query query)
-> Awaitable<void> {
this](Query query) -> Awaitable<void> {

Check warning on line 493 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L493

Added line #L493 was not covered by tests
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
co_return co_await processQueryOrUpdate<OperationType::Query>(
parameters, query.query_, requestTimer, std::move(request), send,
timeLimit.value());
co_return co_await processQueryOrUpdate(parameters, query, requestTimer,
std::move(request), send,
timeLimit.value());

Check warning on line 499 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L497-L499

Added lines #L497 - L499 were not covered by tests
} else {
// If the optional is empty, this indicates an error response has been
// sent to the client already. We can stop here.
Expand All @@ -492,25 +505,22 @@
};
auto visitUpdate =
[&checkParameter, &accessTokenOk, &request, &send, &parameters,
&requestTimer, this, &requireValidAccessToken](
const ad_utility::url_parser::sparqlOperation::Update& update)
-> Awaitable<void> {
&requestTimer, this,
&requireValidAccessToken](const Update& update) -> Awaitable<void> {

Check warning on line 509 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L508-L509

Added lines #L508 - L509 were not covered by tests
requireValidAccessToken("SPARQL Update");
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
co_return co_await processQueryOrUpdate<OperationType::Update>(
parameters, update.update_, requestTimer, std::move(request), send,
timeLimit.value());
co_return co_await processQueryOrUpdate(parameters, update, requestTimer,
std::move(request), send,
timeLimit.value());

Check warning on line 516 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L514-L516

Added lines #L514 - L516 were not covered by tests
} else {
// If the optional is empty, this indicates an error response has been
// sent to the client already. We can stop here.
co_return;
}
};
auto visitNone =
[&response, &send, &request](
ad_utility::url_parser::sparqlOperation::None) -> Awaitable<void> {
auto visitNone = [&response, &send, &request](None) -> Awaitable<void> {

Check warning on line 523 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L523

Added line #L523 was not covered by tests
// If there was no "query", but any of the URL parameters processed before
// produced a `response`, send that now. Note that if multiple URL
// parameters were processed, only the `response` from the last one is sent.
Expand Down Expand Up @@ -549,11 +559,10 @@

// ____________________________________________________________________________
Server::PlannedQuery Server::setupPlannedQuery(
const ad_utility::url_parser::ParamValueMap& params,
const std::vector<DatasetClause>& queryDatasets,
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer) const {
auto queryDatasets = ad_utility::url_parser::parseDatasetClauses(params);
PlannedQuery plannedQuery =
parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
auto& qet = plannedQuery.queryExecutionTree_;
Expand Down Expand Up @@ -784,7 +793,7 @@

// ____________________________________________________________________________
Awaitable<void> Server::processQuery(
const ad_utility::url_parser::ParamValueMap& params, const string& query,
const ad_utility::url_parser::ParamValueMap& params, const Query& query,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
Expand All @@ -793,7 +802,7 @@
<< ad_utility::toString(mediaType) << "\"" << std::endl;

ad_utility::websocket::MessageSender messageSender =
createMessageSender(queryHub_, request, query);
createMessageSender(queryHub_, request, query.query_);

Check warning on line 805 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L805

Added line #L805 was not covered by tests
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Expand All @@ -803,7 +812,7 @@
LOG(INFO) << "Processing the following SPARQL query:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< query << std::endl;
<< query.query_ << std::endl;

Check warning on line 815 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L815

Added line #L815 was not covered by tests
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
Expand All @@ -817,10 +826,10 @@
// an explicit variable instead of directly `co_await`-ing it.
auto coroutine = computeInNewThread(
queryThreadPool_,
[this, &params, &query, &qec, cancellationHandle, &timeLimit,
[this, &query, &qec, cancellationHandle, &timeLimit,

Check warning on line 829 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L829

Added line #L829 was not covered by tests
&requestTimer]() -> std::optional<PlannedQuery> {
return setupPlannedQuery(params, query, qec, cancellationHandle,
timeLimit, requestTimer);
return setupPlannedQuery(query.datasetClauses_, query.query_, qec,
cancellationHandle, timeLimit, requestTimer);

Check warning on line 832 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L831-L832

Added lines #L831 - L832 were not covered by tests
},
cancellationHandle);
auto plannedQueryOpt = co_await std::move(coroutine);
Expand Down Expand Up @@ -938,20 +947,21 @@
}
// ____________________________________________________________________________
json Server::processUpdateImpl(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params, const Update& update,
ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
ad_utility::SharedCancellationHandle cancellationHandle,
DeltaTriples& deltaTriples) {
auto [pinSubtrees, pinResult] = determineResultPinning(params);
LOG(INFO) << "Processing the following SPARQL update:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< update << std::endl;
<< update.update_ << std::endl;

Check warning on line 958 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L958

Added line #L958 was not covered by tests
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
auto plannedQuery = setupPlannedQuery(params, update, qec, cancellationHandle,
timeLimit, requestTimer);
auto plannedQuery =
setupPlannedQuery(update.datasetClauses_, update.update_, qec,
cancellationHandle, timeLimit, requestTimer);

Check warning on line 964 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L962-L964

Added lines #L962 - L964 were not covered by tests
auto qet = plannedQuery.queryExecutionTree_;

if (!plannedQuery.parsedQuery_.hasUpdateClause()) {
Expand Down Expand Up @@ -984,11 +994,11 @@

// ____________________________________________________________________________
Awaitable<void> Server::processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params, const Update& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);
auto messageSender = createMessageSender(queryHub_, request, update.update_);

Check warning on line 1001 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1001

Added line #L1001 was not covered by tests

auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);
Expand Down Expand Up @@ -1021,14 +1031,16 @@
}

// ____________________________________________________________________________
template <Server::OperationType type>
template <typename Operation>
Awaitable<void> Server::processQueryOrUpdate(
const ad_utility::url_parser::ParamValueMap& params,
const string& queryOrUpdate, ad_utility::Timer& requestTimer,
const Operation& operation, ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
using namespace ad_utility::httpUtils;

static_assert(ad_utility::SameAsAny<Operation, Query, Update>);

Check warning on line 1042 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1042

Added line #L1042 was not covered by tests

http::status responseStatus = http::status::ok;

// Put the whole query processing in a try-catch block. If any exception
Expand All @@ -1041,11 +1053,12 @@
// access to the runtimeInformation in the case of an error.
std::optional<PlannedQuery> plannedQuery;
try {
if constexpr (type == OperationType::Query) {
co_await processQuery(params, queryOrUpdate, requestTimer, request, send,
if constexpr (std::is_same_v<Operation, Query>) {
co_await processQuery(params, operation, requestTimer, request, send,

Check warning on line 1057 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1056-L1057

Added lines #L1056 - L1057 were not covered by tests
timeLimit);
} else {
co_await processUpdate(params, queryOrUpdate, requestTimer, request, send,
static_assert(std::is_same_v<Operation, Update>);
co_await processUpdate(params, operation, requestTimer, request, send,

Check warning on line 1061 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1060-L1061

Added lines #L1060 - L1061 were not covered by tests
timeLimit);
}
} catch (const ParseException& e) {
Expand Down Expand Up @@ -1086,8 +1099,16 @@
LOG(ERROR) << metadata.value().query_ << std::endl;
}
}
const std::string& operationStr = [&operation]() -> const std::string& {
if constexpr (std::is_same_v<Operation, Query>) {
return operation.query_;
} else {
static_assert(std::is_same_v<Operation, Update>);
return operation.update_;
}
}();

Check warning on line 1109 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1102-L1109

Added lines #L1102 - L1109 were not covered by tests
auto errorResponseJson = composeErrorResponseJson(
queryOrUpdate, exceptionErrorMsg.value(), requestTimer, metadata);
operationStr, exceptionErrorMsg.value(), requestTimer, metadata);

Check warning on line 1111 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1111

Added line #L1111 was not covered by tests
if (plannedQuery.has_value()) {
errorResponseJson["runtimeInformation"] =
nlohmann::ordered_json(plannedQuery.value()
Expand Down
20 changes: 10 additions & 10 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,30 +124,28 @@ class Server {
Awaitable<void> process(
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send);

// Indicates which type of operation is being processed.
enum class OperationType { Query, Update };

/// Handle a http request that asks for the processing of an query or update.
/// This is only a wrapper for `processQuery` and `processUpdate` which
/// does the error handling.
/// \param params The key-value-pairs sent in the HTTP GET request.
/// \param queryOrUpdate The query or update.
/// \param operation Must be Query or Update.
/// \param requestTimer Timer that measure the total processing
/// time of this request.
/// \param request The HTTP request.
/// \param send The action that sends a http:response (see the
/// `HttpServer.h` for documentation).
/// \param timeLimit Duration in seconds after which the query will be
/// cancelled.
template <OperationType type>
template <typename Operation>
Awaitable<void> processQueryOrUpdate(
const ad_utility::url_parser::ParamValueMap& params,
const string& queryOrUpdate, ad_utility::Timer& requestTimer,
const Operation& operation, ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit);
// Do the actual execution of a query.
Awaitable<void> processQuery(
const ad_utility::url_parser::ParamValueMap& params, const string& query,
const ad_utility::url_parser::ParamValueMap& params,
const ad_utility::url_parser::sparqlOperation::Query& query,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit);
Expand All @@ -162,7 +160,8 @@ class Server {
FRIEND_TEST(ServerTest, createResponseMetadata);
// Do the actual execution of an update.
Awaitable<void> processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params,
const ad_utility::url_parser::sparqlOperation::Update& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit);
Expand All @@ -180,7 +179,7 @@ class Server {
FRIEND_TEST(ServerTest, determineResultPinning);
// Sets up the PlannedQuery s.t. it is ready to be executed.
PlannedQuery setupPlannedQuery(
const ad_utility::url_parser::ParamValueMap& params,
const std::vector<DatasetClause>& queryDatasets,
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer) const;
Expand All @@ -192,7 +191,8 @@ class Server {
// Execute an update operation. The function must have exclusive access to the
// DeltaTriples object.
json processUpdateImpl(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params,
const ad_utility::url_parser::sparqlOperation::Update& update,
ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
ad_utility::SharedCancellationHandle cancellationHandle,
DeltaTriples& deltaTriples);
Expand Down
Loading
Loading