Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Raft protocol to replicate KV pairs #10

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2051a9d
Raft leader election in progress
lnikon Dec 24, 2024
10396a7
Implement leader election
lnikon Dec 28, 2024
b9b5995
Refactor timer thread
lnikon Dec 28, 2024
614c2bc
A little refactoring
lnikon Dec 29, 2024
2e6374a
Refactor into separate headers & fix bugs
lnikon Dec 29, 2024
e0a354b
Implement coderabbit reviews #1
lnikon Dec 30, 2024
2fa9fca
Fix PR reviews & bugs #2
lnikon Dec 31, 2024
0832944
Fix PR reviews #3
lnikon Dec 31, 2024
2b7359e
Refactor election thread & implement log replication
lnikon Jan 2, 2025
f53b501
Fix state machine modification during log replication
lnikon Jan 3, 2025
497b88d
Implement persistence
lnikon Jan 7, 2025
ee68090
Fix docker image build errors & Implement CR suggestions
lnikon Jan 9, 2025
f68e8d1
Implement CR suggestions
lnikon Jan 9, 2025
3372412
Implement CR suggestions
lnikon Jan 9, 2025
1b8b93b
BestPractice improvement
lnikon Jan 9, 2025
ff8edca
Check fstream failures when persisting raft's state & code style changes
lnikon Jan 11, 2025
7ca37a9
Support dependency injection for raft classes & prepare for mock testing
lnikon Jan 12, 2025
54f02c9
Graceful shutdown in progress
lnikon Jan 14, 2025
d803bec
Bug fixes
lnikon Jan 15, 2025
9d5cd21
Remove unused headers
lnikon Jan 15, 2025
d63a8fe
Graceful shutdown in progress
lnikon Jan 18, 2025
0bab68f
Implement graceful shutdown
lnikon Jan 22, 2025
ece39ad
Add unit test for leader election
lnikon Jan 22, 2025
0834c89
Fix unsynced access to heartbeat atomic
lnikon Jan 23, 2025
cb376b9
Add new unit test
lnikon Jan 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,105 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "rr - RaftMain",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/RaftMain",
"miDebuggerServerAddress": "localhost:50505",
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": true,
"linux": {
"MIMode": "gdb",
"setupCommands": [
{
"description": "Setup to resolve symbols",
"text": "set sysroot /",
"ignoreFailures": false
}
]
},
"osx": {
"MIMode": "gdb"
},
"windows": {
"MIMode": "gdb"
}
},
Comment on lines +7 to +33
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance RaftMain debug configuration for multi-node debugging.

The configuration needs improvements for debugging multiple Raft nodes:

  1. Add build task to ensure RaftMain is compiled
  2. Add command line arguments for Raft configuration (node ID, peer list)
  3. Make debug port configurable to avoid conflicts

Apply this diff:

         {
             "name": "rr - RaftMain",
             "type": "cppdbg",
             "request": "launch",
             "program": "${workspaceFolder}/build/RaftMain",
-            "miDebuggerServerAddress": "localhost:50505",
+            "miDebuggerServerAddress": "localhost:${input:debugPort}",
+            "args": [
+                "--node-id", "${input:nodeId}",
+                "--config", "${workspaceFolder}/assets/raft_config.json"
+            ],
             "stopAtEntry": false,
             "cwd": "${workspaceFolder}",
             "environment": [],
             "externalConsole": true,
+            "preLaunchTask": "build",
             "linux": {
                 "MIMode": "gdb",

Add to the end of the file:

    "inputs": [
        {
            "id": "debugPort",
            "type": "promptString",
            "description": "Debug port for remote debugging",
            "default": "50505"
        },
        {
            "id": "nodeId",
            "type": "promptString",
            "description": "Raft node ID",
            "default": "1"
        }
    ]

{
"name": "Debug - RaftMain",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/RaftMain", // Path to the compiled executable
"args": [
"--id",
"1",
"--nodes",
"0.0.0.0:8080"
], // Arguments to pass to the program
"stopAtEntry": false, // Set to true to stop at the program's entry point
"cwd": "${workspaceFolder}", // Current working directory
"environment": [],
"externalConsole": false, // Set to true if you want to use an external terminal
"MIMode": "gdb", // Use "lldb" if you're using clang on macOS
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
],
"preLaunchTask": "build", // Ensure your program is built before launching
"miDebuggerPath": "/usr/bin/gdb", // Path to the gdb or lldb debugger
"logging": {
"trace": true, // Enable trace for debugging the launch.json config
"traceResponse": true,
"engineLogging": false
},
"launchCompleteCommand": "exec-run",
"targetArchitecture": "x86_64",
"pipeTransport": {
"pipeCwd": "",
"pipeProgram": "/bin/bash",
"pipeArgs": [
"-c"
],
"debuggerPath": "/usr/bin/gdb"
},
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix JSON syntax errors

Remove trailing commas after the pipeTransport objects:

             "pipeTransport": {
                 // ...
                 "debuggerPath": "/usr/bin/gdb"
-            },
+            }
         },

