Skip to content

Commit

Permalink
fix a regression introduced in a01f97e
Browse files Browse the repository at this point in the history
Internal buffers must be aligned on 8 bytes because a lot of framework
code assumes that this alignment is there.

Signed-off-by: Sébastien Boisvert <[email protected]>
  • Loading branch information
Sébastien Boisvert committed Nov 5, 2013
1 parent a01f97e commit b862e4a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 57 deletions.
12 changes: 4 additions & 8 deletions RayPlatform/communication/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ using namespace std;
*
*/
#define MESSAGE_META_DATA_ACTOR_SOURCE (0 * sizeof(int) )
#define MESSAGE_META_DATA_ACTOR_DESTINATION (1 * sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_SOURCE (2 * sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_DESTINATION (3 * sizeof(int) )
#define MESSAGE_META_DATA_CHECKSUM (4 * sizeof(int) )


Message::Message() {

Expand All @@ -72,8 +66,10 @@ void Message::initialize() {
Message::~Message() {
}

/** buffer must be allocated or else it will CORE DUMP. */
Message::Message(MessageUnit*b,int c,Rank dest,MessageTag tag,Rank source){
/**
* buffer must be allocated or else it will CORE DUMP.
*/
Message::Message(void*b,int c,Rank dest,MessageTag tag,Rank source){

initialize();

Expand Down
76 changes: 46 additions & 30 deletions RayPlatform/communication/Message.h
Original file line number Diff line number Diff line change
@@ -1,48 +1,57 @@
/*
RayPlatform: a message-passing development framework
Copyright (C) 2010, 2011, 2012 Sébastien Boisvert
RayPlatform: a message-passing development framework
Copyright (C) 2010, 2011, 2012 Sébastien Boisvert
http://github.com/sebhtml/RayPlatform
http://github.com/sebhtml/RayPlatform
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, version 3 of the License.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You have received a copy of the GNU Lesser General Public License
along with this program (lgpl-3.0.txt).
see <http://www.gnu.org/licenses/>
You have received a copy of the GNU Lesser General Public License
along with this program (lgpl-3.0.txt).
see <http://www.gnu.org/licenses/>
*/

#ifndef _Message_H
#define _Message_H

#define MESSAGE_META_DATA_ACTOR_SOURCE 0
#define MESSAGE_META_DATA_ACTOR_DESTINATION ( MESSAGE_META_DATA_ACTOR_SOURCE + sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_SOURCE ( MESSAGE_META_DATA_ACTOR_DESTINATION + sizeof(int) )
#define MESSAGE_META_DATA_ROUTE_DESTINATION ( MESSAGE_META_DATA_ROUTE_SOURCE + sizeof(int))
#define MESSAGE_META_DATA_CHECKSUM ( MESSAGE_META_DATA_ROUTE_DESTINATION + sizeof(int))
#define MESSAGE_META_DATA_SIZE ( MESSAGE_META_DATA_CHECKSUM + sizeof(uint32_t) )



#include <RayPlatform/core/types.h>

#include <stdint.h>

/**
* In Ray, every message is a Message.
* the inbox and the outbox are arrays of Message's
* All the code in Ray utilise Message to communicate information.
* MPI_Datatype is always MPI_UNSIGNED_LONG_LONG
* m_count is >=0 and <= MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(uint64_t)
* (default is 4000/8 = 500 ).
*
* in the buffer:
*
* Application payload (works)
* int source actor (works)
* int destination actor (works)
* int routing source (works)
* int routing destination (works)
* int checksum (probably broken)
*
* In Ray, every message is a Message.
* the inbox and the outbox are arrays of Message's
* All the code in Ray utilise Message to communicate information.
* MPI_Datatype is always MPI_UNSIGNED_LONG_LONG
* m_count is >=0 and <= MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(uint64_t)
* (default is 4000/8 = 500 ).
*
* in the buffer:
*
* Application payload (works)
* int source actor (works)
* int destination actor (works)
* int routing source (works)
* int routing destination (works)
* int checksum (probably broken)
*
* The offset (see Message.cpp) are relative to the number of bytes *BEFORE*
* adding this metadata.
* Note that the m_count is the number of MessageUnit for the message
Expand Down Expand Up @@ -116,7 +125,14 @@ class Message{
public:
Message();
~Message();
Message(MessageUnit * b,int c,Rank dest,MessageTag tag,Rank source);

/**
* numberOf8Bytes is there for historical reasons.
* You can call setNumberOfBytes afterward to set the number of bytes anyway.
*
*/
Message(void* buffer, int numberOf8Bytes, Rank destination, MessageTag tag,
Rank source);
MessageUnit *getBuffer();
char * getBufferBytes();
int getCount() const;
Expand Down
6 changes: 3 additions & 3 deletions RayPlatform/communication/MessagesHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ void MessagesHandler::probeAndRead(Rank source,MessageTag tag,
assert(count >= 0);
#endif

MessageUnit staticBuffer[MAXIMUM_MESSAGE_SIZE_IN_BYTES/sizeof(MessageUnit)+2];
char staticBuffer[MAXIMUM_MESSAGE_SIZE_IN_BYTES + MESSAGE_META_DATA_SIZE];

MPI_Recv(staticBuffer,count,datatype,actualSource,actualTag,MPI_COMM_WORLD,&status);

Expand Down Expand Up @@ -1124,9 +1124,9 @@ void MessagesHandler::receiveMessages(StaticVector*inbox,RingAllocator*inboxAllo
assert(count >= 0);
#endif

MessageUnit*incoming=NULL;
char * incoming=NULL;
if(count > 0){
incoming=(MessageUnit*)inboxAllocator->allocate(count * sizeof(char));
incoming=(char*)inboxAllocator->allocate(count * sizeof(char));
}

MPI_Recv(incoming,count,datatype,actualSource,actualTag,MPI_COMM_WORLD,&status);
Expand Down
74 changes: 58 additions & 16 deletions RayPlatform/core/ComputeCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ void ComputeCore::sendMessages(){
assert(forceMemoryReallocation || aMessage->getCount()==0||numberOfMessages==m_size);
#endif

char * buffer=(char*)(MessageUnit*)m_outboxAllocator.allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);
char * buffer=(char*)m_outboxAllocator.allocate(MAXIMUM_MESSAGE_SIZE_IN_BYTES);

#ifdef ASSERT
assert(buffer!=NULL);
Expand Down Expand Up @@ -739,7 +739,7 @@ void ComputeCore::sendMessages(){
// we need a buffer to store the actor metadata
if(message->getBuffer() == NULL) {
void * buffer = m_outboxAllocator.allocate(0);
message->setBuffer((MessageUnit*)buffer);
message->setBuffer(buffer);
}

/*
Expand Down Expand Up @@ -889,8 +889,8 @@ void ComputeCore::verifyMessageChecksums(){
assert(message->getNumberOfBytes() >= 0);
#endif

// the last MessageUnit contains the crc32.
// note that the CRC32 feature only works for MessageUnit at least
// the last Message Unit contains the crc32.
// note that the CRC32 feature only works for Message Unit at least
// 32 bits.
int numberOfBytes = m_inbox.at(i)->getNumberOfBytes();

Expand Down Expand Up @@ -1056,7 +1056,7 @@ void ComputeCore::constructor(int argc,char**argv,int miniRankNumber,int numberO
#endif

// checksum calculation is only tested
// for cases with sizeof(MessageUnit)>=4 bytes
// for cases with sizeof(Message Unit)>=4 bytes

m_routerIsEnabled=false;

Expand Down Expand Up @@ -1183,33 +1183,75 @@ void ComputeCore::configureEngine() {
cout<<"[ComputeCore] the outbox capacity is "<<m_maximumNumberOfOutboxMessages<<" message"<<endl;
#endif

int maximumMessageSizeInByte=MAXIMUM_MESSAGE_SIZE_IN_BYTES;
int maximumMessageSizeInBytes = MAXIMUM_MESSAGE_SIZE_IN_BYTES;

bool useActorModel = true;

// add a message unit to store the checksum or the routing information
// with 64-bit integers as MessageUnit, this is 4008 bytes or 501 MessageUnit maximum
// with 64-bit integers as Message Unit, this is 4008 bytes or 501 Message Unit maximum
// also add 8 bytes for actor metadata
// finally, add 4 bytes for the checksum, if necessary.

if(m_miniRanksAreEnabled || m_doChecksum || m_routerIsEnabled || useActorModel){

int headerSize = 0;

headerSize = MESSAGE_META_DATA_SIZE;

/*
// actor model
maximumMessageSizeInByte += 2 * sizeof(int) * sizeof(char);
headerSize += ( 2 * sizeof(int) * sizeof(char) );
// routing
maximumMessageSizeInByte += 2 * sizeof(int) * sizeof(char);
headerSize += ( 2 * sizeof(int) * sizeof(char));
// checksum
maximumMessageSizeInByte += 1 * sizeof(uint32_t) * sizeof(char);
headerSize += ( 1 * sizeof(uint32_t) * sizeof(char) );
*/

cout << "DEBUG MAXIMUM_MESSAGE_SIZE_IN_BYTES " << MAXIMUM_MESSAGE_SIZE_IN_BYTES;
cout << " headerSize " << headerSize << endl;

// align this on 8 bytes because there is still something in the code
// that is incorrect apparently.
//
// TODO: find why this needs to be aligned on 8 bytes. This is related to
// sizeof(MessageUnit) being 8 bytes.
//
// => it is likely because of the underlying system that evolved to
// use 64 bits per message unit.

bool align = true;
//align = false;

int base = 8;
int remainder = headerSize % base;

if(align && remainder != 0) {

int toAdd = base - remainder;

headerSize += toAdd;
}

cout << " headerSize (with padding) " << headerSize << endl;

maximumMessageSizeInBytes += headerSize;
}

cout << "DEBUG maximumMessageSizeInBytes " << maximumMessageSizeInBytes << endl;

#ifdef CONFIG_ASSERT
assert(maximumMessageSizeInBytes >= MAXIMUM_MESSAGE_SIZE_IN_BYTES);
assert(maximumMessageSizeInBytes >= MAXIMUM_MESSAGE_SIZE_IN_BYTES + 20);// we need 20 bytes for storing message header
#endif

m_inboxAllocator.constructor(m_maximumAllocatedInboxBuffers,
maximumMessageSizeInByte,
maximumMessageSizeInBytes,
"RAY_MALLOC_TYPE_INBOX_ALLOCATOR",false);

m_outboxAllocator.constructor(m_maximumAllocatedOutboxBuffers,
maximumMessageSizeInByte,
maximumMessageSizeInBytes,
"RAY_MALLOC_TYPE_OUTBOX_ALLOCATOR",false);

if(m_miniRanksAreEnabled){
Expand All @@ -1219,18 +1261,18 @@ void ComputeCore::configureEngine() {
#endif

m_bufferedInboxAllocator.constructor(m_maximumAllocatedOutboxBuffers,
maximumMessageSizeInByte,
maximumMessageSizeInBytes,
"RAY_MALLOC_TYPE_INBOX_ALLOCATOR/Buffered",false);

m_bufferedOutboxAllocator.constructor(m_maximumAllocatedOutboxBuffers,
maximumMessageSizeInByte,
maximumMessageSizeInBytes,
"RAY_MALLOC_TYPE_OUTBOX_ALLOCATOR/Buffered",false);

}

#ifdef CONFIG_DEBUG_CORE
cout<<"[ComputeCore] allocated "<<m_maximumAllocatedInboxBuffers<<" buffers of size "<<maximumMessageSizeInByte<<" for inbox messages"<<endl;
cout<<"[ComputeCore] allocated "<<m_maximumAllocatedOutboxBuffers<<" buffers of size "<<maximumMessageSizeInByte<<" for outbox messages"<<endl;
cout<<"[ComputeCore] allocated "<<m_maximumAllocatedInboxBuffers<<" buffers of size "<<maximumMessageSizeInBytes<<" for inbox messages"<<endl;
cout<<"[ComputeCore] allocated "<<m_maximumAllocatedOutboxBuffers<<" buffers of size "<<maximumMessageSizeInBytes<<" for outbox messages"<<endl;
#endif

}
Expand Down

0 comments on commit b862e4a

Please sign in to comment.