Skip to content

Commit

Permalink
Use PutMessageV3
Browse files Browse the repository at this point in the history
  • Loading branch information
EinKrebs committed Feb 11, 2025
1 parent 8d75a4e commit 9d7535a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/msgproto.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ const char MessageTypeCat = 42;
const char MessageTypeCatV2 = 54;
const char MessageTypePut = 43;
const char MessageTypePutV2 = 53;
const char MessageTypePutV3 = 54;
const char MessageTypeCommandComplete = 44;
const char MessageTypeReadyForQuery = 45;
const char MessageTypeCopyData = 46;
const char MessageTypeList = 48;
const char MessageTypeObjectMeta = 49;
const char MessageTypePutComplete = 55;

const char MessageTypeDelete = 47;

Expand Down
3 changes: 3 additions & 0 deletions include/yproxy_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ class YProxyWriter : YProxyConnector {
private:
std::string createXPath();

int readPutCompleteResponce(int client_fd_);

ssize_t modcount_;
XLogRecPtr insertion_rec_ptr_;
std::string storage_path_;
uint16_t key_version;

public:
std::string getExternalStoragePath() { return storage_path_; }
Expand Down
53 changes: 52 additions & 1 deletion src/yproxy_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,61 @@ int YProxyWriter::prepareYproxyConnection() {
return -1;
}

rb = readPutCompleteResponce(client_fd_);
if (rb != 0) {
// TODO: handle
return rb;
}

return 0;
}



int YProxyWriter::readPutCompleteResponce(int client_fd_) {
int len = MSG_HEADER_SIZE;
char buffer[len];
// try to read small number of bytes in one op
// if failed, give up
int rc = ::read(client_fd_, buffer, len);
if (rc != len) {
// handle
return -1;
}

uint64_t msgLen = 0;
for (int i = 0; i < 8; i++) {
msgLen <<= 8;
msgLen += uint8_t(buffer[i]);
}

if (msgLen != MSG_HEADER_SIZE + PROTO_HEADER_SIZE + 2) {
// protocol violation
return 1;
}

// substract header
msgLen -= len;

char data[msgLen];
rc = ::read(client_fd_, data, msgLen);
if (rc < 0) {
return -1;
}
if (uint64_t(rc) != msgLen) {
// handle
return -1;
}

if (data[0] != MessageTypePutComplete) {
return 2;
}
uint16_t kv = uint8_t(data[4]) + (1 << 8) * uint16_t(data[5]);
key_version = kv;

return 0;
}

std::vector<char> YProxyWriter::ConstructPutRequest(std::string fileName) {
uint64_t settingsCnt = 4;

Expand All @@ -96,7 +147,7 @@ std::vector<char> YProxyWriter::ConstructPutRequest(std::string fileName) {
builder.endDescription();

builder
.addProto(MessageTypePutV2,
.addProto(MessageTypePutV3,
adv_->use_gpg_crypto ? EncryptRequest : NoEncryptRequest)
.addString(fileName)
.addUInt64(settingsCnt);
Expand Down

0 comments on commit 9d7535a

Please sign in to comment.