Also applies to: 105-105

🧰 Tools
🪛 Biome (1.9.4)

[error] 74-74: Expected a property but instead found '}'.

Expected a property here.

(parse)

{
"name": "Debug Attach - RaftMain",
"type": "cppdbg",
"request": "attach",
"program": "${workspaceFolder}/build/RaftMain", // Path to the compiled executable
"processId": "${command:pickProcess}",
"MIMode": "gdb", // Use "lldb" if you're using clang on macOS
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
],
"preLaunchTask": "build", // Ensure your program is built before launching
"miDebuggerPath": "/usr/bin/gdb", // Path to the gdb or lldb debugger
"logging": {
"trace": true, // Enable trace for debugging the launch.json config
"traceResponse": true,
"engineLogging": false
},
"targetArchitecture": "x86_64",
"pipeTransport": {
"pipeCwd": "",
"pipeProgram": "/bin/bash",
"pipeArgs": [
"-c"
],
"debuggerPath": "/usr/bin/gdb"
},
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix JSON syntax error

Remove the trailing comma after the pipeTransport object.

Apply this diff:

                 ],
                 "debuggerPath": "/usr/bin/gdb"
-            },
+            }
         },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
},
],
"debuggerPath": "/usr/bin/gdb"
}
},
🧰 Tools
🪛 Biome (1.9.4)

[error] 64-64: Expected a property but instead found '}'.

Expected a property here.

(parse)

{
"name": "Debug - LSMTreeTest",
"type": "cppdbg",
Expand Down Expand Up @@ -84,7 +183,7 @@
"args": [
"-c",
"./assets/tkvpp_config.json"
], // Arguments to pass to the program
],
"stopAtEntry": false, // Set to true to stop at the program's entry point
"cwd": "${workspaceFolder}", // Current working directory
"environment": [],
Expand Down
37 changes: 26 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,22 @@ set(CMAKE_CXX_STANDARD 23)
set(CMAKE_CXX_STANDARD_REQUIRED On)
set(CMAKE_CXX_EXTENSIONS Off)

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -pedantic-errors")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic-errors")
set(TINYKVPP_COMMON_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -pedantic-errors")
set(TINYKVPP_COMMON_CXX_FLAGS
"${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic-errors")

message("Using compiler: ${CMAKE_CXX_COMPILER_ID}")

# CFLAGS="-fsanitize=thread -g -O1" CXXFLAGS="-fsanitize=thread -g -O1"
# LDFLAGS="-fsanitize=thread"

if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
set(CMAKE_C_FLAGS "${TINYKVPP_COMMON_C_FLAGS} -Wthread-safety")
set(CMAKE_CXX_FLAGS "${TINYKVPP_COMMON_CXX_FLAGS} -Wthread-safety")
else()
set(CMAKE_C_FLAGS "${TINYKVPP_COMMON_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${TINYKVPP_COMMON_CXX_FLAGS}")
endif()

set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
Expand All @@ -27,6 +41,7 @@ find_package(absl CONFIG REQUIRED)
find_package(Celero CONFIG REQUIRED)
find_package(protobuf CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
find_package(GTest CONFIG REQUIRED)

include_directories(lib)

Expand All @@ -36,12 +51,12 @@ add_subdirectory(bench)
add_subdirectory(examples)

# Custom targets to build docker images and run tests
add_custom_target(GCCReleaseDockerImage
COMMAND sh scripts/build_gcc_release_docker_image.sh
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)

add_custom_target(ClangReleaseDockerImage
COMMAND sh scripts/build_clang_release_docker_image.sh
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)
add_custom_target(
GCCReleaseDockerImage
COMMAND sh scripts/build_gcc_release_docker_image.sh
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR})

add_custom_target(
ClangReleaseDockerImage
COMMAND sh scripts/build_clang_release_docker_image.sh
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR})
3 changes: 1 addition & 2 deletions conan/profiles/debug-clang
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ arch=x86_64
build_type=Debug
compiler=clang
compiler.cppstd=23
compiler.libcxx=libstdc++11
compiler.version=18
compiler.version=19
os=Linux
4 changes: 1 addition & 3 deletions conan/profiles/debug-clang-tsan
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,5 @@ arch=x86_64
build_type=Debug
compiler=clang
compiler.cppstd=23
compiler.libcxx=libstdc++11
compiler.version=18
compiler.version=19
os=Linux

