Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stress testing framework, with basic metrics example to demonstrate. #3241

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ if(NOT WITH_API_ONLY)
if(WITH_FUNC_TESTS)
add_subdirectory(functional)
endif()
add_subdirectory(stress)
endif()

include(cmake/opentelemetry-build-external-component.cmake)
Expand Down
6 changes: 6 additions & 0 deletions stress/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

# Add subdirectories for common and metrics components
add_subdirectory(common)
add_subdirectory(metrics)
14 changes: 14 additions & 0 deletions stress/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

add_library(stress STATIC stress.cc)

# Include directory for the throughput library
target_include_directories(stress PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

# Set C++ standard
set_target_properties(
stress
PROPERTIES CXX_STANDARD 17
CXX_STANDARD_REQUIRED YES
CXX_EXTENSIONS NO)
142 changes: 142 additions & 0 deletions stress/common/stress.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "stress.h"

// Global flags
std::atomic<bool> STOP(
false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C)
std::atomic<bool> READY(false); // Global flag to synchronize thread start

// StressTest constructor
Stress::Stress(std::function<void()> func, size_t numThreads)
: func_(std::move(func)), stats_(numThreads), numThreads_(numThreads)
{}

// Main function to start the stress test
void Stress::run()
{
std::cout << "Starting stress test with " << numThreads_ << " threads...\n";
auto startTime = std::chrono::steady_clock::now();

READY.store(false, std::memory_order_release);

std::thread controllerThread(&Stress::monitorThroughput, this);

threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i)
{
threads_.emplace_back(&Stress::workerThread, this, i);
}

READY.store(true, std::memory_order_release);

for (auto &thread : threads_)
{
if (thread.joinable())
{
thread.join();
}
}

if (controllerThread.joinable())
{
controllerThread.join();
}

auto endTime = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime);

uint64_t totalCount = 0;
for (const auto &stat : stats_)
{
totalCount += stat.count.load(std::memory_order_relaxed);
}

std::cout << "\nTest completed:\n"
<< "Total iterations: " << formatNumber(totalCount) << "\n"
<< "Duration: " << duration.count() << " seconds\n"
<< "Average throughput: " << formatNumber(totalCount / duration.count())
<< " iterations/sec\n";
}

// Worker thread function
void Stress::workerThread(size_t threadIndex)
{
#ifdef __linux__
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
#endif

while (!STOP.load(std::memory_order_acquire))
{
func_();
stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed);
}
}

// Monitoring thread function
void Stress::monitorThroughput()
{
uint64_t lastTotalCount = 0;
auto lastTime = std::chrono::steady_clock::now();
std::vector<uint64_t> throughputHistory;

while (!STOP.load(std::memory_order_acquire))
{
std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE));

auto currentTime = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(currentTime - lastTime).count();

uint64_t totalCount = 0;
for (const auto &stat : stats_)
{
totalCount += stat.count.load(std::memory_order_relaxed);
}

uint64_t currentCount = totalCount - lastTotalCount;
lastTotalCount = totalCount;
lastTime = currentTime;

if (elapsed > 0)
{
uint64_t throughput = currentCount / elapsed;
throughputHistory.push_back(throughput);

double avg = 0;
uint64_t min = throughput;
uint64_t max = throughput;

for (uint64_t t : throughputHistory)
{
avg += t;
min = std::min(min, t);
max = std::max(max, t);
}
avg /= throughputHistory.size();

std::cout << "\rThroughput: " << formatNumber(throughput)
<< " it/s | Avg: " << formatNumber(static_cast<uint64_t>(avg))
<< " | Min: " << formatNumber(min) << " | Max: " << formatNumber(max) << std::flush;
}
}
std::cout << std::endl;
}

// Helper function to format numbers with commas
std::string Stress::formatNumber(uint64_t num)
{
std::ostringstream oss;
oss.imbue(std::locale(""));
oss << std::fixed << num;
return oss.str();
}

