Skip to content

Commit

Permalink
fix mini-rank runtime engine
Browse files Browse the repository at this point in the history
Link: sebhtml/ray#220
Reported-by: Bastien Chevreux <[email protected]>
Signed-off-by: Sébastien Boisvert <[email protected]>
  • Loading branch information
Sébastien Boisvert committed Nov 21, 2013
1 parent ce606c4 commit 15efbd7
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 39 deletions.
91 changes: 73 additions & 18 deletions RayPlatform/communication/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
GNU Lesser General Public License for more details.
You have received a copy of the GNU Lesser General Public License
along with this program (lgpl-3.0.txt).
along with this program (lgpl-3.0.txt).
see <http://www.gnu.org/licenses/>
*/
Expand All @@ -25,6 +25,7 @@
#include <RayPlatform/cryptography/crypto.h>

#include <iostream>
#include <iomanip>
using namespace std;

#include <string.h>
Expand All @@ -33,6 +34,8 @@ using namespace std;
#include <assert.h>
#endif

#include <stdio.h>

#define NO_VALUE -1
#define ACTOR_MODEL_NOBODY NO_VALUE
#define ROUTING_NO_VALUE NO_VALUE
Expand Down Expand Up @@ -133,9 +136,11 @@ void Message::print(){
cout<<" RealTag: "<<getTag();
cout<<" Count: "<<getCount();

/*
if(getCount() > 0){
cout<<" Overlay: "<<getBuffer()[0];
}
*/

cout << " Bytes: " << m_bytes;
cout << " SourceActor: " << m_sourceActor;
Expand All @@ -145,6 +150,34 @@ void Message::print(){
cout << " MiniRankSource: " << m_miniRankSource;
cout << " MiniRankDestination: " << m_miniRankDestination;

printBuffer(getBufferBytes(), getNumberOfBytes());

cout << endl;
}

void Message::printBuffer(const char * buffer, int bytes) const {

cout << " Buffer: " << (void*) buffer;
cout << " with " << bytes << " bytes : ";

for(int i = 0 ; i < bytes ; ++i) {

char byte = buffer[i];
/*
* this does not work
cout << " 0x" << hex << setfill('0') << setw(2);
cout << byte;
cout << dec;
*/

/*
* C solution
* \see http://stackoverflow.com/questions/8060170/printing-hexadecimal-characters-in-c
* \see http://stackoverflow.com/questions/10599068/how-do-i-print-bytes-as-hexadecimal
*/
printf(" 0x%02x", byte & 0xff);
cout << dec;
}
cout << endl;
}

Expand Down Expand Up @@ -455,7 +488,7 @@ int Message::getNumberOfBytes() const {
return m_bytes;
}

void Message::runAssertions(int size) {
void Message::runAssertions(int size, bool routing, bool miniRanks) {

#ifdef CONFIG_ASSERT

Expand All @@ -468,29 +501,51 @@ void Message::runAssertions(int size) {
assert(m_sourceActor >= 0);
assert(m_destinationActor >= 0);

if(!(m_miniRankSource >= 0 || m_miniRankSource == NO_VALUE)) {
print();
}
if(miniRanks) {
if(!(m_miniRankSource >= 0)) {
print();
}

assert(m_miniRankSource >= 0 || m_miniRankSource == NO_VALUE);
if(!(m_miniRankSource < size || m_miniRankSource == NO_VALUE)) {
assert(m_miniRankSource >= 0);

if(!(m_miniRankSource < size )) {

cout << "Error" << endl;
print();
cout << "m_miniRankSource " << m_miniRankSource;
cout << " size " << size << endl;

}
assert(m_miniRankSource < size );


assert(m_miniRankDestination >= 0);
assert(m_miniRankDestination < size);

cout << "Error" << endl;
print();
cout << "m_miniRankSource " << m_miniRankSource;
cout << " size " << size << endl;

} else {
assert(m_miniRankSource == NO_VALUE);
assert(m_miniRankDestination == NO_VALUE);
assert(m_miniRankDestination == NO_VALUE);
}
assert(m_miniRankSource < size || m_miniRankSource == NO_VALUE);

assert(m_miniRankDestination >= 0 || m_miniRankDestination == NO_VALUE);
assert(m_miniRankDestination < size || m_miniRankDestination == NO_VALUE);
if(routing) {
assert(m_routingSource >= 0);
assert(m_routingSource < size );

assert(m_routingSource >= 0 || m_routingSource == ROUTING_NO_VALUE);
assert(m_routingSource < size || m_routingSource == NO_VALUE);
assert(m_routingDestination >= 0 );
assert(m_routingDestination < size);
} else {

assert(m_routingDestination >= 0 || m_routingDestination == ROUTING_NO_VALUE);
assert(m_routingDestination < size || m_routingDestination == NO_VALUE);
if(!(m_routingSource == NO_VALUE)) {
print();
}
assert( m_routingSource == ROUTING_NO_VALUE);
assert( m_routingSource == NO_VALUE);

assert( m_routingDestination == ROUTING_NO_VALUE);
assert( m_routingDestination == NO_VALUE);
}

#endif

Expand Down
3 changes: 2 additions & 1 deletion RayPlatform/communication/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,15 @@ class Message{
int getRoutingDestination() const;
void displayMetaData();

void runAssertions(int size);
void runAssertions(int size, bool routing, bool miniRanks);

void saveMetaData();
void loadMetaData();

void setMiniRanks(int source, int destination);
int getDestinationMiniRank();
int getSourceMiniRank();
void printBuffer(const char * buffer, int bytes) const;
};

#endif
86 changes: 71 additions & 15 deletions RayPlatform/communication/MessagesHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/

//#define GITHUB_ISSUE_220

#include "MessagesHandler.h"
#include "MessageRouter.h"

Expand Down Expand Up @@ -58,7 +60,7 @@ MessagesHandler::~MessagesHandler() {
/**
*
*/
void MessagesHandler::sendAndReceiveMessagesForRankProcess(ComputeCore**cores,int miniRanksPerRank,bool*communicate){
void MessagesHandler::sendAndReceiveMessagesForRankProcess(ComputeCore**cores,int miniRanksPerRank,bool*communicate) {

int deadMiniRanks=0;

Expand Down Expand Up @@ -144,8 +146,15 @@ void MessagesHandler::sendAndReceiveMessagesForRankProcess(ComputeCore**cores,in
* Since all mini-ranks died, it is no longer necessasry to do
* the communication.
*/
if(deadMiniRanks==miniRanksPerRank)
if(deadMiniRanks==miniRanksPerRank) {

/*
cout << "DEBUG deadMiniRanks " << deadMiniRanks;
cout << " miniRanksPerRank " << miniRanksPerRank << endl;
*/

(*communicate)=false;
}
}

#endif
Expand Down Expand Up @@ -190,10 +199,19 @@ void MessagesHandler::sendMessagesForMiniRank(MessageQueue*outbox,RingAllocator*
Rank source=miniRankSource/miniRanksPerRank;
#endif

void*buffer=aMessage->getBuffer();
char * buffer=aMessage->getBufferBytes();
int bytes = aMessage->getNumberOfBytes();
MessageTag tag=aMessage->getTag();

#ifdef GITHUB_ISSUE_220
if(tag == 17) {

cout << "DEBUG buffer for message in buffered outbox" << endl;
Message dummy2;
dummy2.printBuffer(buffer, bytes);
}
#endif

#ifdef ASSERT
assert(destination>=0);
if(destination>=m_size){
Expand All @@ -212,7 +230,15 @@ void MessagesHandler::sendMessagesForMiniRank(MessageQueue*outbox,RingAllocator*
* But it is not possible because it is not thread-safe to do so.
*/
char * copiedBuffer=(char*)outboxBufferAllocator->allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
memcpy(copiedBuffer,buffer, bytes);
memcpy(copiedBuffer, buffer, bytes);

#ifdef GITHUB_ISSUE_220
if(tag == 17) {
cout << "DEBUG Copying " << bytes << " bytes for tag=17" << endl;
Message dummy;
dummy.printBuffer(copiedBuffer, bytes);
}
#endif

/*
* Store minirank information too at the end.
Expand Down Expand Up @@ -259,6 +285,14 @@ void MessagesHandler::sendMessagesForMiniRank(MessageQueue*outbox,RingAllocator*

int count = bytes;

#ifdef GITHUB_ISSUE_220
if(tag == 17) {
cout << "DEBUG MPI_Isend " << count << " bytes to " << destination << endl;
Message dummy;
dummy.printBuffer(copiedBuffer, count);
}
#endif

MPI_Isend(copiedBuffer,count,m_datatype,destination,tag,MPI_COMM_WORLD,request);

#if 0
Expand All @@ -283,7 +317,7 @@ void MessagesHandler::sendMessagesForMiniRank(MessageQueue*outbox,RingAllocator*

#ifdef CONFIG_COMM_PERSISTENT

/*
/*
* receiveMessages is implemented as recommanded by Mr. George Bosilca from
the University of Tennessee (via the Open-MPI mailing list)
Expand Down Expand Up @@ -328,7 +362,7 @@ void MessagesHandler::pumpMessageFromPersistentRing(StaticVector*inbox,RingAlloc

// the request can start again
MPI_Start(m_ring+m_head);

// add the message in the internal inbox
// this inbox is not the real inbox
Message aMessage(incoming,count,m_rank,tag,source);
Expand Down Expand Up @@ -394,6 +428,17 @@ void MessagesHandler::receiveMessagesForMiniRanks(ComputeCore**cores,int miniRan

MPI_Recv(m_staticBuffer,count,datatype,actualSource,actualTag,MPI_COMM_WORLD,&status);

#ifdef GITHUB_ISSUE_220
if(actualTag == 17) {
cout << "DEBUG MPI_Recv from " << actualSource << " with " << count;
cout << " bytes." << endl;

Message dummy;
dummy.printBuffer(m_staticBuffer, count);
}
#endif


//cout << "DEBUG preloading metadata count= " << count << endl;

Message newMessage(m_staticBuffer, count, -1, actualTag, -1); // here the count is the number of MessageUnit
Expand Down Expand Up @@ -454,9 +499,21 @@ void MessagesHandler::receiveMessagesForMiniRanks(ComputeCore**cores,int miniRan
#endif

memcpy(incoming, m_staticBuffer,count*sizeof(char));

#ifdef GITHUB_ISSUE_220
if(actualTag == 17) {
cout << "DEBUG copying " << count << " bytes to incoming buffer";
cout << endl;

Message dummy;
dummy.printBuffer(incoming, count);
}
#endif


}

Message aMessage(incoming,count,miniRankDestination, actualTag,miniRankSource);
Message aMessage(incoming, count, miniRankDestination, actualTag, miniRankSource);
aMessage.setNumberOfBytes(count);

/*
Expand All @@ -483,7 +540,6 @@ void MessagesHandler::receiveMessagesForMiniRanks(ComputeCore**cores,int miniRan
#endif

m_receivedMessages++;

}

#endif
Expand All @@ -502,7 +558,7 @@ void MessagesHandler::receiveMessagesForMiniRanks(ComputeCore**cores,int miniRan
* Starts a non-blocking reception
*/
void MessagesHandler::startNonBlockingReception(int handle){

#ifdef ASSERT
assert(handle<m_numberOfNonBlockingReceives);
assert(handle>=0);
Expand Down Expand Up @@ -530,7 +586,7 @@ void MessagesHandler::init_irecv_testany(RingAllocator*inboxAllocator){
"CONFIG_COMM_IRECV_TESTANY",false);
m_receptionBuffers=(uint8_t*)__Malloc(m_bufferSize*m_numberOfNonBlockingReceives,
"CONFIG_COMM_IRECV_TESTANY",false);

#ifdef ASSERT
assert(m_requests!=NULL);
#endif
Expand Down Expand Up @@ -797,7 +853,7 @@ void MessagesHandler::constructor(int*argc,char***argv){
m_datatype=MPI_BYTE;

#ifdef CONFIG_MINI_RANKS_disabled

int provided;
MPI_Init_thread(argc,argv, MPI_THREAD_FUNNELED, &provided);
bool threads_ok = provided >= MPI_THREAD_FUNNELED;
Expand Down Expand Up @@ -1140,7 +1196,7 @@ void MessagesHandler::receiveMessages(StaticVector*inbox,RingAllocator*inboxAllo
#ifdef COMMUNICATION_IS_VERBOSE
cout<<"call to probeAndRead source="<<source<<""<<endl;
#endif /* COMMUNICATION_IS_VERBOSE */

int source=MPI_ANY_SOURCE;
int tag=MPI_ANY_TAG;

Expand Down Expand Up @@ -1235,13 +1291,13 @@ void MessagesHandler::receiveMessagesForComputeCore(StaticVector*inbox,RingAlloc
Message message;
bufferedInbox->pop(&message);

int count=message.getCount();
int count=message.getNumberOfBytes();
char *incoming=(char*)inboxAllocator->allocate(count*sizeof(char));

/*
* Copy the data, this is slow.
* Copy the data, this is slow but required I think.
*/
memcpy(incoming,message.getBuffer(),count*sizeof(char));
memcpy(incoming,message.getBuffer(), count*sizeof(char));

/*
* Update the buffer so that the code is thread-safe.
Expand Down
Loading

0 comments on commit 15efbd7

Please sign in to comment.