Skip to content

Commit

Permalink
Configuration apply queueing (microsoft#4590)
Browse files Browse the repository at this point in the history
## Change
Adds a queue table to the configuration database and some code to
synchronize the application of configurations.

Every apply in the queue puts a row in the table, with its instance
identifier (it should also be in the history) and a named object that it
will keep alive as long as it is in the queue. This allows for other
queued processes to check for dead queue items.

A global named mutex must be held in order to apply, or even check if
one is at the front of the queue. If not at the front of the queue, the
waiting operation will release the mutex and wait for N * 100ms where N
is their perceived position in the queue. This should prevent repeated
contention on the global mutex as the queued items sort themselves via
the wait.
  • Loading branch information
JohnMcPMS authored Jun 28, 2024
1 parent 4505e94 commit 3c60491
Show file tree
Hide file tree
Showing 19 changed files with 863 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ namespace AppInstaller::SQLite::Builder
// Specify the ordering to use.
StatementBuilder& OrderBy(std::string_view column);
StatementBuilder& OrderBy(const QualifiedColumn& column);
StatementBuilder& OrderBy(std::initializer_list<std::string_view> columns);

// Specify the ordering behavior.
StatementBuilder& Ascending();
Expand Down
6 changes: 6 additions & 0 deletions src/AppInstallerSharedLib/SQLiteStatementBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ namespace AppInstaller::SQLite::Builder
return *this;
}

StatementBuilder& StatementBuilder::OrderBy(std::initializer_list<std::string_view> columns)
{
OutputColumns(m_stream, " ORDER BY ", columns);
return *this;
}

StatementBuilder& StatementBuilder::Ascending()
{
m_stream << " ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,149 @@ public void ApplySet_Progress()
this.VerifySummaryEvent(configurationSet, result, ConfigurationUnitResultSource.Precondition);
}

/// <summary>
/// Ensures that multiple apply operations are sequenced.
/// </summary>
[Fact]
public void ApplySet_Sequenced()
{
ConfigurationSet configurationSet = this.ConfigurationSet();
ConfigurationUnit configurationUnitApply = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSet.Units = new ConfigurationUnit[] { configurationUnitApply };

ManualResetEvent startProcessing = new ManualResetEvent(true);
TestConfigurationProcessorFactory factory = new TestConfigurationProcessorFactory();
factory.CreateSetProcessorDelegate = (f, c) =>
{
WaitOn(startProcessing);
return f.DefaultCreateSetProcessor(c);
};

TestConfigurationSetProcessor setProcessor = factory.CreateTestProcessor(configurationSet);
TestConfigurationUnitProcessor unitProcessorApply = setProcessor.CreateTestProcessor(configurationUnitApply);
unitProcessorApply.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent applyEventWaiting = new ManualResetEvent(false);
ManualResetEvent completeApplyEvent = new ManualResetEvent(false);
unitProcessorApply.ApplySettingsDelegate = () =>
{
applyEventWaiting.Set();
WaitOn(completeApplyEvent);
return new ApplySettingsResultInstance(configurationUnitApply);
};

ConfigurationSet configurationSetThatWaits = this.ConfigurationSet();
ConfigurationUnit configurationUnitThatWaits = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSetThatWaits.Units = new ConfigurationUnit[] { configurationUnitThatWaits };

TestConfigurationSetProcessor setThatWaitsProcessor = factory.CreateTestProcessor(configurationSetThatWaits);
TestConfigurationUnitProcessor unitThatWaitsProcessor = setProcessor.CreateTestProcessor(configurationUnitThatWaits);
unitThatWaitsProcessor.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent waitingUnitApply = new ManualResetEvent(false);
unitThatWaitsProcessor.ApplySettingsDelegate = () =>
{
WaitOn(waitingUnitApply);
return new ApplySettingsResultInstance(configurationUnitThatWaits);
};

ConfigurationProcessor processor = this.CreateConfigurationProcessorWithDiagnostics(factory);

var applySetOperation = processor.ApplySetAsync(configurationSet, ApplyConfigurationSetFlags.None);
WaitOn(applyEventWaiting);

startProcessing.Reset();
var waitingSetOperation = processor.ApplySetAsync(configurationSetThatWaits, ApplyConfigurationSetFlags.None);
AutoResetEvent waitingProgress = new AutoResetEvent(false);
ConfigurationSetState progressState = ConfigurationSetState.Unknown;
waitingSetOperation.Progress += (result, changeData) =>
{
if (changeData.Change == ConfigurationSetChangeEventType.SetStateChanged)
{
progressState = changeData.SetState;
waitingProgress.Set();
}
};

startProcessing.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.Pending, progressState);

