Skip to content

Commit

Permalink
add mini-rank information in the message metadata
Browse files Browse the repository at this point in the history
This patch does not yet fix the issue though.

Link: sebhtml/ray#220
Reported-by: Bastien Chevreux <[email protected]>
Signed-off-by: Sébastien Boisvert <[email protected]>
  • Loading branch information
Sébastien Boisvert committed Nov 19, 2013
1 parent 185394f commit ce606c4
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 50 deletions.
171 changes: 158 additions & 13 deletions RayPlatform/communication/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ using namespace std;
#include <assert.h>
#endif

#define ACTOR_MODEL_NOBODY -1
#define ROUTING_NO_VALUE -1
#define NO_VALUE -1
#define ACTOR_MODEL_NOBODY NO_VALUE
#define ROUTING_NO_VALUE NO_VALUE

/**
* We always pad the buffer with actor source and actor destination.
Expand All @@ -61,6 +62,9 @@ void Message::initialize() {

m_routingSource = ROUTING_NO_VALUE;
m_routingDestination = ROUTING_NO_VALUE;

m_miniRankSource = NO_VALUE;
m_miniRankDestination = NO_VALUE;
}

Message::~Message() {
Expand Down Expand Up @@ -114,15 +118,17 @@ int Message::getSource() const{
}

void Message::print(){
uint8_t shortTag=getTag();
//uint8_t shortTag=getTag();

cout<<"Source: "<<getSource()<<" Destination: "<<getDestination();

/*
if(isActorModelMessage(0)) {
cout << " ActorModel: Yes.";
} else {
cout <<" Tag: "<<MESSAGE_TAGS[shortTag];
}
*/

cout<<" RealTag: "<<getTag();
cout<<" Count: "<<getCount();
Expand All @@ -136,6 +142,8 @@ void Message::print(){
cout << " DestinationActor: " << m_destinationActor;
cout << " RoutingSource: " << m_routingSource;
cout << " RoutingDestination: " << m_routingDestination;
cout << " MiniRankSource: " << m_miniRankSource;
cout << " MiniRankDestination: " << m_miniRankDestination;

cout << endl;
}
Expand Down Expand Up @@ -214,12 +222,14 @@ void Message::saveActorMetaData() {
cout << "Error m_sourceActor " << m_sourceActor;
printActorMetaData();
}
assert(m_sourceActor >= 0);
assert(m_destinationActor >= 0);
assert(m_sourceActor >= 0 || m_sourceActor == ACTOR_MODEL_NOBODY);
assert(m_destinationActor >= 0 || m_destinationActor == ACTOR_MODEL_NOBODY);

// check that the overwrite worked.
/*
assert(m_sourceActor != ACTOR_MODEL_NOBODY);
assert(m_destinationActor != ACTOR_MODEL_NOBODY);
*/
#endif

