Skip to content

Commit

Permalink
Take option 2 and fail the 'remaining' at the 99% error (#4061)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmarchant authored Jun 3, 2020
1 parent fa73603 commit f128622
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ bool OsmApiWriter::apply()
if (_allThreadsFailed())
{
_changeset.failRemainingChangeset();
continue;
_threadsCanExit = true;
break;
}
// Only queue up enough work to keep all the threads busy with times QUEUE_SIZE_MULTIPLIER
// so that results can come back and update the changeset for more atomic changesets instead
Expand All @@ -196,7 +197,7 @@ bool OsmApiWriter::apply()
// all of the threads are idle and not waiting for something to come back
// There are two things that can be done here, first is to put everything that is
// "ready to send" in a changeset and send it OR move everything to the error state

/*
// Option #1: Get all of the remaining elements as a single changeset
_changesetMutex.lock();
_changeset.calculateRemainingChangeset(changeset_info);
Expand All @@ -205,6 +206,15 @@ bool OsmApiWriter::apply()
_pushChangesets(changeset_info);
// Let the threads know that the remaining changeset is the "remaining" changeset
_threadsCanExit = true;
*/
LOG_STATUS("Apply Changeset: Remaining elements unsendable...");
// Option #2: Move everything to the error state and exit
_changesetMutex.lock();
_changeset.failRemainingChangeset();
_changesetMutex.unlock();
// Let the threads know that the remaining changeset has failed
_threadsCanExit = true;
break;
}
else
{
Expand Down Expand Up @@ -352,16 +362,6 @@ void OsmApiWriter::_changesetThreadFunc(int index)
_changesetMutex.unlock();
// Update the size of the current changeset that is open
changesetSize += workInfo->size();
// Remove the "remaining" file if the remaining was successful
if (workInfo->getLast())
{
_changeset.updateRemainingChangeset();
// Let the threads know that the remaining changeset is the "remaining" changeset
_threadsCanExit = true;
// Looping should end the thread because all of the remaining elements have now been sent
stop_thread = true;
continue;
}
// When the current changeset is nearing the 50k max (or the specified max), close the changeset
// otherwise keep it open and go again
if (changesetSize > _maxChangesetSize - (int)(_maxPushSize * 1.5))
Expand Down Expand Up @@ -474,7 +474,12 @@ void OsmApiWriter::_changesetThreadFunc(int index)
}
else
{
if (_changeset.hasElementsToSend() && !_changeset.isDone() && queueSize == 0)
if (_threadsCanExit)
{
stop_thread = true;
_updateThreadStatus(index, ThreadStatus::Completed);
}
else if (!_changeset.isDone() && queueSize == 0)
{
// This is a bad state where the producer thread says all elements are sent and
// waits for all threads to join but the changeset isn't "done".
Expand All @@ -492,17 +497,6 @@ void OsmApiWriter::_changesetThreadFunc(int index)
id = -1;
}
_threadStatusMutex.unlock();
// In this case there are elements that have been sent and not reported back
// BUT there are no threads that are waiting for them either. Every thread
// except the "first" worker thread will exit here. The first worker thread
// will wait for the producer thread to calculate the remaining changeset and
// push in on the queue. It then loops around and picks up the remaining
// changeset and processes it.
if (_threadsAreIdle() && index != 0 && _threadsCanExit)
{
stop_thread = true;
_updateThreadStatus(index, ThreadStatus::Completed);
}
}
else
{
Expand Down

0 comments on commit f128622

Please sign in to comment.