completeApplyEvent.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.InProgress, progressState);

waitingUnitApply.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.Completed, progressState);
}

/// <summary>
/// Ensures that a consistency check apply is not blocked.
/// </summary>
[Fact]
public void ApplySet_ConsistencyCheckNotSequenced()
{
ConfigurationSet configurationSet = this.ConfigurationSet();
ConfigurationUnit configurationUnitApply = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSet.Units = new ConfigurationUnit[] { configurationUnitApply };

ManualResetEvent startProcessing = new ManualResetEvent(true);
TestConfigurationProcessorFactory factory = new TestConfigurationProcessorFactory();
factory.CreateSetProcessorDelegate = (f, c) =>
{
WaitOn(startProcessing);
return f.DefaultCreateSetProcessor(c);
};

TestConfigurationSetProcessor setProcessor = factory.CreateTestProcessor(configurationSet);
TestConfigurationUnitProcessor unitProcessorApply = setProcessor.CreateTestProcessor(configurationUnitApply);
unitProcessorApply.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent applyEventWaiting = new ManualResetEvent(false);
ManualResetEvent completeApplyEvent = new ManualResetEvent(false);
unitProcessorApply.ApplySettingsDelegate = () =>
{
applyEventWaiting.Set();
WaitOn(completeApplyEvent);
return new ApplySettingsResultInstance(configurationUnitApply);
};

ConfigurationSet configurationSetThatWaits = this.ConfigurationSet();
ConfigurationUnit configurationUnitThatWaits = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSetThatWaits.Units = new ConfigurationUnit[] { configurationUnitThatWaits };

TestConfigurationSetProcessor setThatWaitsProcessor = factory.CreateTestProcessor(configurationSetThatWaits);
TestConfigurationUnitProcessor unitThatWaitsProcessor = setProcessor.CreateTestProcessor(configurationUnitThatWaits);
unitThatWaitsProcessor.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent waitingUnitApply = new ManualResetEvent(false);
unitThatWaitsProcessor.ApplySettingsDelegate = () =>
{
WaitOn(waitingUnitApply);
return new ApplySettingsResultInstance(configurationUnitThatWaits);
};

ConfigurationProcessor processor = this.CreateConfigurationProcessorWithDiagnostics(factory);

var applySetOperation = processor.ApplySetAsync(configurationSet, ApplyConfigurationSetFlags.None);
WaitOn(applyEventWaiting);

startProcessing.Reset();
var waitingSetOperation = processor.ApplySetAsync(configurationSetThatWaits, ApplyConfigurationSetFlags.PerformConsistencyCheckOnly);
Assert.True(waitingSetOperation.AsTask().Wait(10000));

completeApplyEvent.Set();
}

private static void WaitOn(WaitHandle waitable)
{
if (!waitable.WaitOne(10000))
{
throw new TimeoutException();
}
}