2 changes: 1 addition & 1 deletion conan/profiles/debug-gcc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CXX=/usr/local/bin/g++
arch=x86_64
build_type=Debug
compiler=gcc
compiler.cppstd=gnu23
compiler.libcxx=libstdc++11
compiler.cppstd=gnu23
compiler.version=13
os=Linux
3 changes: 1 addition & 2 deletions conan/profiles/release-clang
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ arch=x86_64
build_type=Release
compiler=clang
compiler.cppstd=23
compiler.libcxx=libstdc++11
compiler.version=18
compiler.version=19
os=Linux
2 changes: 1 addition & 1 deletion conan/profiles/release-gcc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CXX=/usr/local/bin/g++
arch=x86_64
build_type=Release
compiler=gcc
compiler.cppstd=gnu23
compiler.cppstd=23
compiler.libcxx=libstdc++11
compiler.version=13
os=Linux
1 change: 1 addition & 0 deletions conanfile.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ abseil/20240116.2
celero/2.9.0
protobuf/5.27.0
grpc/1.67.1
gtest/1.15.0

[tool_requires]
protobuf/5.27.0
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ project(zkv)

add_subdirectory(absl)
add_subdirectory(embedded)
add_subdirectory(raft)
15 changes: 15 additions & 0 deletions examples/raft/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.25)
project(zkv)

add_executable(RaftMain "main.cpp" "raft.cpp")
set_target_properties(RaftMain PROPERTIES CXX_STANDARD 23)
target_link_libraries(RaftMain PRIVATE DB RaftProtoObjects TKVProtoObjects)

add_executable(RaftTest "raft_test.cpp" "raft.cpp")
set_target_properties(RaftTest PROPERTIES CXX_STANDARD 23)
target_link_libraries(RaftTest Catch2::Catch2WithMain gtest::gtest DB
RaftProtoObjects TKVProtoObjects)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Standardize on a single testing framework

The test executable links against both Catch2 and Google Test frameworks. This is unusual and could lead to:

  • Increased maintenance overhead
  • Confusion in test organization
  • Larger binary size

Choose one framework and update the configuration:

# If choosing Catch2
target_link_libraries(RaftTest PRIVATE
    Catch2::Catch2WithMain
    DB
    RaftProtoObjects
    TKVProtoObjects
)

# OR if choosing Google Test
target_link_libraries(RaftTest PRIVATE
    gtest::gtest
    gtest::gtest_main
    DB
    RaftProtoObjects
    TKVProtoObjects
)


# Register Catch2 tests with CTest
include(Catch)
catch_discover_tests(RaftTest)
85 changes: 85 additions & 0 deletions examples/raft/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <csignal>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>

#include "raft.h"

#include <grpcpp/server.h>

#include <cxxopts.hpp>

#include <spdlog/spdlog.h>

std::condition_variable gCv;

static void signalHandler(int sig)
{
if (sig == SIGTERM || sig == SIGINT)
{
gCv.notify_all();
}
}
Comment on lines +14 to +20
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical Issue: Unsafe use of std::condition_variable in signal handler

Calling std::condition_variable::notify_all() within a signal handler is unsafe and can lead to undefined behavior because it is not an async-signal-safe function. According to the C++ standard, only a limited set of functions are safe to call from within a signal handler.

Suggested fix:

Use an std::atomic<bool> flag to signal the main thread to exit, instead of using a condition variable. Modify the code as follows:

-std::condition_variable gCv;

-static void signalHandler(int sig)
+std::atomic<bool> gShutdown{false};

+void signalHandler(int sig)
 {
     if (sig == SIGTERM || sig == SIGINT)
     {
-        gCv.notify_all();
+        gShutdown.store(true);
     }
 }

 ...

-    std::mutex                   mtx;
-    std::unique_lock<std::mutex> lock(mtx);
-    gCv.wait(lock);
+    while (!gShutdown.load())
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }

This replaces the condition variable with an atomic flag that the main thread can periodically check.

Committable suggestion skipped: line range outside the PR's diff.


