Skip to content

Commit

Permalink
Allow operators after child-producing operators
Browse files Browse the repository at this point in the history
Previously, if other operators were performed after a child-producing
operator (such as reconstruction), the behavior would be somewhat
unpredictable, depending on several factors, including whether a
description.json file was present, and whether the pipeline was
internal or external.

This fixes the behavior so that operators can be performed after
child-producing operators. This was done by making sure that the output
from child-producing operators gets passed down as input to the next
operator. This needed to be done both in the external pipeline, and in
the internal pipeline. Additionally, this also required that modules be
passed down to the output of the next operator as well.

This commit additionally allows child-producing operators to work in the
internal pipeline without the description.json file present, which was
not possible before (it required that the child be named explicitly).

The external pipeline should not emit that the pipeline is finished if
a child datasource is encountered. That was removed.

The snapshot operator does not have any useful behavior, currently. But
it should work well with intermediate data sources, since it will
essentially create an intermediate data source.

Signed-off-by: Patrick Avery <[email protected]>
  • Loading branch information
psavery committed Apr 7, 2020
1 parent b5e5921 commit ce045ee
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 171 deletions.
190 changes: 112 additions & 78 deletions tomviz/Pipeline.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "DataSource.h"
#include "DockerExecutor.h"
#include "DockerUtilities.h"
#include "EmdFormat.h"
#include "ExternalPythonExecutor.h"
#include "ModuleManager.h"
#include "Operator.h"
Expand Down Expand Up @@ -180,11 +179,13 @@ Pipeline::Future* Pipeline::execute(DataSource* dataSource)

auto operators = dataSource->operators();
if (operators.size() < 1) {
emit finished();
return emptyFuture();
}

Operator* firstModifiedOperator = operators.first();
if (!isModified(dataSource, &firstModifiedOperator)) {
emit finished();
return emptyFuture();
}

Expand Down Expand Up @@ -269,6 +270,37 @@ Pipeline::Future* Pipeline::execute(DataSource* ds, Operator* start,

auto pipelineFuture = new PipelineFutureInternal(
this, branchFuture->operators(), branchFuture, operators.last() == end);

// After the pipeline finishes, move the modules to the back and
// remove the old child data source.
// If the operator to be removed has a child, move those modules to
// the back. Otherwise, move the transformed data source modules.
DataSource* oldChild = nullptr;
if (m_lastOperatorChildRemoved) {
// This indicates that an operator was just removed. Use that.
oldChild = m_lastOperatorChildRemoved;
m_lastOperatorChildRemoved = nullptr;
} else {
// Get either the child data source of the last operator or the
// transformed data source (which could be the root data source).
auto lastOp = operators.last();
oldChild = lastOp->childDataSource() ? lastOp->childDataSource()
: transformedDataSource();
}

connect(pipelineFuture, &Pipeline::Future::finished, oldChild,
[this, oldChild]() {
auto newChild = transformedDataSource();
if (newChild != oldChild) {
// If the child is not the same, move the modules to the new one
ModuleManager::instance().moveModules(oldChild, newChild);
if (!oldChild->forkable()) {
// Remove the old child data source if it was not forkable
ModuleManager::instance().removeChildDataSource(oldChild);
}
}
});

connect(pipelineFuture, &Pipeline::Future::finished, this,
&Pipeline::finished);

Expand Down Expand Up @@ -355,57 +387,68 @@ void Pipeline::branchFinished()
if (operators.isEmpty()) {
return;
}
auto start = future->operators().first()->dataSource();
auto start = operators.first()->dataSource();
auto newData = future->result();
// We only add the transformed child data source if the last operator
// doesn't already have an explicit child data source i.e.
// hasChildDataSource is true.

// Set the output data on the final operator's child data source
auto lastOp = start->operators().last();
if (!lastOp->hasChildDataSource()) {
DataSource* newChildDataSource = nullptr;
if (lastOp->childDataSource() == nullptr) {
newChildDataSource = new DataSource("Output");
newChildDataSource->setPersistenceState(
tomviz::DataSource::PersistenceState::Transient);
newChildDataSource->setForkable(false);
newChildDataSource->setParent(this);
lastOp->setChildDataSource(newChildDataSource);
auto rootDataSource = dataSource();
// connect signal to flow units and spacing to child data source.
connect(dataSource(), &DataSource::dataPropertiesChanged,
[rootDataSource, newChildDataSource]() {
// Only flow the properties if no user modifications have been
// made.
if (!newChildDataSource->unitsModified()) {
newChildDataSource->setUnits(rootDataSource->getUnits(),
false);
double spacing[3];
rootDataSource->getSpacing(spacing);
newChildDataSource->setSpacing(spacing, false);
}
});
}
QString label = "Output";

auto child = lastOp->childDataSource();
if (child) {
// Remove the old child, and create a new one from the output data.
// We will always use the output data for the final output.
label = child->label();
ModuleManager::instance().removeChildDataSource(child);
lastOp->setChildDataSource(nullptr);
}

// Update the type if necessary
DataSource::DataSourceType type = DataSource::hasTiltAngles(newData)
? DataSource::TiltSeries
: DataSource::Volume;
lastOp->childDataSource()->setData(newData);
lastOp->childDataSource()->setType(type);
lastOp->childDataSource()->dataModified();

if (newChildDataSource != nullptr) {
emit lastOp->newChildDataSource(newChildDataSource);
// Move modules from root data source.
ModuleManager::instance().moveModules(dataSource(), newChildDataSource);
// Remove any children produced by the operators. We currently do not
// support intermediate data sources.
for (auto op : operators) {
if (op->childDataSource()) {
ModuleManager::instance().removeChildDataSource(op->childDataSource());
op->setChildDataSource(nullptr);
}
}
else {
// If this is the only operator, make sure the modules are moved down.
if (start->operators().size() == 1)
ModuleManager::instance().moveModules(dataSource(),
lastOp->childDataSource());
}

// Create one
child = new DataSource(label);
child->setPersistenceState(tomviz::DataSource::PersistenceState::Transient);
lastOp->setChildDataSource(child);

// TODO: the following should be set to this, once we get
// intermediate datasets working.
// child->setForkable(hasChildDataSource());
child->setForkable(false);

// TODO: when we get intermediate datasets working, this data source
// should have the same pipeline, with pauses in the pipeline at
// forkable data sources.
child->setParent(this);

auto rootDataSource = dataSource();
// connect signal to flow units and spacing to child data source.
connect(dataSource(), &DataSource::dataPropertiesChanged, child,
[rootDataSource, child]() {
// Only flow the properties if no user modifications have been
// made.
if (!child->unitsModified()) {
child->setUnits(rootDataSource->getUnits(), false);
double spacing[3];
rootDataSource->getSpacing(spacing);
child->setSpacing(spacing, false);
}
});

DataSource::DataSourceType type = DataSource::hasTiltAngles(newData)
? DataSource::TiltSeries
: DataSource::Volume;
child->setData(newData);
child->setType(type);
child->dataModified();

emit lastOp->newChildDataSource(child);
}

void Pipeline::pause()
Expand Down Expand Up @@ -483,22 +526,18 @@ void Pipeline::addDataSource(DataSource* dataSource)
&Operator::newChildDataSource),
[this](DataSource* ds) { addDataSource(ds); });