private struct ExpectedConfigurationChangeData
{
public ConfigurationSetChangeEventType Change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "GetConfigurationUnitDetailsResult.h"
#include "GetConfigurationSetDetailsResult.h"
#include "DefaultSetGroupProcessor.h"
#include "ConfigurationSequencer.h"

#include <AppInstallerErrors.h>
#include <AppInstallerStrings.h>
Expand Down Expand Up @@ -520,7 +521,24 @@ namespace winrt::Microsoft::Management::Configuration::implementation

try
{
// TODO: Send pending when blocked by another configuration run
ConfigurationSequencer sequencer{ m_database };

if (!WI_IsFlagSet(flags, ApplyConfigurationSetFlags::PerformConsistencyCheckOnly))
{
if (sequencer.Enqueue(configurationSet))
{
try
{
progress.Progress(implementation::ConfigurationSetChangeData::Create(ConfigurationSetState::Pending));
}
CATCH_LOG();

sequencer.Wait(progress);
}
}

progress.ThrowIfCancelled();

try
{
progress.Progress(implementation::ConfigurationSetChangeData::Create(ConfigurationSetState::InProgress));
Expand Down
164 changes: 164 additions & 0 deletions src/Microsoft.Management.Configuration/ConfigurationSequencer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "pch.h"
#include "ConfigurationSequencer.h"
#include <AppInstallerStrings.h>

using namespace std::chrono_literals;

namespace winrt::Microsoft::Management::Configuration::implementation
{
ConfigurationSequencer::ConfigurationSequencer(ConfigurationDatabase& database) : m_database(database) {}

ConfigurationSequencer::~ConfigurationSequencer()
{
// Best effort attempt to remove our queue row
try
{
m_database.RemoveQueueItem(m_queueItemObjectName);
}
CATCH_LOG();
}

// This function creates necessary objects and records this operation into the table.
// It then performs the equivalent of `Wait` with a timeout of 0.
bool ConfigurationSequencer::Enqueue(const Configuration::ConfigurationSet& configurationSet)
{
// Create an arbitrarily named object
std::wstring objectName = L"WinGetConfigQueue_" + AppInstaller::Utility::CreateNewGuidNameWString();
m_queueItemObjectName = AppInstaller::Utility::ConvertToUTF8(objectName);
m_queueItemObject.create(wil::EventOptions::None, objectName.c_str());

m_database.AddQueueItem(configurationSet, m_queueItemObjectName);

// Create shared mutex
constexpr PCWSTR applyMutexName = L"WinGetConfigQueueApplyMutex";

for (int i = 0; !m_applyMutex && i < 2; ++i)
{
if (!m_applyMutex.try_create(applyMutexName, 0, SYNCHRONIZE))
{
m_applyMutex.try_open(applyMutexName, SYNCHRONIZE);
}
}

THROW_LAST_ERROR_IF(!m_applyMutex);

// Probe for an empty queue
DWORD status = 0;
m_applyMutexScope = m_applyMutex.acquire(&status, 0);
THROW_LAST_ERROR_IF(status == WAIT_FAILED);

if (status == WAIT_TIMEOUT)
{
return true;
}

if (GetQueuePosition() == 0)
{
m_database.SetActiveQueueItem(m_queueItemObjectName);
return false;
}
else
{
m_applyMutexScope.reset();
return true;
}
}

// The configuration queue consists of a table in the shared database and cooperative handling of said table.
// At any moment, the active processor must be holding a common named mutex.
// Each active queue entry also holds their own arbitrarily named object, recorded in the table.
//
// The general mechanism to wait is:
// 1. Wait on common named mutex
// 2. Check if first in queue, including probing arbitrary named objects of entries ahead of us
// 3. If not first, wait for X * queue position, where X is sufficiently high to prevent contention on main mutex
void ConfigurationSequencer::Wait(AppInstaller::WinRT::AsyncCancellation& cancellation)
{
THROW_HR_IF(E_NOT_VALID_STATE, !m_applyMutex);

wil::unique_event cancellationEvent;
cancellationEvent.create();

HANDLE waitHandles[2];
waitHandles[0] = cancellationEvent.get();
waitHandles[1] = m_applyMutex.get();

cancellation.Callback([&]() { cancellationEvent.SetEvent(); });
auto clearCancelCallback = wil::scope_exit([&cancellation]() { cancellation.Callback([]() {}); });

for (;;)
{
DWORD waitResult = WaitForMultipleObjects(ARRAYSIZE(waitHandles), waitHandles, FALSE, INFINITE);
THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED);

if (waitResult == WAIT_OBJECT_0)
{
// Cancellation
break;
}
else if (waitResult == WAIT_OBJECT_0 + 1 || waitResult == WAIT_ABANDONED_0 + 1)
{
// We now hold the apply mutex
wil::mutex_release_scope_exit applyMutexScope{ m_applyMutex.get() };

size_t queuePosition = GetQueuePosition();
if (queuePosition == 0)
{
m_applyMutexScope = std::move(applyMutexScope);
m_database.SetActiveQueueItem(m_queueItemObjectName);
break;
}
else
{
applyMutexScope.reset();
std::this_thread::sleep_for(queuePosition * 100ms);
}
}
}
}

size_t ConfigurationSequencer::GetQueuePosition()
{
auto queueItems = m_database.GetQueueItems();

// If we get no queue items at all, we assume that the database doesn't support queueing.
if (queueItems.empty())
{
return 0;
}

size_t result = 0;
bool found = false;

for (const auto& item : queueItems)
{
if (item.ObjectName == m_queueItemObjectName)
{
found = true;
break;
}

std::wstring objectName = AppInstaller::Utility::ConvertToUTF16(item.ObjectName);
QueueObjectType itemObject;
if (itemObject.try_open(objectName.c_str(), SYNCHRONIZE))
{
++result;
}
else
{
// Best effort attempt to remove the dead queue row
try
{
m_database.RemoveQueueItem(item.ObjectName);
}
CATCH_LOG();
}
}

THROW_HR_IF(E_NOT_SET, !found);

return result;
}
}
Loading

0 comments on commit 3c60491

Please sign in to comment.