auto main(int argc, char *argv[]) -> int
{
std::signal(SIGTERM, signalHandler);
std::signal(SIGINT, signalHandler);

cxxopts::Options options("raft");
options.add_options()("id", "id of the node", cxxopts::value<id_t>())(
"nodes", "ip addresses of replicas in a correct order", cxxopts::value<std::vector<raft::ip_t>>());

auto parsedOptions = options.parse(argc, argv);
if ((parsedOptions.count("help") != 0U) || (parsedOptions.count("id") == 0U) ||
(parsedOptions.count("nodes") == 0U))
{
spdlog::info("{}", options.help());
return EXIT_SUCCESS;
}

auto nodeId = parsedOptions["id"].as<id_t>();
if (nodeId == 0)
{
spdlog::error("ID of the node should be positve integer");
return EXIT_FAILURE;
}

auto nodeIps = parsedOptions["nodes"].as<std::vector<raft::ip_t>>();
if (nodeIps.empty())
{
spdlog::error("List of node IPs can't be empty");
return EXIT_FAILURE;
}
Comment on lines +46 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance IP address validation.

The current validation only checks if the list is not empty. Consider adding validation for:

  • IP address format
  • Port number range
  • Duplicate addresses

Example validation function:

bool validateIpAddress(const std::string& ip) {
    std::regex ipRegex(
        "^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}(:[0-9]{1,5})?$"
    );
    if (!std::regex_match(ip, ipRegex)) {
        return false;
    }
    // Additional validation for port range and IP octets
    return true;
}


std::vector<raft::node_client_t> replicas;
for (raft::id_t replicaId{1}; const auto &replicaIp : nodeIps)
{
if (replicaId != nodeId)
{
std::unique_ptr<RaftService::Stub> stub{
RaftService::NewStub(grpc::CreateChannel(replicaIp, grpc::InsecureChannelCredentials()))};

replicas.emplace_back(raft::node_config_t{.m_id = replicaId, .m_ip = replicaIp}, std::move(stub));
}

++replicaId;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential index out of bounds error in replica setup.

The replicaId starts from 1 and increments for each IP, but there's no validation that the number of IPs matches the expected node count.

+    if (nodeIps.size() < nodeId) {
+        spdlog::error("Node ID {} is larger than the number of provided IPs {}", nodeId, nodeIps.size());
+        return EXIT_FAILURE;
+    }
+
     std::vector<raft::node_client_t> replicas;
     for (raft::id_t replicaId{1}; const auto &replicaIp : nodeIps)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::vector<raft::node_client_t> replicas;
for (raft::id_t replicaId{1}; const auto &replicaIp : nodeIps)
{
if (replicaId != nodeId)
{
std::unique_ptr<RaftService::Stub> stub{
RaftService::NewStub(grpc::CreateChannel(replicaIp, grpc::InsecureChannelCredentials()))};
replicas.emplace_back(raft::node_config_t{.m_id = replicaId, .m_ip = replicaIp}, std::move(stub));
}
++replicaId;
}
if (nodeIps.size() < nodeId) {
spdlog::error("Node ID {} is larger than the number of provided IPs {}", nodeId, nodeIps.size());
return EXIT_FAILURE;
}
std::vector<raft::node_client_t> replicas;
for (raft::id_t replicaId{1}; const auto &replicaIp : nodeIps)
{
if (replicaId != nodeId)
{
std::unique_ptr<RaftService::Stub> stub{
RaftService::NewStub(grpc::CreateChannel(replicaIp, grpc::InsecureChannelCredentials()))};
replicas.emplace_back(raft::node_config_t{.m_id = replicaId, .m_ip = replicaIp}, std::move(stub));
}
++replicaId;
}


raft::consensus_module_t consensusModule({.m_id = nodeId, .m_ip = nodeIps[nodeId - 1]}, std::move(replicas));
if (!consensusModule.init())
{
spdlog::error("Failed to initialize the state machine");
return EXIT_FAILURE;
}
Comment on lines +67 to +72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance consensus module initialization error handling.

Add timeout and detailed error reporting for initialization failures.

-    if (!consensusModule.init())
+    auto initStart = std::chrono::steady_clock::now();
+    while (!consensusModule.init())
     {
-        spdlog::error("Failed to initialize the state machine");
+        auto elapsed = std::chrono::steady_clock::now() - initStart;
+        if (elapsed > std::chrono::seconds(10)) {
+            spdlog::error("Failed to initialize consensus module after {} seconds", 
+                std::chrono::duration_cast<std::chrono::seconds>(elapsed).count());
+            spdlog::error("Last error: {}", consensusModule.getLastError());
+            return EXIT_FAILURE;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }

Committable suggestion skipped: line range outside the PR's diff.


spdlog::set_level(spdlog::level::debug);
consensusModule.start();

std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx);
gCv.wait(lock);

consensusModule.stop();

return EXIT_SUCCESS;
}
Loading
Loading