Skip to content

Commit

Permalink
HAWQ-1627. Support setting the max protocol message size when talking
Browse files Browse the repository at this point in the history
with HDFS
  • Loading branch information
interma committed Jul 2, 2018
1 parent c6146c0 commit 2199107
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 2 deletions.
2 changes: 2 additions & 0 deletions depends/libhdfs3/src/common/SessionConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ SessionConfig::SessionConfig(const Config & conf) {
&rpcMaxRetryOnConnect, "rpc.client.connect.retry", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
}, {
&rpcTimeout, "rpc.client.timeout", 3600 * 1000
}, {
&rpcMaxDataLength, "ipc.maximum.data.length", 64 * 1024 * 1024
}, {
&defaultReplica, "dfs.default.replica", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
}, {
Expand Down
9 changes: 9 additions & 0 deletions depends/libhdfs3/src/common/SessionConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ class SessionConfig {
this->rpcTimeout = rpcTimeout;
}

int32_t getRpcMaxDataLength() const {
return rpcMaxDataLength;
}

void setRpcMaxDataLength(int32_t rpcMaxLength) {
this->rpcMaxDataLength = rpcMaxLength;
}

bool doesNotRetryAnotherNode() const {
return notRetryAnotherNode;
}
Expand Down Expand Up @@ -334,6 +342,7 @@ class SessionConfig {
int32_t rpcMaxHARetry;
int32_t rpcSocketLingerTimeout;
int32_t rpcTimeout;
int32_t rpcMaxDataLength; //ipc.maximum.data.length
bool rpcTcpNoDelay;
std::string rpcAuthMethod;

Expand Down
13 changes: 12 additions & 1 deletion depends/libhdfs3/src/rpc/RpcChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "WriteBuffer.h"

#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

#define RPC_HEADER_MAGIC "hrpc"
#define RPC_HEADER_VERSION 9
Expand Down Expand Up @@ -756,6 +757,8 @@ static exception_ptr HandlerRpcResponseException(exception_ptr e) {

void RpcChannelImpl::readOneResponse(bool writeLock) {
int readTimeout = key.getConf().getReadTimeout();
int maxLength = key.getConf().getRpcMaxLength();

std::vector<char> buffer(128);
RpcResponseHeaderProto curRespHeader;
RpcResponseHeaderProto::RpcStatusProto status;
Expand All @@ -768,7 +771,15 @@ void RpcChannelImpl::readOneResponse(bool writeLock) {
buffer.resize(headerSize);
in->readFully(&buffer[0], headerSize, readTimeout);

if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
// use CodedInputStream around the buffer, so we can set TotalBytesLimit on it
ArrayInputStream ais(&buffer[0], headerSize);
CodedInputStream cis(&ais);
cis.SetTotalBytesLimit(maxLength, maxLength/2);

// use ParseFromCodedStream instead of ParseFromArray, so it can consume the above CodedInputStream
//
// if just use ParseFromArray, we have no chance to set TotalBytesLimit (64MB default)
if (!curRespHeader.ParseFromCodedStream(&cis)) {
THROW(HdfsRpcException,
"RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header.",
key.getServer().getHost().c_str(), key.getServer().getPort().c_str())
Expand Down
13 changes: 12 additions & 1 deletion depends/libhdfs3/src/rpc/RpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class RpcConfig {
tcpNoDelay = conf.isRpcTcpNoDelay();
lingerTimeout = conf.getRpcSocketLingerTimeout();
rpcTimeout = conf.getRpcTimeout();
rpcMaxLength = conf.getRpcMaxDataLength();
}

size_t hash_value() const;
Expand Down Expand Up @@ -117,6 +118,14 @@ class RpcConfig {
this->rpcTimeout = rpcTimeout;
}

int getRpcMaxLength() const {
return rpcMaxLength;
}

void setRpcMaxLength(int rpcTimeout) {
this->rpcMaxLength = rpcTimeout;
}

bool operator ==(const RpcConfig & other) const {
return this->maxIdleTime == other.maxIdleTime
&& this->pingTimeout == other.pingTimeout
Expand All @@ -126,7 +135,8 @@ class RpcConfig {
&& this->maxRetryOnConnect == other.maxRetryOnConnect
&& this->tcpNoDelay == other.tcpNoDelay
&& this->lingerTimeout == other.lingerTimeout
&& this->rpcTimeout == other.rpcTimeout;
&& this->rpcTimeout == other.rpcTimeout
&& this->rpcMaxLength == other.rpcMaxLength;
}

private:
Expand All @@ -138,6 +148,7 @@ class RpcConfig {
int maxRetryOnConnect;
int lingerTimeout;
int rpcTimeout;
int rpcMaxLength;
bool tcpNoDelay;
};

Expand Down
9 changes: 9 additions & 0 deletions src/backend/utils/misc/etc/hdfs-client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,13 @@ HA -->
</description>
</property>

<property>
<name>ipc.maximum.data.length</name>
<value>67108864</value>
<description>
The max protobuf message size when talking with HDFS.
Increase the value if encounter "Requested data length XXX is longer
than maximum configured RPC length XXX" error.
</description>
</property>
</configuration>

0 comments on commit 2199107

Please sign in to comment.