From ecc63e5586aefb66fccf7bf38da5972184fa49d3 Mon Sep 17 00:00:00 2001 From: joyield Date: Sun, 30 Jul 2017 15:00:51 +0800 Subject: [PATCH] improve performance fix scan for sentinel groups --- conf/auth.conf | 4 +- conf/latency.conf | 4 +- src/AcceptConnection.cpp | 5 +- src/AcceptConnection.h | 2 +- src/Alloc.h | 5 +- src/Buffer.h | 2 +- src/ClusterServerPool.cpp | 3 +- src/ClusterServerPool.h | 2 +- src/Command.cpp | 5 +- src/Command.h | 16 ++- src/Common.h | 1 + src/ConnectConnection.cpp | 8 -- src/ConnectConnection.h | 5 +- src/Handler.cpp | 19 ++- src/HashFunc.cpp | 13 -- src/HashFunc.h | 13 +- src/Proxy.cpp | 2 - src/Request.cpp | 63 ++++----- src/Request.h | 4 +- src/RequestParser.cpp | 270 ++++++++++++++++++++++++++++--------- src/RequestParser.h | 40 +++--- src/Response.cpp | 49 ++++--- src/Response.h | 4 +- src/SentinelServerPool.cpp | 23 ++-- src/SentinelServerPool.h | 3 +- src/ServerGroup.cpp | 1 + src/ServerPool.h | 10 +- src/Timer.cpp | 6 +- 28 files changed, 367 insertions(+), 215 deletions(-) diff --git a/conf/auth.conf b/conf/auth.conf index 226bd26..169cf0e 100644 --- a/conf/auth.conf +++ b/conf/auth.conf @@ -28,7 +28,7 @@ # Auth bcd { # Mode admin # } -#### password is "abc", the client must send command Auth bcd +#### password is "bcd", the client must send command Auth bcd #### Mode admin, client connection can read and write and admin, #### the CONFIG command need admin permission #### No KeyPrefix, ReadKeyPrefix, WriteKeyPrefix define, all key can be visit @@ -47,7 +47,7 @@ # ReadKeyPrefix User Stats # WriteKeyPrefix User # } -#### password is "cde", the client must send command Auth cde +#### password is "def", the client must send command Auth def #### Mode read, client connection can read and write, but read and write #### keyspace is diffrent, client can GET User.123 and also #### SET User.123 SomeValue, but SET Stats.123 will be deny diff --git a/conf/latency.conf b/conf/latency.conf index 48c72eb..cf4b499 100644 --- a/conf/latency.conf +++ b/conf/latency.conf @@ -10,8 +10,8 @@ ## see latency monitor for specify server ## redis> INFO ServerLatency ServAddr [name] ## -## reset all stats info, include latency monitor -## redis> INFO ResetStats +## reset all stats info, include latency monitor, require admin permission +## redis> CONFIG ResetStat ## ## Examples: ## LatencyMonitor name { diff --git a/src/AcceptConnection.cpp b/src/AcceptConnection.cpp index 677e159..d96865b 100644 --- a/src/AcceptConnection.cpp +++ b/src/AcceptConnection.cpp @@ -198,11 +198,10 @@ void AcceptConnection::parse(Handler* h, Buffer* buf, int pos) req->set(mParser); h->handleRequest(req); } else { - SegmentStr cmd(mParser.cmd()); ResponsePtr res = ResponseAlloc::create(); char err[1024]; - int len = snprintf(err, sizeof(err), "unknown command '%.*s'", - cmd.length(), cmd.data()); + int len = snprintf(err, sizeof(err), "unknown command '%s'", + mParser.cmd()); res->setErr(err, len); h->handleResponse(nullptr, req, res); } diff --git a/src/AcceptConnection.h b/src/AcceptConnection.h index 2f55a1b..bcc800f 100644 --- a/src/AcceptConnection.h +++ b/src/AcceptConnection.h @@ -23,7 +23,7 @@ class AcceptConnection : public Subscribe, public ListNode>, public DequeNode>, - public RefCntObj + public RefCntObj { public: typedef AcceptConnection Value; diff --git a/src/Alloc.h b/src/Alloc.h index 12b6c91..41cce3d 100644 --- a/src/Alloc.h +++ b/src/Alloc.h @@ -122,7 +122,7 @@ thread_local T* Alloc::Free[CacheSize]; template thread_local int Alloc::Size = 0; -template +template class RefCntObj { public: @@ -138,6 +138,7 @@ class RefCntObj } void ref() { + FuncCallTimer(); ++mCnt; } void unref() @@ -156,7 +157,7 @@ class RefCntObj mCnt = 0; } private: - AtomicInt mCnt; + CntType mCnt; }; template diff --git a/src/Buffer.h b/src/Buffer.h index ba8ba41..b7b9a09 100644 --- a/src/Buffer.h +++ b/src/Buffer.h @@ -192,7 +192,7 @@ class Segment } bool empty() const { - return mCur.buf == mEnd.buf && mCur.pos == mEnd.pos; + return mBegin.buf == mEnd.buf && mBegin.pos == mEnd.pos; } private: BufferPos mBegin; diff --git a/src/ClusterServerPool.cpp b/src/ClusterServerPool.cpp index 31bbe21..7927c7d 100644 --- a/src/ClusterServerPool.cpp +++ b/src/ClusterServerPool.cpp @@ -33,7 +33,7 @@ ClusterServerPool::~ClusterServerPool() } } -Server* ClusterServerPool::getServer(Handler* h, Request* req) const +Server* ClusterServerPool::getServer(Handler* h, Request* req, const String& key) const { FuncCallTimer(); switch (req->type()) { @@ -43,7 +43,6 @@ Server* ClusterServerPool::getServer(Handler* h, Request* req) const default: break; } - SegmentStr key(req->key()); int i = mHash.hash(key.data(), key.length(), HashTag); i &= Const::RedisClusterSlotsMask; ServerGroup* g = mSlots[i]; diff --git a/src/ClusterServerPool.h b/src/ClusterServerPool.h index eb4ccc5..af9e27d 100644 --- a/src/ClusterServerPool.h +++ b/src/ClusterServerPool.h @@ -26,7 +26,7 @@ class ClusterServerPool : public ServerPoolTmpl return mServPool; } private: - Server* getServer(Handler* h, Request* req) const; + Server* getServer(Handler* h, Request* req, const String& key) const; void refreshRequest(Handler* h); void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res); ServerGroup* getGroup(const String& nodeid) const diff --git a/src/Command.cpp b/src/Command.cpp index 27cab06..6a9ef0d 100644 --- a/src/Command.cpp +++ b/src/Command.cpp @@ -11,7 +11,7 @@ #include "Command.h" const Command Command::CmdPool[Sentinel] = { - {None, "", 0, 0, Read}, + {None, "", 0, MaxArgs, Read}, {Ping, "ping", 1, 2, Read}, {PingServ, "ping", 1, 2, Inner}, {Echo, "echo", 2, 2, Read}, @@ -24,7 +24,7 @@ const Command Command::CmdPool[Sentinel] = { {SentinelSlaves, "sentinel slaves", 3, 3, Inner}, {Cmd, "command", 1, 1, Read}, {Info, "info", 1, 4, Read}, - {Config, "config", 3, 4, Admin}, + {Config, "config", 2, 4, Admin}, {Cluster, "cluster", 2, 2, Inner}, {ClusterNodes, "cluster nodes", 2, 2, SubCmd|Inner}, {Asking, "asking", 1, 1, Inner}, @@ -171,7 +171,6 @@ const Command Command::CmdPool[Sentinel] = { }; Command::CommandMap Command::CmdMap; - void Command::init() { int type = 0; diff --git a/src/Command.h b/src/Command.h index 6c427ef..f15e624 100644 --- a/src/Command.h +++ b/src/Command.h @@ -7,7 +7,9 @@ #ifndef _PREDIXY_COMMAND_H_ #define _PREDIXY_COMMAND_H_ +#include #include "Exception.h" +#include "HashFunc.h" class Command { @@ -224,6 +226,10 @@ class Command { return mode & MultiKeyVal; } + bool isAnyMulti() const + { + return mode & (MultiKey|SMultiKey|MultiKeyVal); + } static void init(); static const Command& get(Type type) { @@ -244,7 +250,15 @@ class Command private: static const int MaxArgs = 100000000; static const Command CmdPool[Sentinel]; - typedef std::map CommandMap; + class H + { + public: + size_t operator()(const String& s) const + { + return Hash::crc16(s.data(), s.length()); + } + }; + typedef std::unordered_map CommandMap; static CommandMap CmdMap; }; diff --git a/src/Common.h b/src/Common.h index aa4bdfe..6ea070a 100644 --- a/src/Common.h +++ b/src/Common.h @@ -29,6 +29,7 @@ namespace Const static const int MaxAddrLen = 128; static const int MaxDcLen = 32; static const int MaxIOVecLen = IOV_MAX; + static const int MaxCmdLen = 32; static const int MaxKeyLen = 512; static const int BufferAllocCacheSize = 64; static const int RequestAllocCacheSize = 32; diff --git a/src/ConnectConnection.cpp b/src/ConnectConnection.cpp index b680172..7f08ebe 100644 --- a/src/ConnectConnection.cpp +++ b/src/ConnectConnection.cpp @@ -218,14 +218,6 @@ void ConnectConnection::handleResponse(Handler* h) } } -void ConnectConnection::send(Handler* h, Request* req) -{ - FuncCallTimer(); - mSendRequests.push_back(req); - logDebug("h %d s %s %d pend req %ld", - h->id(), peer(), fd(), req->id()); -} - void ConnectConnection::close(Handler* h) { SendRequestList* reqs[2] = {&mSentRequests, &mSendRequests}; diff --git a/src/ConnectConnection.h b/src/ConnectConnection.h index dbe85c7..3e1b79e 100644 --- a/src/ConnectConnection.h +++ b/src/ConnectConnection.h @@ -28,8 +28,11 @@ class ConnectConnection : ~ConnectConnection(); bool writeEvent(Handler* h); void readEvent(Handler* h); - void send(Handler* h, Request* req); void close(Handler* h); + void send(Handler* h, Request* req) + { + mSendRequests.push_back(req); + } Server* server() const { return mServ; diff --git a/src/Handler.cpp b/src/Handler.cpp index c22736c..0ceae95 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -46,6 +46,8 @@ Handler::~Handler() void Handler::run() { + Request::init(); + Response::init(); auto conf = mProxy->conf(); refreshServerPool(); while (!mStop) { @@ -134,7 +136,7 @@ void Handler::postEvent() } } -void Handler::addPostEvent(AcceptConnection* c, int evts) +inline void Handler::addPostEvent(AcceptConnection* c, int evts) { if (!c->getPostEvent()) { mPostAcceptConns.push_back(c); @@ -143,12 +145,12 @@ void Handler::addPostEvent(AcceptConnection* c, int evts) c->addPostEvent(evts); } -void Handler::addPostEvent(ConnectConnection* c, int evts) +inline void Handler::addPostEvent(ConnectConnection* s, int evts) { - if (!c->getPostEvent()) { - mPostConnectConns.push_back(c); + if (!s->getPostEvent()) { + mPostConnectConns.push_back(s); } - c->addPostEvent(evts); + s->addPostEvent(evts); } void Handler::postAcceptConnectionEvent() @@ -353,7 +355,6 @@ void Handler::handleAcceptConnectionEvent(AcceptConnection* c, int evts) try { if (c->good() && (evts & Multiplexor::ReadEvent)) { c->readEvent(this); - setAcceptConnectionActiveTime(c); } if (c->good() && (evts & Multiplexor::WriteEvent)) { addPostEvent(c, Multiplexor::WriteEvent); @@ -483,7 +484,7 @@ void Handler::handleRequest(Request* req) return; } auto sp = mProxy->serverPool(); - Server* serv = sp->getServer(this, req); + Server* serv = sp->getServer(this, req, key); if (!serv) { directResponse(req, Response::NoServer); return; @@ -634,6 +635,7 @@ bool Handler::preHandleRequest(Request* req, const String& key) void Handler::postHandleRequest(Request* req, ConnectConnection* s) { + FuncCallTimer(); auto c = req->connection(); if (!c) { return; @@ -1127,6 +1129,9 @@ void Handler::configRequest(Request* req, const String& key) configGetRequest(req); } else if (key.equal("set", true)) { configSetRequest(req); + } else if (key.equal("resetstat", true)) { + mProxy->incrStatsVer(); + directResponse(req, Response::Ok); } else { directResponse(req, Response::ConfigSubCmdUnknown); } diff --git a/src/HashFunc.cpp b/src/HashFunc.cpp index e38ef73..d78d2f7 100644 --- a/src/HashFunc.cpp +++ b/src/HashFunc.cpp @@ -45,19 +45,6 @@ const char* Hash::hashTagStr(const char* buf, int& len, const char* tag) return buf; } -long Hash::hash(const char* buf, int len) const -{ - switch (mType) { - case Atol: - return atol(buf, len); - case Crc16: - return crc16(buf, len); - default: - break; - } - return 0; -} - long Hash::atol(const char* buf, int len) { long v = 0; diff --git a/src/HashFunc.h b/src/HashFunc.h index 97eed37..4024432 100644 --- a/src/HashFunc.h +++ b/src/HashFunc.h @@ -33,7 +33,18 @@ class Hash { return mType; } - long hash(const char* buf, int len) const; + long hash(const char* buf, int len) const + { + switch (mType) { + case Atol: + return atol(buf, len); + case Crc16: + return crc16(buf, len); + default: + break; + } + return 0; + } long hash(const char* buf, int len, const char* tag) const { buf = hashTagStr(buf, len, tag); diff --git a/src/Proxy.cpp b/src/Proxy.cpp index 9f5987e..bf1a010 100644 --- a/src/Proxy.cpp +++ b/src/Proxy.cpp @@ -102,8 +102,6 @@ bool Proxy::init(int argc, char* argv[]) } mLatencyMonitorSet.init(mConf->latencyMonitors()); - Request::init(); - Response::init(); ListenSocket* s = new ListenSocket(mConf->bind(), SOCK_STREAM); if (!s->setNonBlock()) { logError("proxy listener set nonblock fail:%s", StrError()); diff --git a/src/Request.cpp b/src/Request.cpp index 0b4b58c..de1a1c9 100644 --- a/src/Request.cpp +++ b/src/Request.cpp @@ -13,39 +13,40 @@ struct GenericRequest Request::GenericCode code; Command::Type type; const char* content; - const Request* req; }; -static GenericRequest GenericRequests[] = { - {Request::Ping, Command::Ping, "*1\r\n$4\r\nping\r\n", nullptr}, - {Request::PingServ, Command::PingServ, "*1\r\n$4\r\nping\r\n", nullptr}, - {Request::ClusterNodes, Command::ClusterNodes, "*2\r\n$7\r\ncluster\r\n$5\r\nnodes\r\n", nullptr}, - {Request::Asking, Command::Asking, "*1\r\n$6\r\nasking\r\n", nullptr}, - {Request::Readonly, Command::Readonly, "*1\r\n$8\r\nreadonly\r\n", nullptr}, - {Request::UnwatchServ, Command::UnwatchServ, "*1\r\n$7\r\nunwatch\r\n", nullptr}, - {Request::DiscardServ, Command::DiscardServ, "*1\r\n$7\r\ndiscard\r\n", nullptr}, - {Request::MgetHead, Command::Mget, "*2\r\n$4\r\nmget\r\n", nullptr}, - {Request::MsetHead, Command::Mset, "*3\r\n$4\r\nmset\r\n", nullptr}, - {Request::MsetnxHead, Command::Msetnx, "*3\r\n$6\r\nmsetnx\r\n", nullptr}, - {Request::TouchHead, Command::Touch, "*2\r\n$5\r\ntouch\r\n", nullptr}, - {Request::ExistsHead, Command::Exists, "*2\r\n$6\r\nexists\r\n", nullptr}, - {Request::DelHead, Command::Del, "*2\r\n$3\r\ndel\r\n", nullptr}, - {Request::UnlinkHead, Command::Unlink, "*2\r\n$6\r\nunlink\r\n", nullptr}, - {Request::PsubscribeHead,Command::Psubscribe, "*2\r\n$10\r\npsubscribe\r\n", nullptr}, - {Request::SubscribeHead,Command::Subscribe, "*2\r\n$9\r\nsubscribe\r\n", nullptr} +static const GenericRequest GenericRequestDefs[] = { + {Request::Ping, Command::Ping, "*1\r\n$4\r\nping\r\n"}, + {Request::PingServ, Command::PingServ, "*1\r\n$4\r\nping\r\n"}, + {Request::ClusterNodes, Command::ClusterNodes, "*2\r\n$7\r\ncluster\r\n$5\r\nnodes\r\n"}, + {Request::Asking, Command::Asking, "*1\r\n$6\r\nasking\r\n"}, + {Request::Readonly, Command::Readonly, "*1\r\n$8\r\nreadonly\r\n"}, + {Request::UnwatchServ, Command::UnwatchServ, "*1\r\n$7\r\nunwatch\r\n"}, + {Request::DiscardServ, Command::DiscardServ, "*1\r\n$7\r\ndiscard\r\n"}, + {Request::MgetHead, Command::Mget, "*2\r\n$4\r\nmget\r\n"}, + {Request::MsetHead, Command::Mset, "*3\r\n$4\r\nmset\r\n"}, + {Request::MsetnxHead, Command::Msetnx, "*3\r\n$6\r\nmsetnx\r\n"}, + {Request::TouchHead, Command::Touch, "*2\r\n$5\r\ntouch\r\n"}, + {Request::ExistsHead, Command::Exists, "*2\r\n$6\r\nexists\r\n"}, + {Request::DelHead, Command::Del, "*2\r\n$3\r\ndel\r\n"}, + {Request::UnlinkHead, Command::Unlink, "*2\r\n$6\r\nunlink\r\n"}, + {Request::PsubscribeHead,Command::Psubscribe, "*2\r\n$10\r\npsubscribe\r\n"}, + {Request::SubscribeHead,Command::Subscribe, "*2\r\n$9\r\nsubscribe\r\n"} }; +thread_local static Request* GenericRequests[Request::CodeSentinel]; + void Request::init() { BufferPtr buf = BufferAlloc::create(); - for (auto& r : GenericRequests) { + for (auto& r : GenericRequestDefs) { Request* req = new Request(); req->mType= r.type; if (buf->room() < (int)strlen(r.content)) { buf = BufferAlloc::create(); } buf = req->mReq.set(buf, r.content); - r.req = req; + GenericRequests[r.code] = req; } } @@ -85,7 +86,7 @@ Request::Request(GenericCode code): mCreateTime(Util::elapsedUSec()), mData(nullptr) { - auto r = GenericRequests[code].req; + auto r = GenericRequests[code]; mType = r->mType; mReq = r->mReq; } @@ -110,38 +111,38 @@ void Request::set(const RequestParser& p, Request* leader) const Request* r = nullptr; switch (mType) { case Command::Mget: - r = GenericRequests[MgetHead].req; + r = GenericRequests[MgetHead]; break; case Command::Mset: - r = GenericRequests[MsetHead].req; + r = GenericRequests[MsetHead]; break; case Command::Msetnx: - r = GenericRequests[MsetnxHead].req; + r = GenericRequests[MsetnxHead]; break; case Command::Touch: - r = GenericRequests[TouchHead].req; + r = GenericRequests[TouchHead]; break; case Command::Exists: - r = GenericRequests[ExistsHead].req; + r = GenericRequests[ExistsHead]; break; case Command::Del: - r = GenericRequests[DelHead].req; + r = GenericRequests[DelHead]; break; case Command::Unlink: - r = GenericRequests[UnlinkHead].req; + r = GenericRequests[UnlinkHead]; break; case Command::Psubscribe: - r = GenericRequests[PsubscribeHead].req; + r = GenericRequests[PsubscribeHead]; break; case Command::Subscribe: - r = GenericRequests[SubscribeHead].req; + r = GenericRequests[SubscribeHead]; break; default: //should never reach break; } mHead = r->mReq; - mReq = p.arg(); + mReq = p.request(); mLeader = leader; if (leader == this) { if (mType == Command::Mset || mType == Command::Msetnx) { diff --git a/src/Request.h b/src/Request.h index 344d815..4428286 100644 --- a/src/Request.h +++ b/src/Request.h @@ -43,7 +43,9 @@ class Request : DelHead, UnlinkHead, PsubscribeHead, - SubscribeHead + SubscribeHead, + + CodeSentinel }; static void init(); public: diff --git a/src/RequestParser.cpp b/src/RequestParser.cpp index b0e6067..8ffc458 100644 --- a/src/RequestParser.cpp +++ b/src/RequestParser.cpp @@ -21,8 +21,6 @@ void RequestParser::reset() mType = Command::None; mCommand = nullptr; mReq.clear(); - mCmd.clear(); - mArg.clear(); mKey.clear(); mStatus = Normal; mState = Idle; @@ -34,23 +32,31 @@ void RequestParser::reset() mByteCnt = 0; } -bool RequestParser::isKey(bool split) const +inline bool RequestParser::isKey(bool split) const { - if (mCommand) { - switch (mCommand->mode & Command::KeyMask) { - case Command::NoKey: - return false; - case Command::MultiKey: - return split ? mArgCnt > 0 : mArgCnt == 1; - case Command::SMultiKey: - return mArgCnt > 0; - case Command::MultiKeyVal: - return split ? (mArgCnt & 1) : mArgCnt == 1; - case Command::KeyAt3: - return mArgCnt == 3; - default: - return mArgCnt == 1; - } + switch (mCommand->mode & Command::KeyMask) { + case Command::NoKey: + return false; + case Command::MultiKey: + return split ? mArgCnt > 0 : mArgCnt == 1; + case Command::SMultiKey: + return mArgCnt > 0; + case Command::MultiKeyVal: + return split ? (mArgCnt & 1) : mArgCnt == 1; + case Command::KeyAt3: + return mArgCnt == 3; + default: + return mArgCnt == 1; + } + return false; +} + +inline bool RequestParser::isSplit(bool split) const +{ + if (mCommand->mode & (Command::MultiKey|Command::MultiKeyVal)) { + return split && mStatus == Normal && isKey(true); + } else if (mCommand->mode & Command::SMultiKey) { + return mStatus == Normal; } return false; } @@ -88,16 +94,21 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) } else if (!isspace(ch)) { mReq.begin().buf = buf; mReq.begin().pos = pos; - mCmd.begin() = mReq.begin(); + mCmd[0] = tolower(ch); + mArgLen = 1; mState = InlineCmd; } break; case InlineCmd: if (isspace(ch)) { - mCmd.end().buf = buf; - mCmd.end().pos = pos; + mCmd[mArgLen < Const::MaxCmdLen ? mArgLen : Const::MaxCmdLen - 1] = '\0'; parseCmd(); mState = ch == '\r' ? InlineLF : InlineArg; + } else { + if (mArgLen < Const::MaxCmdLen) { + mCmd[mArgLen] = tolower(ch); + } + ++mArgLen; } break; case InlineArg: @@ -117,7 +128,6 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) if (ch >= '0' && ch <= '9') { mArgNum = mArgNum * 10 + (ch - '0'); } else if (ch == '\r') { - //mState = mArgNum > 0 ? ArgNumLF : Error; mArgNum > 0 ? mState = ArgNumLF : error = __LINE__; } else { error = __LINE__; @@ -125,26 +135,128 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) break; case ArgNumLF: mArgCnt = 0; - //mState = ch == '\n' ? ArgTag : Error; - ch == '\n' ? mState = ArgTag : error = __LINE__; + ch == '\n' ? mState = CmdTag : error = __LINE__; break; - case ArgTag: - mArgLen = 0; + case CmdTag: if (ch == '$') { - mState = ArgLen; - if (isKey(split)) { - mArg.begin().buf = buf; - mArg.begin().pos = pos; + mArgLen = 0; + mState = CmdLen; + } else { + error = __LINE__; + } + break; + case CmdLen: + if (ch >= '0' && ch <= '9') { + mArgLen = mArgLen * 10 + (ch - '0'); + } else if (ch == '\r') { + mArgLen > 0 ? mState = CmdLenLF : error = __LINE__; + } else { + error = __LINE__; + } + break; + case CmdLenLF: + if (ch == '\n') { + mArgBodyCnt = 0; + mState = mArgLen < Const::MaxCmdLen ? CmdBody : CmdBodyTooLong; + } else { + error = __LINE__; + } + break; + case CmdBody: + if (mArgBodyCnt == mArgLen) { + mCmd[mArgLen] = '\0'; + ch == '\r' ? mState = CmdBodyLF : error = __LINE__; + } else { + mCmd[mArgBodyCnt++] = tolower(ch); + } + break; + case CmdBodyTooLong: + if (mArgBodyCnt == mArgLen) { + mCmd[Const::MaxCmdLen - 1] = '\0'; + ch == '\r' ? mState = CmdBodyLF : error = __LINE__; + } else { + if (mArgBodyCnt < Const::MaxCmdLen) { + mCmd[mArgBodyCnt] = ch; + } + ++mArgBodyCnt; + } + break; + case CmdBodyLF: + if (ch == '\n') { + parseCmd(); + if (++mArgCnt == mArgNum) { + mState = Finished; + goto Done; + } + if (mCommand->mode & Command::KeyMask) { + mState = SArgTag; + } else { + mState = KeyTag; + } + } else { + error = __LINE__; + } + break; + case KeyTag: + mArgLen = 0; + ch == '$' ? mState = KeyLen : error = __LINE__; + break; + case KeyLen: + if (ch >= '0' && ch <= '9') { + mArgLen = mArgLen * 10 + (ch - '0'); + } else if (ch == '\r') { + mArgLen >= 0 ? mState = KeyLenLF : error = __LINE__; + } else { + error = __LINE__; + } + break; + case KeyLenLF: + if (ch == '\n') { + mArgBodyCnt = 0; + mState = KeyBody; + } else { + error = __LINE__; + } + break; + case KeyBody: + if (mArgBodyCnt == 0) { + mKey.begin().buf = buf; + mKey.begin().pos = pos; + } + if (mArgBodyCnt + (end - cursor) > mArgLen) { + pos += mArgLen - mArgBodyCnt; + cursor = buf->data() + pos; + if (*cursor == '\r') { + mState = KeyBodyLF; + mKey.end().buf = buf; + mKey.end().pos = pos; + } else { + error = __LINE__; } + } else { + mArgBodyCnt += end - cursor; + pos = buf->length() - 1; + } + break; + case KeyBodyLF: + if (ch == '\n') { + if (++mArgCnt == mArgNum) { + mState = Finished; + goto Done; + } + mState = ArgTag; } else { error = __LINE__; } break; + case ArgTag: + mArgLen = 0; + ch == '$' ? mState = ArgLen : error = __LINE__; + break; case ArgLen: if (ch >= '0' && ch <= '9') { mArgLen = mArgLen * 10 + (ch - '0'); } else if (ch == '\r') { - //mState = mArgLen >= 0 ? ArgLenLF : Error; mArgLen >= 0 ? mState = ArgLenLF : error = __LINE__; } else { error = __LINE__; @@ -155,11 +267,55 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) ch == '\n' ? mState = ArgBody : error = __LINE__; break; case ArgBody: + if (mArgBodyCnt + (end - cursor) > mArgLen) { + pos += mArgLen - mArgBodyCnt; + cursor = buf->data() + pos; + *cursor == '\r' ? mState = ArgBodyLF : error = __LINE__; + } else { + mArgBodyCnt += end - cursor; + pos = buf->length() - 1; + } + break; + case ArgBodyLF: + if (ch == '\n') { + if (++mArgCnt == mArgNum) { + mState = Finished; + goto Done; + } else { + mState = ArgTag; + } + } else { + error = __LINE__; + } + break; + case SArgTag: + mArgLen = 0; + if (ch == '$') { + mState = SArgLen; + if (isSplit(split)) { + mReq.begin().buf = buf; + mReq.begin().pos = pos; + } + } else { + error = __LINE__; + } + break; + case SArgLen: + if (ch >= '0' && ch <= '9') { + mArgLen = mArgLen * 10 + (ch - '0'); + } else if (ch == '\r') { + mArgLen >= 0 ? mState = SArgLenLF : error = __LINE__; + } else { + error = __LINE__; + } + break; + case SArgLenLF: + mArgBodyCnt = 0; + ch == '\n' ? mState = SArgBody : error = __LINE__; + break; + case SArgBody: if (mArgBodyCnt == 0) { - if (mArgCnt == 0) { - mCmd.begin().buf = buf; - mCmd.begin().pos = pos; - } else if (isKey(split)) { + if (isKey(split)) { mKey.begin().buf = buf; mKey.begin().pos = pos; } @@ -168,12 +324,8 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) pos += mArgLen - mArgBodyCnt; cursor = buf->data() + pos; if (*cursor == '\r') { - mState = ArgBodyLF; - if (mArgCnt == 0) { - mCmd.end().buf = buf; - mCmd.end().pos = pos; - parseCmd(); - } else if (isKey(split)) { + mState = SArgBodyLF; + if (isKey(split)) { mKey.end().buf = buf; mKey.end().pos = pos; } @@ -185,24 +337,16 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) pos = buf->length() - 1; } break; - case ArgBodyLF: + case SArgBodyLF: if (ch == '\n') { if (++mArgCnt == mArgNum) { mState = Finished; goto Done; } else { - mState = ArgTag; - if (mArgCnt > 1 && isKey(split) && mStatus == Normal && - (mCommand->mode&(Command::MultiKey|Command::SMultiKey|Command::MultiKeyVal))) { + mState = SArgTag; + if (isSplit(split)) { goto Done; } - if (mArgCnt > 1 && mCommand && mStatus == Normal && split) { - if (mCommand->isMultiKey()) { - goto Done; - } else if (mCommand->isMultiKeyVal() && (mArgCnt & 1)) { - goto Done; - } - } } } else { error = __LINE__; @@ -230,8 +374,6 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) mReq.end().buf = buf; mReq.end().pos = ++pos; mReq.rewind(); - mArg.end() = mReq.end(); - mArg.rewind(); if (mState == Finished) { return mStatus == Normal ? Complete : mStatus; } else { @@ -242,19 +384,18 @@ RequestParser::Status RequestParser::parse(Buffer* buf, int& pos, bool split) void RequestParser::parseCmd() { FuncCallTimer(); - SegmentStr cmd(mCmd); - if (!cmd.complete()) { + if (mArgLen >= Const::MaxCmdLen) { mStatus = CmdError; mType = Command::None; - logNotice("unknown request cmd too long:%.*s...", - cmd.length(), cmd.data()); + logNotice("unknown request cmd too long:%s...", mCmd); return; } - auto c = Command::find(cmd); + auto c = Command::find(mCmd); if (!c) { + mCommand = &Command::get(Command::None); mStatus = CmdError; mType = Command::None; - logNotice("unknown request cmd:%.*s", cmd.length(), cmd.data()); + logNotice("unknown request cmd:%s", mCmd); return; } mType = c->type; @@ -269,8 +410,8 @@ void RequestParser::parseCmd() } if (mArgNum < c->minArgs || mArgNum > c->maxArgs) { mStatus = ArgError; - logNotice("request argument is invalid cmd %.*s argnum %d", - cmd.length(), cmd.data(), mArgNum); + logNotice("request argument is invalid cmd %s argnum %d", + mCmd, mArgNum); return; } switch (mType) { @@ -278,9 +419,8 @@ void RequestParser::parseCmd() case Command::Msetnx: if (!(mArgNum & 1)) { mStatus = ArgError; - logNotice("request argument is invalid cmd %.*s argnum %d", - cmd.length(), cmd.data(), mArgNum); - return; + logNotice("request argument is invalid cmd %s argnum %d", + mCmd, mArgNum); } break; default: diff --git a/src/RequestParser.h b/src/RequestParser.h index c31548a..bb7b687 100644 --- a/src/RequestParser.h +++ b/src/RequestParser.h @@ -14,7 +14,6 @@ class RequestParser { public: static const int MaxAllowInvalidByteCount = 1024; - static const int MaxCmdLen = 32; enum State { Idle, // * or inline command @@ -24,11 +23,27 @@ class RequestParser InlineArg, ArgNum, // 2 ArgNumLF, // \r\n + CmdTag, + CmdLen, + CmdLenLF, + CmdBody, + CmdBodyTooLong, + CmdBodyLF, + KeyTag, + KeyLen, + KeyLenLF, + KeyBody, + KeyBodyLF, ArgTag, // $ $ ArgLen, // 3 5 ArgLenLF, // \r\n \r\n ArgBody, // get hello ArgBodyLF, // \r\n \r\n + SArgTag, // $ $ + SArgLen, // 3 5 + SArgLenLF, // \r\n \r\n + SArgBody, // get hello + SArgBodyLF, // \r\n \r\n Finished, Error @@ -76,19 +91,7 @@ class RequestParser { return mArgNum; } - Segment& arg() - { - return mArg; - } - const Segment& arg() const - { - return mArg; - } - Segment& cmd() - { - return mCmd; - } - const Segment& cmd() const + const char* cmd() const { return mCmd; } @@ -103,14 +106,13 @@ class RequestParser private: void parseCmd(); bool isKey(bool split) const; + bool isSplit(bool split) const; private: Command::Type mType; const Command* mCommand; - // *2\r\n$3\r\nget\r\n$3\r\nkey\r\n - Segment mReq; // |------------------------------| - Segment mCmd; // |-| - Segment mArg; // |-----------| - Segment mKey; // |-| + Segment mReq; + Segment mKey; + char mCmd[Const::MaxCmdLen]; Status mStatus; State mState; bool mInline; diff --git a/src/Response.cpp b/src/Response.cpp index 9212fad..8b699b9 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -12,41 +12,42 @@ struct GenericResponse Response::GenericCode code; Reply::Type type; const char* content; - const Response* res; }; -static GenericResponse GenericResponses[] = { - {Response::Pong, Reply::Status, "+PONG\r\n", nullptr}, - {Response::Ok, Reply::Status, "+OK\r\n", nullptr}, +static const GenericResponse GenericResponseDefs[] = { + {Response::Pong, Reply::Status, "+PONG\r\n"}, + {Response::Ok, Reply::Status, "+OK\r\n"}, {Response::Cmd, Reply::Array, "*0\r\n"}, - {Response::UnknownCmd, Reply::Error, "-ERR unknown command\r\n", nullptr}, - {Response::ArgWrong, Reply::Error, "-ERR argument wrong\r\n", nullptr}, - {Response::InvalidDb, Reply::Error, "-ERR invalid DB index\r\n", nullptr}, - {Response::NoPasswordSet, Reply::Error, "-ERR Client sent AUTH, but no password is set\r\n", nullptr}, - {Response::InvalidPassword, Reply::Error, "-ERR invalid password\r\n", nullptr}, - {Response::Unauth, Reply::Error, "-NOAUTH Authentication required.\r\n", nullptr}, - {Response::PermissionDeny, Reply::Error, "-ERR auth permission deny\r\n", nullptr}, - {Response::NoServer, Reply::Error, "-ERR no server avaliable\r\n", nullptr}, - {Response::NoServerConnection, Reply::Error, "-ERR no server connection avaliable\r\n", nullptr}, - {Response::ServerConnectionClose, Reply::Error, "-ERR server connection close\r\n", nullptr}, - {Response::DeliverRequestFail, Reply::Error, "-ERR deliver request fail\r\n", nullptr}, - {Response::ForbidTransaction, Reply::Error, "-ERR forbid transaction in current server pool\r\n", nullptr}, - {Response::ConfigSubCmdUnknown, Reply::Error, "-ERR CONFIG subcommand must be one of GET, SET\r\n", nullptr}, - {Response::InvalidScanCursor, Reply::Error, "-ERR invalid cursor\r\n", nullptr}, - {Response::ScanEnd, Reply::Array, "*2\r\n$1\r\n0\r\n*0\r\n", nullptr} + {Response::UnknownCmd, Reply::Error, "-ERR unknown command\r\n"}, + {Response::ArgWrong, Reply::Error, "-ERR argument wrong\r\n"}, + {Response::InvalidDb, Reply::Error, "-ERR invalid DB index\r\n"}, + {Response::NoPasswordSet, Reply::Error, "-ERR Client sent AUTH, but no password is set\r\n"}, + {Response::InvalidPassword, Reply::Error, "-ERR invalid password\r\n"}, + {Response::Unauth, Reply::Error, "-NOAUTH Authentication required.\r\n"}, + {Response::PermissionDeny, Reply::Error, "-ERR auth permission deny\r\n"}, + {Response::NoServer, Reply::Error, "-ERR no server avaliable\r\n"}, + {Response::NoServerConnection, Reply::Error, "-ERR no server connection avaliable\r\n"}, + {Response::ServerConnectionClose, Reply::Error, "-ERR server connection close\r\n"}, + {Response::DeliverRequestFail, Reply::Error, "-ERR deliver request fail\r\n"}, + {Response::ForbidTransaction, Reply::Error, "-ERR forbid transaction in current server pool\r\n"}, + {Response::ConfigSubCmdUnknown, Reply::Error, "-ERR CONFIG subcommand must be one of GET, SET\r\n"}, + {Response::InvalidScanCursor, Reply::Error, "-ERR invalid cursor\r\n"}, + {Response::ScanEnd, Reply::Array, "*2\r\n$1\r\n0\r\n*0\r\n"} }; +thread_local static Response* GenericResponses[Response::CodeSentinel]; + void Response::init() { BufferPtr buf = BufferAlloc::create(); - for (auto& r : GenericResponses) { + for (auto& r : GenericResponseDefs) { Response* res = new Response(); res->mType = r.type; if (buf->room() < (int)strlen(r.content)) { buf = BufferAlloc::create(); } buf = res->mRes.set(buf, r.content); - r.res = res; + GenericResponses[r.code] = res; } } @@ -60,7 +61,7 @@ Response::Response(GenericCode code): mType(Reply::None), mInteger(0) { - auto r = GenericResponses[code].res; + auto r = GenericResponses[code]; mType = r->mType; mRes = r->mRes; } @@ -74,7 +75,6 @@ void Response::set(const ResponseParser& p) { mType = p.type(); mInteger = p.integer(); - mHead.clear(); mRes = p.response(); } @@ -83,14 +83,12 @@ void Response::set(int64_t num) { mType = Reply::Integer; mInteger = num; - mHead.clear(); mRes.fset(nullptr, ":%ld\r\n", num); } void Response::setStr(const char* str, int len) { mType = Reply::String; - mHead.clear(); if (len < 0) { len = strlen(str); } @@ -100,7 +98,6 @@ void Response::setStr(const char* str, int len) void Response::setErr(const char* str, int len) { mType = Reply::Error; - mHead.clear(); if (len < 0) { len = strlen(str); } diff --git a/src/Response.h b/src/Response.h index 448f6e8..7634492 100644 --- a/src/Response.h +++ b/src/Response.h @@ -37,7 +37,9 @@ class Response : ForbidTransaction, ConfigSubCmdUnknown, InvalidScanCursor, - ScanEnd + ScanEnd, + + CodeSentinel }; static void init(); static Response* create(GenericCode code, Request* req = nullptr); diff --git a/src/SentinelServerPool.cpp b/src/SentinelServerPool.cpp index a1efca2..6ba5a03 100644 --- a/src/SentinelServerPool.cpp +++ b/src/SentinelServerPool.cpp @@ -39,11 +39,11 @@ void SentinelServerPool::init(const SentinelServerPoolConf& conf) mSentinels[i++] = s; mServs[s->addr()] = s; } - mGroups.resize(conf.groups.size()); + mGroupPool.resize(conf.groups.size()); i = 0; for (auto& gc : conf.groups) { ServerGroup* g = new ServerGroup(this, gc.name); - mGroups[i++] = g; + mGroupPool[i++] = g; for (auto& sc : gc.servers) { Server* s = new Server(this, sc.addr, true); s->setPassword(sc.password.empty() ? conf.password : sc.password); @@ -55,7 +55,7 @@ void SentinelServerPool::init(const SentinelServerPoolConf& conf) } } -Server* SentinelServerPool::getServer(Handler* h, Request* req) const +Server* SentinelServerPool::getServer(Handler* h, Request* req, const String& key) const { FuncCallTimer(); switch (req->type()) { @@ -76,22 +76,21 @@ Server* SentinelServerPool::getServer(Handler* h, Request* req) const default: break; } - if (mGroups.size() == 1) { - return mGroups[0]->getServer(h, req); - } else if (mGroups.size() > 1) { + if (mGroupPool.size() == 1) { + return mGroupPool[0]->getServer(h, req); + } else if (mGroupPool.size() > 1) { switch (mDist) { case Distribution::Modula: { - SegmentStr key(req->key()); long idx = mHash.hash(key.data(), key.length(), mHashTag); - idx %= mGroups.size(); - return mGroups[idx]->getServer(h, req); + idx %= mGroupPool.size(); + return mGroupPool[idx]->getServer(h, req); } break; case Distribution::Random: { - int idx = h->rand() % mGroups.size(); - return mGroups[idx]->getServer(h, req); + int idx = h->rand() % mGroupPool.size(); + return mGroupPool[idx]->getServer(h, req); } break; default: @@ -104,7 +103,7 @@ Server* SentinelServerPool::getServer(Handler* h, Request* req) const void SentinelServerPool::refreshRequest(Handler* h) { logDebug("h %d update sentinel server pool", h->id()); - for (auto g : mGroups) { + for (auto g : mGroupPool) { RequestPtr req = RequestAlloc::create(); req->setSentinels(g->name()); req->setData(g); diff --git a/src/SentinelServerPool.h b/src/SentinelServerPool.h index 2a6acd9..e7ba56e 100644 --- a/src/SentinelServerPool.h +++ b/src/SentinelServerPool.h @@ -19,7 +19,7 @@ class SentinelServerPool : public ServerPoolTmpl SentinelServerPool(Proxy* p); ~SentinelServerPool(); void init(const SentinelServerPoolConf& conf); - Server* getServer(Handler* h, Request* req) const; + Server* getServer(Handler* h, Request* req, const String& key) const; Server* iter(int& cursor) const { return ServerPool::iter(mServPool, cursor); @@ -34,7 +34,6 @@ class SentinelServerPool : public ServerPoolTmpl private: std::vector mSentinels; std::vector mServPool; - std::vector mGroups; Distribution mDist; Hash mHash; char mHashTag[2]; diff --git a/src/ServerGroup.cpp b/src/ServerGroup.cpp index 399a2d5..30b6e9e 100644 --- a/src/ServerGroup.cpp +++ b/src/ServerGroup.cpp @@ -56,6 +56,7 @@ Server* ServerGroup::getServer(Handler* h, Request* req) const } if (s->role() == Server::Master) { serv = s; + break; } } } else if (auto dataCenter = mPool->proxy()->dataCenter()) { diff --git a/src/ServerPool.h b/src/ServerPool.h index b6c9df8..6a61dc7 100644 --- a/src/ServerPool.h +++ b/src/ServerPool.h @@ -78,9 +78,9 @@ class ServerPool auto it = mServs.find(addr); return it == mServs.end() ? nullptr : it->second; } - Server* getServer(Handler* h, Request* req) const + Server* getServer(Handler* h, Request* req, const String& key) const { - return mGetServerFunc(this, h, req); + return mGetServerFunc(this, h, req, key); } Server* iter(int& cursor) const { @@ -92,7 +92,7 @@ class ServerPool } void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res); protected: - typedef Server* (*GetServerFunc)(const ServerPool* p, Handler* h, Request* req); + typedef Server* (*GetServerFunc)(const ServerPool* p, Handler* h, Request* req, const String& key); typedef Server* (*IterFunc)(const ServerPool* p, int& cursor); typedef void (*RefreshRequestFunc)(ServerPool* p, Handler* h); typedef void (*HandleResponseFunc)(ServerPool* p, Handler* h, ConnectConnection* s, Request* req, Response* res); @@ -147,9 +147,9 @@ class ServerPoolTmpl : public ServerPool { } private: - static Server* getServer(const ServerPool* p, Handler* h, Request* req) + static Server* getServer(const ServerPool* p, Handler* h, Request* req, const String& key) { - return static_cast(p)->getServer(h, req); + return static_cast(p)->getServer(h, req, key); } static Server* iter(const ServerPool* p, int& cursor) { diff --git a/src/Timer.cpp b/src/Timer.cpp index 0b5382f..9971ea1 100644 --- a/src/Timer.cpp +++ b/src/Timer.cpp @@ -48,12 +48,12 @@ void TimerPoint::report() std::sort(points, points + cnt, [](TimerPoint* p1, TimerPoint* p2) {return p1->elapsed() > p2->elapsed();}); - printf("%16s %12s %8s %s\n","Total(us)", "Count", "Avg(us)", "Point" ); + printf("%16s %12s %8s %s\n","Total(us)", "Count", "Avg(ns)", "Point" ); for (i = 0; i < cnt; ++i) { auto p = points[i]; - printf("%16ld %12ld %8ld %s\n", + printf("%16ld %12ld %9ld %s\n", p->elapsed(), p->count(), - p->elapsed()/p->count(), p->key()); + p->elapsed()*1000/p->count(), p->key()); } if (points != points0) { delete[] points;