//cout << "DEBUG saveActorMetaData tag " << getTag() << endl;
Expand All @@ -235,7 +245,6 @@ void Message::saveActorMetaData() {
memcpy(memory + offset + MESSAGE_META_DATA_ACTOR_DESTINATION, &m_destinationActor, sizeof(int));
//printActorMetaData();

setNumberOfBytes(getNumberOfBytes() + 2 * sizeof(int));
//setCount(m_count + 1);
//m_count += 1; // add 1 uint64_t

Expand Down Expand Up @@ -268,7 +277,7 @@ void Message::loadActorMetaData() {
*/
// the buffer contains the actor metadata. and routing metadata.
int offset = getNumberOfBytes();
offset -= 2 * sizeof(int);
//offset -= 2 * sizeof(int);
//offset -= 2 * sizeof(int);

char * memory = (char*) getBuffer();
Expand All @@ -283,7 +292,7 @@ void Message::loadActorMetaData() {
*/
// remove 1 uint64_t

setNumberOfBytes(getNumberOfBytes() - 2 * sizeof(int));
//setNumberOfBytes(getNumberOfBytes() - 2 * sizeof(int));
//setCount(m_count - 1);
//m_count -= 1;
/*
Expand Down Expand Up @@ -335,20 +344,21 @@ void Message::saveRoutingMetaData() {
// the count already includes the actor addresses
// this is stupid, but hey, nobody aside me is touching this code
// with a ten-foot stick
offset -= 2 * sizeof(int);
// 2013-11-18: this was fixed by grouping the changes to count in saveMetaData().
//offset -= 2 * sizeof(int);

memcpy(memory + offset + MESSAGE_META_DATA_ROUTE_SOURCE, &m_routingSource, sizeof(int));
memcpy(memory + offset + MESSAGE_META_DATA_ROUTE_DESTINATION, &m_routingDestination, sizeof(int));
//printActorMetaData();

setNumberOfBytes(getNumberOfBytes() + 2 * sizeof(int));
//setNumberOfBytes(getNumberOfBytes() + 2 * sizeof(int));
//m_count += 1; // add 1 uint64_t

//cout << "DEBUG saved routing metadata at offset " << offset << " new count " << m_count << endl;
//displayMetaData();

#ifdef CONFIG_ASSERT
assert(getNumberOfBytes() <= (int)(MAXIMUM_MESSAGE_SIZE_IN_BYTES + 4 * sizeof(int)));
//assert(getNumberOfBytes() <= (int)(MAXIMUM_MESSAGE_SIZE_IN_BYTES + 4 * sizeof(int)));
#endif

}
Expand All @@ -370,17 +380,26 @@ void Message::loadRoutingMetaData() {
int offset = getNumberOfBytes();

// m_count contains actor metadata *AND* routing metadata (if necessary)
/*
offset -= 2 * sizeof(int);
offset -= 2 * sizeof(int);
*/

//cout << "Bytes before: " << m_bytes << endl;

char * memory = (char*) getBuffer();

#ifdef CONFIG_ASSERT
assert(memory != NULL);
#endif

//cout << "DEBUG memory " << (void*)memory << " offset " << offset;
//cout << " MESSAGE_META_DATA_ROUTE_SOURCE " << MESSAGE_META_DATA_ROUTE_SOURCE << endl;

memcpy(&m_routingSource, memory + offset + MESSAGE_META_DATA_ROUTE_SOURCE, sizeof(int));
memcpy(&m_routingDestination, memory + offset + MESSAGE_META_DATA_ROUTE_DESTINATION, sizeof(int));

setNumberOfBytes(getNumberOfBytes() - 2 * sizeof(int));
//setNumberOfBytes(getNumberOfBytes() - 2 * sizeof(int));
//m_count -= 1;

#if 0
Expand All @@ -402,6 +421,11 @@ void Message::loadRoutingMetaData() {
print();
}

if(!(m_routingDestination >= 0 || m_routingDestination == ROUTING_NO_VALUE)) {

print();
}

assert(m_routingSource >= 0 || m_routingSource == ROUTING_NO_VALUE);
assert(m_routingDestination >= 0 || m_routingDestination == ROUTING_NO_VALUE);
#endif
Expand Down Expand Up @@ -431,16 +455,137 @@ int Message::getNumberOfBytes() const {
return m_bytes;
}

void Message::runAssertions() {
void Message::runAssertions(int size) {

#ifdef CONFIG_ASSERT

assert(m_source >= 0);
assert(m_source < size);

assert(m_destination >= 0);
assert(m_destination < size);

assert(m_sourceActor >= 0);
assert(m_destinationActor >= 0);

if(!(m_miniRankSource >= 0 || m_miniRankSource == NO_VALUE)) {
print();
}

assert(m_miniRankSource >= 0 || m_miniRankSource == NO_VALUE);
if(!(m_miniRankSource < size || m_miniRankSource == NO_VALUE)) {

cout << "Error" << endl;
print();
cout << "m_miniRankSource " << m_miniRankSource;
cout << " size " << size << endl;

}
assert(m_miniRankSource < size || m_miniRankSource == NO_VALUE);

assert(m_miniRankDestination >= 0 || m_miniRankDestination == NO_VALUE);
assert(m_miniRankDestination < size || m_miniRankDestination == NO_VALUE);

assert(m_routingSource >= 0 || m_routingSource == ROUTING_NO_VALUE);
assert(m_routingSource < size || m_routingSource == NO_VALUE);

assert(m_routingDestination >= 0 || m_routingDestination == ROUTING_NO_VALUE);
assert(m_routingDestination < size || m_routingDestination == NO_VALUE);

#endif

}

void Message::saveMetaData() {

//runAssertions(size);

#ifdef CONFIG_ASSERT

int bytes = getNumberOfBytes();
uint32_t checksumBefore = computeCyclicRedundancyCode32((uint8_t*)getBuffer(), bytes);
#endif

this->saveMiniRankMetaData();
this->saveActorMetaData();
this->saveRoutingMetaData();

#ifdef CONFIG_ASSERT
assert(getNumberOfBytes() >= 0);
assert(getNumberOfBytes() <= MAXIMUM_MESSAGE_SIZE_IN_BYTES);

uint32_t checksumAfter = computeCyclicRedundancyCode32((uint8_t*)getBuffer(), bytes);

assert(checksumBefore == checksumAfter);
#endif

// we need this for the transfer.
setNumberOfBytes(getNumberOfBytes() + MESSAGE_META_DATA_SIZE);
}

void Message::loadMetaData() {

// we just needed this for the transfer.
setNumberOfBytes(getNumberOfBytes() - MESSAGE_META_DATA_SIZE);

//cout << "loadMetaData MESSAGE_META_DATA_SIZE " << MESSAGE_META_DATA_SIZE << endl;

//cout << "loadMetaData bytes: " << getNumberOfBytes() << endl;

#ifdef CONFIG_ASSERT
assert(getNumberOfBytes() >= 0);
if(!(getNumberOfBytes() <= MAXIMUM_MESSAGE_SIZE_IN_BYTES)) {
cout << "Error: " << getNumberOfBytes() << endl;
}
assert(getNumberOfBytes() <= MAXIMUM_MESSAGE_SIZE_IN_BYTES);
#endif

this->loadRoutingMetaData();
this->loadActorMetaData();
this->loadMiniRankMetaData();

}

void Message::saveMiniRankMetaData() {

int sourceMiniRank = getSourceMiniRank();
int destinationMiniRank = getDestinationMiniRank();

memcpy(getBufferBytes() + getNumberOfBytes() + MESSAGE_META_DATA_MINIRANK_SOURCE,
&sourceMiniRank, sizeof(sourceMiniRank));

memcpy(getBufferBytes() + getNumberOfBytes() + MESSAGE_META_DATA_MINIRANK_DESTINATION,
&destinationMiniRank, sizeof(destinationMiniRank));
}

void Message::loadMiniRankMetaData() {

int sourceMiniRank = -1;
int destinationMiniRank = -1;

memcpy(
&sourceMiniRank,
getBufferBytes() + getNumberOfBytes() + MESSAGE_META_DATA_MINIRANK_SOURCE,
sizeof(sourceMiniRank));

memcpy(
&destinationMiniRank,
getBufferBytes() + getNumberOfBytes() + MESSAGE_META_DATA_MINIRANK_DESTINATION,
sizeof(destinationMiniRank));

setMiniRanks(sourceMiniRank, destinationMiniRank);
}

void Message::setMiniRanks(int source, int destination) {

m_miniRankSource = source;
m_miniRankDestination = destination;
}

int Message::getSourceMiniRank() {
return m_miniRankSource;
}

int Message::getDestinationMiniRank() {
return m_miniRankDestination;
}
29 changes: 23 additions & 6 deletions RayPlatform/communication/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ see <http://www.gnu.org/licenses/>
#define MESSAGE_META_DATA_ACTOR_DESTINATION ( MESSAGE_META_DATA_ACTOR_SOURCE + sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_SOURCE ( MESSAGE_META_DATA_ACTOR_DESTINATION + sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_DESTINATION ( MESSAGE_META_DATA_ROUTE_SOURCE + sizeof(int))
#define MESSAGE_META_DATA_CHECKSUM ( MESSAGE_META_DATA_ROUTE_DESTINATION + sizeof(int))
#define MESSAGE_META_DATA_MINIRANK_SOURCE ( MESSAGE_META_DATA_ROUTE_DESTINATION + sizeof(int) )
#define MESSAGE_META_DATA_MINIRANK_DESTINATION ( MESSAGE_META_DATA_MINIRANK_SOURCE + sizeof(int) )
#define MESSAGE_META_DATA_CHECKSUM ( MESSAGE_META_DATA_MINIRANK_DESTINATION + sizeof(int))
#define MESSAGE_META_DATA_SIZE ( MESSAGE_META_DATA_CHECKSUM + sizeof(uint32_t) )


Expand Down Expand Up @@ -115,13 +117,24 @@ class Message{
* Must be >=0 and <= MPI_Comm_size()-1 */
Rank m_source;

int m_miniRankSource;
int m_miniRankDestination;

int m_sourceActor;
int m_destinationActor;

int m_routingSource;
int m_routingDestination;

void initialize();

void loadActorMetaData();
void saveActorMetaData();
void loadMiniRankMetaData();
void saveMiniRankMetaData();
void loadRoutingMetaData();
void saveRoutingMetaData();

public:
Message();
~Message();
Expand Down Expand Up @@ -173,21 +186,25 @@ class Message{
int getSourceActor() const;
void setSourceActor(int sourceActor);
void setDestinationActor(int destinationActor);
void saveActorMetaData();
void loadActorMetaData();

int getMetaDataSize() const;
void printActorMetaData();

void setRoutingSource(int source);
void setRoutingDestination(int destination);
void saveRoutingMetaData();

int getRoutingSource() const;
int getRoutingDestination() const;
void loadRoutingMetaData();
void displayMetaData();

void runAssertions();
void runAssertions(int size);

void saveMetaData();
void loadMetaData();

void setMiniRanks(int source, int destination);
int getDestinationMiniRank();
int getSourceMiniRank();
};

#endif
Loading

0 comments on commit ce606c4

Please sign in to comment.