// Signal handler to set the STOP flag when receiving a termination signal
void Stress::stop()
{
STOP.store(true, std::memory_order_release);
}
99 changes: 99 additions & 0 deletions stress/common/stress.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

/**
* A multi-threaded stress test framework to measure throughput and performance of a given workload.
*
* ## Overview
* Multi-threaded stress test framework designed to execute a specified function
* in parallel across multiple threads and measure its throughput. The results are displayed
* dynamically, including current throughput, average throughput, and minimum/maximum throughput
* during the test.
*
* ## Key Features
* - **Multi-threading**: Uses std::thread to execute the workload in parallel across a user-defined
* number of threads.
* - **Thread Safety**: Tracks iteration counts per thread using an aligned and padded structure
* (WorkerStats) to avoid false sharing and ensure efficient thread-safe updates.
* - **Dynamic Metrics**: Continuously calculates and displays throughput (iterations/sec) over
* sliding time windows.
* - **Graceful Termination**: Captures signals (e.g., Ctrl+C) to cleanly stop all threads and
* summarize the results.
* - **Thread Affinity (Linux-only)**: Optionally binds threads to specific CPU cores for consistent
* performance.
*
* ## Implementation Details
* - **Worker Threads**:
* - Each worker thread executes the workload function (func) in a loop until a global STOP flag
* is set.
* - Each thread maintains its own iteration count to minimize contention.
*
* - **Throughput Monitoring**:
* - A separate controller thread monitors throughput by periodically summing up iteration counts
* across threads.
* - Throughput is calculated over a sliding window (SLIDING_WINDOW_SIZE) and displayed
* dynamically.
*
* - **Thread Synchronization**:
* - The STOP flag, an std::atomic<bool>, ensures all threads stop gracefully when signaled.
* - Memory ordering (e.g., std::memory_order_relaxed, std::memory_order_acquire/release) is used
* to optimize performance while maintaining correctness.
*
* - **Final Summary**:
* - At the end of the test, the program calculates and prints the total iterations, duration, and
* average throughput.
*/

#pragma once

#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>

// Configuration constants
constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds)
constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment

// WorkerStats structure for tracking iteration counts per thread
struct alignas(CACHE_LINE_SIZE) WorkerStats
{
std::atomic<uint64_t> count{0}; // Count of iterations for a specific thread
char padding[CACHE_LINE_SIZE -
sizeof(std::atomic<uint64_t>)]; // Padding to ensure proper alignment
};

// StressTest class
class Stress
{
public:
// Constructor
Stress(std::function<void()> func, size_t numThreads = std::thread::hardware_concurrency());

// Main function to start the stress test
void run();

// function to stop the test
void stop();

private:
std::function<void()> func_; // Function to be executed by each thread
std::vector<std::thread> threads_; // Vector to hold worker threads
std::vector<WorkerStats> stats_; // Vector to hold statistics for each thread
const size_t numThreads_; // Number of threads to run
std::atomic<bool> stopFlag_{false}; // signal to stop the test

// Worker thread function
void workerThread(size_t threadIndex);

// Monitoring thread function to calculate and display throughput
void monitorThroughput();

// Helper function to format numbers with commas for readability
static std::string formatNumber(uint64_t num);
};
30 changes: 30 additions & 0 deletions stress/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

# Define the metrics executable
add_executable(stress_metrics metrics.cc)

# Link throughput library and OpenTelemetry Metrics API
target_link_libraries(
stress_metrics PRIVATE stress opentelemetry_metrics # OpenTelemetry Metrics
# SDK
)

# Include directories for throughput
target_include_directories(stress_metrics
PRIVATE ${CMAKE_SOURCE_DIR}/stress/common)

# Set properties
set_target_properties(
stress_metrics
PROPERTIES CXX_STANDARD 17
CXX_STANDARD_REQUIRED YES
CXX_EXTENSIONS NO)

# Optional: Installation
if(OPENTELEMETRY_INSTALL)
install(
TARGETS stress_metrics
EXPORT "${PROJECT_NAME}-target"
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()
Loading
Loading