// We need to ensure we move add datasource to the end of the branch
auto operators = op->dataSource()->operators();
if (operators.size() > 1) {
auto transformedDataSourceOp =
findTransformedDataSourceOperator(op->dataSource());
if (transformedDataSourceOp != nullptr) {
auto transformedDataSource = transformedDataSourceOp->childDataSource();
transformedDataSourceOp->setChildDataSource(nullptr);
op->setChildDataSource(transformedDataSource);
emit operatorAdded(op, transformedDataSource);
} else {
emit operatorAdded(op);
}
} else {
emit operatorAdded(op);
}
// This might also be the root datasource, which is okay
auto oldChild = transformedDataSource(op->dataSource());

// This is just to make the modules "move down" in the view before
// the operator is ran.
DataSource* outputDS = nullptr;
auto transformedDataSourceOp =
findTransformedDataSourceOperator(op->dataSource());
if (transformedDataSourceOp)
outputDS = oldChild;

emit operatorAdded(op, outputDS);
});
// Wire up operatorRemoved. TODO We need to check the branch of the
// pipeline we are currently executing.
Expand All @@ -509,21 +548,16 @@ void Pipeline::addDataSource(DataSource* dataSource)
if (!op->isNew()) {
m_operatorsDeleted = true;
}
if (op->childDataSource() != nullptr) {
auto transformedDataSource = op->childDataSource();
auto operators = op->dataSource()->operators();
// We have an operator to move it to.
if (!operators.isEmpty()) {
auto newOp = operators.last();
op->setChildDataSource(nullptr);
newOp->setChildDataSource(transformedDataSource);
emit newOp->dataSourceMoved(transformedDataSource);
}
// Clean it up
else {
transformedDataSource->removeAllOperators();
transformedDataSource->deleteLater();
}

if (op->dataSource()->operators().isEmpty() && op->childDataSource()) {
// The last operator was removed. Move all the modules to the root.
ModuleManager::instance().moveModules(op->childDataSource(),
op->dataSource());
ModuleManager::instance().removeChildDataSource(op->childDataSource());
} else if (op->childDataSource()) {
// Save this so we can move the modules from it and delete it
// later.
m_lastOperatorChildRemoved = op->childDataSource();
}

// If pipeline is running see if we can safely remove the operator
Expand Down
1 change: 1 addition & 0 deletions tomviz/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private slots:
QScopedPointer<PipelineExecutor> m_executor;
ExecutionMode m_executionMode = Threaded;
int m_editingOperators = 0;
DataSource* m_lastOperatorChildRemoved = nullptr;
};

