Skip to content

Commit

Permalink
Use PutMessageV3 with yproxy (#102)
Browse files Browse the repository at this point in the history
* Use PutMessageV3

* Fix message type codes

* Test shit

* Read PutComplete at the right place

* Change retcodes
  • Loading branch information
EinKrebs authored Feb 13, 2025
1 parent 435800d commit c56ff61
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/msgproto.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ const char MessageTypeCat = 42;
const char MessageTypeCatV2 = 54;
const char MessageTypePut = 43;
const char MessageTypePutV2 = 53;
const char MessageTypePutV3 = 56;
const char MessageTypeCommandComplete = 44;
const char MessageTypeReadyForQuery = 45;
const char MessageTypeCopyData = 46;
const char MessageTypeList = 48;
const char MessageTypeObjectMeta = 49;
const char MessageTypePutComplete = 57;

const char MessageTypeDelete = 47;

Expand Down
3 changes: 3 additions & 0 deletions include/yproxy_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ class YProxyWriter : YProxyConnector {
private:
std::string createXPath();

int readPutCompleteResponce(int client_fd_);

ssize_t modcount_;
XLogRecPtr insertion_rec_ptr_;
std::string storage_path_;
uint16_t key_version;

public:
std::string getExternalStoragePath() { return storage_path_; }
Expand Down
55 changes: 54 additions & 1 deletion src/yproxy_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ bool YProxyWriter::close() {
client_fd_ = -1;
return false;
}

if (readPutCompleteResponce(client_fd_) != 0) {
::close(client_fd_);
client_fd_ = -1;
// TODO: handle
return false;
}

// wait for responce
if (commonReadRFQResponce(client_fd_) != 0) {
::close(client_fd_);
Expand Down Expand Up @@ -77,6 +85,51 @@ int YProxyWriter::prepareYproxyConnection() {
}



int YProxyWriter::readPutCompleteResponce(int client_fd_) {
int len = MSG_HEADER_SIZE;
char buffer[len];
// try to read small number of bytes in one op
// if failed, give up
int rc = ::read(client_fd_, buffer, len);
if (rc != len) {
// handle
return -1;
}

uint64_t msgLen = 0;
for (int i = 0; i < 8; i++) {
msgLen <<= 8;
msgLen += uint8_t(buffer[i]);
}

if (msgLen != MSG_HEADER_SIZE + PROTO_HEADER_SIZE + 2) {
// protocol violation
return -1;
}

// substract header
msgLen -= len;

char data[msgLen];
rc = ::read(client_fd_, data, msgLen);
if (rc < 0) {
return -1;
}
if (uint64_t(rc) != msgLen) {
// handle
return -1;
}

if (data[0] != MessageTypePutComplete) {
return -1;
}
uint16_t kv = uint8_t(data[4]) + (1 << 8) * uint16_t(data[5]);
key_version = kv;

return 0;
}

std::vector<char> YProxyWriter::ConstructPutRequest(std::string fileName) {
uint64_t settingsCnt = 4;

Expand All @@ -96,7 +149,7 @@ std::vector<char> YProxyWriter::ConstructPutRequest(std::string fileName) {
builder.endDescription();

builder
.addProto(MessageTypePutV2,
.addProto(MessageTypePutV3,
adv_->use_gpg_crypto ? EncryptRequest : NoEncryptRequest)
.addString(fileName)
.addUInt64(settingsCnt);
Expand Down

0 comments on commit c56ff61

Please sign in to comment.