Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TransferEngine] Support Status return value #125

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ echo "*** Download and installing [golang-1.22] ***"
wget https://go.dev/dl/go1.22.10.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.22.10.linux-amd64.tar.gz

# Build and install abseil-cpp from source under ${REPO_ROOT}/thirdparties.
echo "*** Download and installing [abseil-cpp] ***"
cd ${REPO_ROOT}/thirdparties
# ${GITHUB_PROXY} is used here as the URL prefix of the repository to clone.
git clone ${GITHUB_PROXY}/abseil/abseil-cpp.git
cd abseil-cpp
# Out-of-source CMake build directory; -p keeps mkdir from failing if it
# already exists.
mkdir -p build
cd build
# Position-independent code is requested twice: through the compiler flags
# and through the CMake option.
CXXFLAGS="-fPIC" CFLAGS="-fPIC" cmake .. -DCMAKE_POSITION_INDEPENDENT_CODE=ON
# Build with one job per processor reported by nproc; the install step runs
# under sudo and only if the build succeeded.
make -j$(nproc) && sudo make install

echo "*** Dependencies Installed! ***"
8 changes: 4 additions & 4 deletions mooncake-integration/vllm/vllm_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ int VLLMAdaptor::transferSync(const char *target_hostname, uintptr_t buffer,
entry.target_id = handle;
entry.target_offset = peer_buffer_address;

int ret = engine_->submitTransfer(batch_id, {entry});
if (ret < 0) return -1;
Status s = engine_->submitTransfer(batch_id, {entry});
if (!s.ok()) return -1;

TransferStatus status;
while (true) {
int ret = engine_->getTransferStatus(batch_id, 0, status);
LOG_ASSERT(!ret);
Status s = engine_->getTransferStatus(batch_id, 0, status);
LOG_ASSERT(s.ok());
if (status.s == TransferStatusEnum::COMPLETED) {
engine_->freeBatchID(batch_id);
return 0;
Expand Down
1 change: 1 addition & 0 deletions mooncake-integration/vllm/vllm_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <stack>
#include <vector>

#include "common/base/status.h"
#include "transfer_engine.h"
#include "transport/rdma_transport/rdma_transport.h"
#include "transport/transport.h"
Expand Down
19 changes: 10 additions & 9 deletions mooncake-transfer-engine/example/transfer_engine_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <sstream>
#include <unordered_map>

#include "common/base/status.h"
#include "transfer_engine.h"
#include "transport/transport.h"

Expand Down Expand Up @@ -159,7 +160,7 @@ static inline std::string calculateRate(uint64_t data_bytes, double duration) {
volatile bool running = true;
std::atomic<size_t> total_batch_count(0);

int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
Status initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
void *addr) {
bindToSocket(thread_id % NR_SOCKETS);
TransferRequest::OpCode opcode;
Expand All @@ -183,7 +184,7 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
size_t batch_count = 0;
while (running) {
auto batch_id = engine->allocateBatchID(FLAGS_batch_size);
int ret = 0;
Status s;
std::vector<TransferRequest> requests;
for (int i = 0; i < FLAGS_batch_size; ++i) {
TransferRequest entry;
Expand All @@ -198,14 +199,14 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
requests.emplace_back(entry);
}

ret = engine->submitTransfer(batch_id, requests);
LOG_ASSERT(!ret);
s = engine->submitTransfer(batch_id, requests);
LOG_ASSERT(s.ok());
for (int task_id = 0; task_id < FLAGS_batch_size; ++task_id) {
bool completed = false;
TransferStatus status;
while (!completed) {
int ret = engine->getTransferStatus(batch_id, task_id, status);
LOG_ASSERT(!ret);
Status s = engine->getTransferStatus(batch_id, task_id, status);
LOG_ASSERT(s.ok());
if (status.s == TransferStatusEnum::COMPLETED)
completed = true;
else if (status.s == TransferStatusEnum::FAILED) {
Expand All @@ -216,13 +217,13 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
}
}

ret = engine->freeBatchID(batch_id);
LOG_ASSERT(!ret);
s = engine->freeBatchID(batch_id);
LOG_ASSERT(s.ok());
batch_count++;
}
LOG(INFO) << "Worker " << thread_id << " stopped!";
total_batch_count.fetch_add(batch_count);
return 0;
return Status::OK();
}

std::string formatDeviceNames(const std::string &device_names) {
Expand Down
2 changes: 2 additions & 0 deletions mooncake-transfer-engine/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <ctime>
#include <thread>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "error.h"

#if defined(__x86_64__)
Expand Down
284 changes: 284 additions & 0 deletions mooncake-transfer-engine/include/common/base/status.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
// Copyright 2025 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// The design of this code is adapted from the RocksDB project with some
// modifications.
// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/status.h

#ifndef STATUS_H
#define STATUS_H

#include <cstdint>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/strings/string_view.h"

namespace mooncake {

// Result of an operation: a status code plus an optional error message.
//
// A default-constructed Status is OK and carries no message. The object owns
// the buffer behind 'message_' and releases it with delete[] in the
// destructor, which is why all five special member functions are
// user-provided. Moves are noexcept so that standard containers can relocate
// Status objects without falling back to deep copies.
class Status final {
 public:
  // The code of the status.
  enum class Code : uint16_t {
    kOk = 0,
    kInvalidArgument = 1,
    kTooManyRequests = 2,
    kAddressNotRegistered = 3,
    kBatchBusy = 4,
    kDeviceNotFound = 6,
    kAddressOverlapped = 7,
    kDns = 101,
    kSocket = 102,
    kMalformedJson = 103,
    kRejectHandshake = 104,
    kMetadata = 200,
    kEndpoint = 201,
    kContext = 202,
    kNuma = 300,
    kClock = 301,
    kMemory = 302,
    // NOTE: the spelling "NotImplmented" is part of the public API and is
    // kept unchanged for compatibility with existing callers.
    kNotImplmented = 999,
    // Enumerator that follows the last real code (implicitly 1000).
    kMaxCode
  };

  // Builds an OK Status.
  Status() = default;

  // Releases the owned message buffer, if any.
  ~Status() { delete[] message_; }

  // Constructs a Status object containing a status code and message.
  // If 'code == Code::kOk', 'message' is ignored and an object identical to an
  // OK status is constructed.
  Status(Code code, absl::string_view message);

  // Copying duplicates the message buffer; moving transfers ownership of it
  // and leaves the source as an OK status without a message.
  Status(const Status& s);
  Status& operator=(const Status& s);
  Status(Status&& s) noexcept;
  Status& operator=(Status&& s) noexcept;

  // Returns the stored status code.
  Code code() const { return code_; }

  // Returns the error message, or an empty view if there is none. The view
  // refers to storage owned by this object, so it must not be used after this
  // Status is destroyed or assigned to.
  absl::string_view message() const {
    return message_ ? absl::string_view(message_) : absl::string_view();
  }

  // Returns true if the Status is OK.
  ABSL_MUST_USE_RESULT bool ok() const { return Code::kOk == code_; }

  // Returns true iff the status indicates an InvalidArgument error.
  ABSL_MUST_USE_RESULT bool IsInvalidArgument() const {
    return Code::kInvalidArgument == code_;
  }

  // Returns true iff the status indicates a TooManyRequests error.
  ABSL_MUST_USE_RESULT bool IsTooManyRequests() const {
    return Code::kTooManyRequests == code_;
  }

  // Returns true iff the status indicates an AddressNotRegistered error.
  ABSL_MUST_USE_RESULT bool IsAddressNotRegistered() const {
    return Code::kAddressNotRegistered == code_;
  }

  // Returns true iff the status indicates a BatchBusy error.
  ABSL_MUST_USE_RESULT bool IsBatchBusy() const {
    return Code::kBatchBusy == code_;
  }

  // Returns true iff the status indicates a DeviceNotFound error.
  ABSL_MUST_USE_RESULT bool IsDeviceNotFound() const {
    return Code::kDeviceNotFound == code_;
  }

  // Returns true iff the status indicates an AddressOverlapped error.
  ABSL_MUST_USE_RESULT bool IsAddressOverlapped() const {
    return Code::kAddressOverlapped == code_;
  }

  // Returns true iff the status indicates a Dns error.
  ABSL_MUST_USE_RESULT bool IsDns() const { return Code::kDns == code_; }

  // Returns true iff the status indicates a Socket error.
  ABSL_MUST_USE_RESULT bool IsSocket() const { return Code::kSocket == code_; }

  // Returns true iff the status indicates a MalformedJson error.
  ABSL_MUST_USE_RESULT bool IsMalformedJson() const {
    return Code::kMalformedJson == code_;
  }

  // Returns true iff the status indicates a RejectHandshake error.
  ABSL_MUST_USE_RESULT bool IsRejectHandshake() const {
    return Code::kRejectHandshake == code_;
  }

  // Returns true iff the status indicates a Metadata error.
  ABSL_MUST_USE_RESULT bool IsMetadata() const {
    return Code::kMetadata == code_;
  }

  // Returns true iff the status indicates an Endpoint error.
  ABSL_MUST_USE_RESULT bool IsEndpoint() const {
    return Code::kEndpoint == code_;
  }

  // Returns true iff the status indicates a Context error.
  ABSL_MUST_USE_RESULT bool IsContext() const {
    return Code::kContext == code_;
  }

  // Returns true iff the status indicates a Numa error.
  ABSL_MUST_USE_RESULT bool IsNuma() const { return Code::kNuma == code_; }

  // Returns true iff the status indicates a Clock error.
  ABSL_MUST_USE_RESULT bool IsClock() const { return Code::kClock == code_; }

  // Returns true iff the status indicates a Memory error.
  ABSL_MUST_USE_RESULT bool IsMemory() const { return Code::kMemory == code_; }

  // Returns true iff the status indicates a NotImplmented error.
  ABSL_MUST_USE_RESULT bool IsNotImplmented() const {
    return Code::kNotImplmented == code_;
  }

  // Return a combination of the error code name and message.
  std::string ToString() const;

  bool operator==(const Status& s) const;
  bool operator!=(const Status& s) const;

  // Factory helpers: each returns a Status carrying the matching code and the
  // given message. OK() returns a default-constructed (message-less) Status.
  static Status OK() { return Status(); }
  static Status InvalidArgument(absl::string_view msg) {
    return Status(Code::kInvalidArgument, msg);
  }
  static Status TooManyRequests(absl::string_view msg) {
    return Status(Code::kTooManyRequests, msg);
  }
  static Status AddressNotRegistered(absl::string_view msg) {
    return Status(Code::kAddressNotRegistered, msg);
  }
  static Status BatchBusy(absl::string_view msg) {
    return Status(Code::kBatchBusy, msg);
  }
  static Status DeviceNotFound(absl::string_view msg) {
    return Status(Code::kDeviceNotFound, msg);
  }
  static Status AddressOverlapped(absl::string_view msg) {
    return Status(Code::kAddressOverlapped, msg);
  }
  static Status Dns(absl::string_view msg) {
    return Status(Code::kDns, msg);
  }
  static Status Socket(absl::string_view msg) {
    return Status(Code::kSocket, msg);
  }
  static Status MalformedJson(absl::string_view msg) {
    return Status(Code::kMalformedJson, msg);
  }
  static Status RejectHandshake(absl::string_view msg) {
    return Status(Code::kRejectHandshake, msg);
  }
  static Status Metadata(absl::string_view msg) {
    return Status(Code::kMetadata, msg);
  }
  static Status Endpoint(absl::string_view msg) {
    return Status(Code::kEndpoint, msg);
  }
  static Status Context(absl::string_view msg) {
    return Status(Code::kContext, msg);
  }
  static Status Numa(absl::string_view msg) {
    return Status(Code::kNuma, msg);
  }
  static Status Clock(absl::string_view msg) {
    return Status(Code::kClock, msg);
  }
  static Status Memory(absl::string_view msg) {
    return Status(Code::kMemory, msg);
  }
  static Status NotImplmented(absl::string_view msg) {
    return Status(Code::kNotImplmented, msg);
  }

  // Return a human-readable name of the 'code'.
  static std::string_view CodeToString(Code code);

 private:
  // Return a copy of the message 'msg'. The result is released with delete[]
  // by this class, so the implementation is expected to allocate it with
  // new[] (defined outside this header).
  static const char* CopyMessage(const char* msg);

  // The code of the status.
  Code code_ = Code::kOk;
  // The error message of the status. Following the Status definition in
  // RocksDB, the message is a 'const char*' rather than a 'std::string' for
  // performance reasons: no allocation is needed in the common case where the
  // Status is OK, and the pointer takes only 8 bytes on an x86-64 platform,
  // whereas an empty string with SSO (Small String Optimization) takes 24 to
  // 32 bytes, excluding any dynamically allocated memory.
  const char* message_ = nullptr;
};

// Deep-copies the code and, if present, the message of 's'.
inline Status::Status(const Status& s)
    : code_(s.code_),
      message_(s.message_ == nullptr ? nullptr : CopyMessage(s.message_)) {}

// Copy assignment. The new message is duplicated BEFORE the old buffer is
// released: if the duplication throws, this object is left untouched instead
// of holding a pointer to already-freed memory.
inline Status& Status::operator=(const Status& s) {
  if (this != &s) {
    const char* duplicate =
        (s.message_ == nullptr) ? nullptr : CopyMessage(s.message_);
    delete[] message_;
    message_ = duplicate;
    code_ = s.code_;
  }
  return *this;
}

// Takes over the code and message buffer of 's'; 's' becomes an OK status
// without a message.
inline Status::Status(Status&& s) noexcept
    : code_(std::exchange(s.code_, Code::kOk)),
      message_(std::exchange(s.message_, nullptr)) {}

// Move assignment: releases the current message, then takes over the code and
// message buffer of 's', leaving 's' as an OK status without a message.
// Self-assignment is a no-op.
inline Status& Status::operator=(Status&& s) noexcept {
  if (this != &s) {
    delete[] message_;
    code_ = std::exchange(s.code_, Code::kOk);
    message_ = std::exchange(s.message_, nullptr);
  }
  return *this;
}

// Prints the human-readable name of 'code' to 'os'.
std::ostream& operator<<(std::ostream& os, Status::Code code);

// Prints a human-readable representation of 's' to 'os'.
std::ostream& operator<<(std::ostream& os, const Status& s);

} // namespace mooncake

#endif // STATUS_H
Loading