/// Return from getCopyOfImagePriorTo for caller to track async operation.
Expand Down
14 changes: 2 additions & 12 deletions tomviz/PipelineExecutor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ Pipeline::Future* ExternalPipelineExecutor::execute(vtkDataObject* data,
if (!m_temporaryDir->isValid()) {
displayError("Directory Error", "Unable to create temporary directory.");
return Pipeline::emptyFuture();
;
}

QString origFileName = originalFileName();
Expand Down Expand Up @@ -169,12 +168,10 @@ Pipeline::Future* ExternalPipelineExecutor::execute(vtkDataObject* data,
&ExternalPipelineExecutor::pipelineStarted);
connect(m_progressReader.data(), &ProgressReader::pipelineFinished, this,
[this, future]() {
// Read the modified active scalars
auto transformedFilePath =
QDir(workingDir()).filePath(TRANSFORM_FILENAME);
vtkSmartPointer<vtkDataObject> transformedData =
vtkImageData::New();
vtkImageData* transformedImageData =
vtkImageData::SafeDownCast(transformedData.Get());
vtkNew<vtkImageData> transformedImageData;
// Make sure we don't ask the user about subsampling
QVariantMap options = { { "askForSubsample", false } };
if (EmdFormat::read(transformedFilePath.toLatin1().data(),
Expand All @@ -186,7 +183,6 @@ Pipeline::Future* ExternalPipelineExecutor::execute(vtkDataObject* data,
.arg(transformedFilePath));
}
emit future->finished();
transformedImageData->FastDelete();
});
connect(future, &Pipeline::Future::finished, this,
&ExternalPipelineExecutor::reset);
Expand Down Expand Up @@ -250,11 +246,6 @@ void ExternalPipelineExecutor::operatorStarted(Operator* op)
{
op->setState(OperatorState::Running);
emit op->transformingStarted();

auto pythonOp = qobject_cast<OperatorPython*>(op);
if (pythonOp != nullptr) {
pythonOp->createChildDataSource();
}
}

void ExternalPipelineExecutor::operatorFinished(Operator* op)
Expand All @@ -277,7 +268,6 @@ void ExternalPipelineExecutor::operatorFinished(Operator* op)
if (EmdFormat::read(fileInfo.filePath().toLatin1().data(), childData,
options)) {
childOutput[name] = childData;
emit pipeline()->finished();
} else {
displayError(
"Read Error",
Expand Down
2 changes: 0 additions & 2 deletions tomviz/PipelineModel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ bool PipelineModel::TreeItem::remove(DataSource* source)
// Resume but don't execute as we are removing this data source.
pipeline->resume();
}
} else if (childItem->module()) {
ModuleManager::instance().removeModule(childItem->module());
}
}
if (parent()) {
Expand Down
1 change: 0 additions & 1 deletion tomviz/Tvh5Format.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ bool Tvh5Format::loadDataSource(h5::H5ReadWrite& reader,
if (parent) {
// This is a child data source. Hook it up to the operator parent.
parent->setChildDataSource(dataSource);
parent->setHasChildDataSource(true);
parent->newChildDataSource(dataSource);
// If it has a parent, it will be deserialized later.
} else {
Expand Down
29 changes: 16 additions & 13 deletions tomviz/operators/Operator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -193,23 +193,26 @@ void Operator::createNewChildDataSource(
const QString& label, vtkSmartPointer<vtkDataObject> childData,
DataSource::DataSourceType type, DataSource::PersistenceState state)
{
if (this->childDataSource() == nullptr) {
DataSource* childDS =
new DataSource(vtkImageData::SafeDownCast(childData), type,
this->dataSource()->pipeline(), state);
childDS->setLabel(label);
setChildDataSource(childDS);
setHasChildDataSource(true);
emit Operator::newChildDataSource(childDS);
auto child = childDataSource();
if (!child) {
child = new DataSource(vtkImageData::SafeDownCast(childData), type,
this->dataSource()->pipeline(), state);
child->setLabel(label);
setChildDataSource(child);
emit Operator::newChildDataSource(child);
}
// Reuse the existing "Output" data source.
else {
childDataSource()->setData(childData);
childDataSource()->setLabel(label);
childDataSource()->setForkable(true);
childDataSource()->dataModified();
setHasChildDataSource(true);
child->setData(childData);
child->setType(type);
child->setLabel(label);
child->setPersistenceState(state);
child->dataModified();
}
// TODO: the following should be set to this, once we get
// intermediate datasets working.
// child->setForkable(hasChildDataSource());
child->setForkable(false);
}

void Operator::cancelTransform()
Expand Down
10 changes: 10 additions & 0 deletions tomviz/operators/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,19 @@ class Operator : public QObject
virtual OperatorResult* resultAt(int index) const;

/// Set whether the operator is expected to produce a child DataSource.
/// TODO: this is beginning to take on a new meaning, since every
/// operator now produces a child data source. The new meaning is:
/// should the child data source be kept and not thrown away when
/// another operator is performed on it?
/// Maybe we should change the naming?
virtual void setHasChildDataSource(bool value);

/// Get whether the operator is expected to produce a child DataSource.
/// TODO: this is beginning to take on a new meaning, since every
/// operator now produces a child data source. The new meaning is:
/// should the child data source be kept and not thrown away when
/// another operator is performed on it?
/// Maybe we should change the naming?
virtual bool hasChildDataSource() const;

/// Set the child DataSource. Can be nullptr.
Expand Down
Loading

0 comments on commit ce045ee

Please sign in to comment.