Skip to content

Commit

Permalink
[HWORKS-737] Deleted project leads to NPE when processing SearchFSCom…
Browse files Browse the repository at this point in the history
…mand (#1548)
  • Loading branch information
o-alex committed Oct 2, 2023
1 parent ccb1aa9 commit f4cf4df
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommand;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandHistory;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandOp;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -138,7 +137,7 @@ public void process() {
try {
processInt();
} catch (Exception t) {
LOGGER.log(Level.INFO, "Command processing failed with error:", t.getStackTrace());
LOGGER.log(Level.INFO, "Command processing failed with error", t);
}
}
schedule();
Expand All @@ -160,34 +159,32 @@ private void processInt() throws CommandException {
return;
}
Set<Long> updatingDocs = updatingCommands.stream().map(this::getDocId).collect(Collectors.toSet());
Map<Integer, Project> updatingProjects = updatingCommands.stream()
.collect(Collectors.toMap(c -> c.getProject().getId(), Command::getProject, (p1, p2) -> p1));
Set<Integer> updatingProjects = updatingCommands.stream().map(Command::getProjectId).collect(Collectors.toSet());

List<SearchFSCommand> cleaningCommands = commandFacade.findByQuery(queryByStatus(CommandStatus.CLEANING));
active += cleaningCommands.size();
if (active >= maxOngoing) {
return;
}
Set<Long> cleaningDocs = cleaningCommands.stream().map(this::getDocId).collect(Collectors.toSet());
updatingProjects.putAll(cleaningCommands.stream().collect(Collectors.toMap(c -> c.getProject().getId(),
Command::getProject, (existingP, newP) -> existingP)));
updatingProjects.addAll(cleaningCommands.stream().map(Command::getProjectId).collect(Collectors.toSet()));

Map<Integer, SearchFSCommand> toDeleteProjects = commandFacade.findByQuery(queryDeletedProjects(CommandStatus.NEW))
.stream().collect(Collectors.toMap(c -> c.getProject().getId(), c -> c, (existingC, newC) -> existingC));
.stream().collect(Collectors.toMap(Command::getProjectId, c -> c, (existingC, newC) -> existingC));
Map<Integer, SearchFSCommand> deletingProjects
= commandFacade.findByQuery(queryDeletedProjects(CommandStatus.CLEANING))
.stream().collect(Collectors.toMap(c -> c.getProject().getId(), c -> c, (existingC, newC) -> existingC));
.stream().collect(Collectors.toMap(Command::getProjectId, c -> c, (existingC, newC) -> existingC));

List<SearchFSCommand> failedCommands = commandFacade.findByQuery(queryByStatus(CommandStatus.FAILED));
Set<Long> failedDocs = failedCommands.stream().map(this::getDocId).collect(Collectors.toSet());

//clean deleted projects that are not actively worked on
Set<Integer> deletedAndNotActive = Sets.difference(toDeleteProjects.keySet(), updatingProjects.keySet());
Set<Integer> deletedAndNotActive = Sets.difference(toDeleteProjects.keySet(), updatingProjects);
for (Integer idx : deletedAndNotActive) {
cleanDeletedProject(toDeleteProjects.get(idx));
}
//determine projects to be excluded
Set<Project> excludeProjects = unionProjects(toDeleteProjects, deletingProjects);
Set<Integer> excludeProjects = unionProjects(toDeleteProjects, deletingProjects);
//determine documents to be excluded
Set<Long> excludeDocs = new HashSet<>();
excludeDocs.addAll(updatingDocs);
Expand Down Expand Up @@ -217,10 +214,10 @@ private void processInt() throws CommandException {
}
}

private Set<Project> unionProjects(Map<Integer, SearchFSCommand> p1, Map<Integer, SearchFSCommand> p2) {
Set<Project> result = new HashSet<>();
p1.values().forEach(c -> result.add(c.getProject()));
p2.values().forEach(c -> result.add(c.getProject()));
private Set<Integer> unionProjects(Map<Integer, SearchFSCommand> p1, Map<Integer, SearchFSCommand> p2) {
Set<Integer> result = new HashSet<>();
p1.values().forEach(c -> result.add(c.getProjectId()));
p2.values().forEach(c -> result.add(c.getProjectId()));
return result;
}

Expand All @@ -230,20 +227,20 @@ private void cleanDeletedProject(SearchFSCommand command) {
Try<Boolean> result = processFunction().apply(command);
try {
if(result.checkedGet()) {
List<SearchFSCommand> toSkip = commandFacade.findByQuery(queryByProject(command.getProject(),
List<SearchFSCommand> toSkip = commandFacade.findByQuery(queryByProject(command.getProjectId(),
SearchFSCommandOp.DELETE_PROJECT));
toSkip.forEach(c -> removeCommand(c, CommandStatus.SKIPPED));
removeCommand(command, CommandStatus.SUCCESS);
}
} catch (Throwable t) {
LOGGER.log(Level.INFO, "Project:{0} clean failed with error:{1}",
new Object[]{command.getProject().getId(), t.getStackTrace()});
new Object[]{command.getProjectId(), t.getStackTrace()});
failCommand(command, t.getMessage());
}
});
}

private Set<Long> cleanDeletedArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing)
private Set<Long> cleanDeletedArtifacts(Set<Integer> excludeProjects, Set<Long> excludeDocs, int maxOngoing)
throws CommandException {
Set<Long> deleting = new HashSet<>();
List<SearchFSCommand> toDelete = commandFacade
Expand Down Expand Up @@ -276,7 +273,7 @@ private void cleanDeletedArtifact(SearchFSCommand command) {
});
}

private Set<Long> cleanDeleteCascadedArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
private Set<Long> cleanDeleteCascadedArtifacts(Set<Integer> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
Set<Long> deleting = new HashSet<>();
List<SearchFSCommand> toDelete = commandFacade.findDeleteCascaded(excludeProjects, excludeDocs, maxOngoing);
for(SearchFSCommand command : toDelete) {
Expand All @@ -298,7 +295,7 @@ private void cleanDeleteCascadedArtifact(SearchFSCommand command) {
cleanDeletedArtifact(deleteArtifact);
}

private Set<Long> processArtifacts(Set<Project> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
private Set<Long> processArtifacts(Set<Integer> excludeProjects, Set<Long> excludeDocs, int maxOngoing) {
Set<Long> processing = new HashSet<>();
List<SearchFSCommand> toProcess = commandFacade.findToProcess(excludeProjects, excludeDocs, maxOngoing);
for(SearchFSCommand command : toProcess) {
Expand Down Expand Up @@ -342,9 +339,9 @@ private QueryParam queryDeletedProjects(CommandStatus status) {
return new QueryParam(null, null, filters, null);
}

private QueryParam queryByProject(Project project, SearchFSCommandOp notOp) {
private QueryParam queryByProject(Integer projectId, SearchFSCommandOp notOp) {
Set<AbstractFacade.FilterBy> filters = new HashSet<>();
filters.add(new CommandFilterBy(CommandFacade.Filters.PROJECT_ID_EQ, project.getId().toString()));
filters.add(new CommandFilterBy(CommandFacade.Filters.PROJECT_ID_EQ, projectId.toString()));
filters.add(new CommandFilterBy(SearchFSCommandFacade.SearchFSFilters.OP_NEQ, notOp.name()));
return new QueryParam(null, null, filters, null);
}
Expand Down Expand Up @@ -420,7 +417,7 @@ private Function<SearchFSCommand, Try<Boolean>> processFunction() {
private Try<Boolean> processCommand(SearchFSCommand c) {
try {
if(c.getOp().equals(SearchFSCommandOp.DELETE_PROJECT)) {
searchController.deleteProject(c.getProject());
searchController.deleteProject(c.getProjectId());
return Try.apply(() -> true);
} else if (c.getOp().equals(SearchFSCommandOp.DELETE_ARTIFACT)) {
searchController.delete(c.getInodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.metadata.FeatureStoreTag;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDatasetFeature;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -86,10 +85,10 @@ public class SearchFSOpenSearchController {
@EJB
private Settings settings;

public long deleteProject(Project project) throws OpenSearchException {
public long deleteProject(Integer projectId) throws OpenSearchException {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(Settings.FEATURESTORE_INDEX);
deleteRequest.setQuery(QueryBuilders.matchQuery(
FeaturestoreXAttrsConstants.PROJECT_ID, project.getId()));
FeaturestoreXAttrsConstants.PROJECT_ID, projectId));
return opensearchClient.deleteByQuery(deleteRequest);
}

Expand Down Expand Up @@ -153,7 +152,7 @@ private SearchDoc updateMetadata(SearchFSCommand c) throws CommandException {

private SearchDoc create(SearchFSCommand c) throws CommandException {
SearchDoc doc = new SearchDoc();
doc.setProjectId(c.getProject().getId());
doc.setProjectId(c.getProjectId());
doc.setProjectName(c.getProject().getName());

String featureStorePath = Utils.getFeaturestorePath(c.getProject(), settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

public abstract class CommandFacade<C extends Command> extends AbstractFacade<C> {
public static final String STATUS_FIELD = "status";
public static final String PROJECT_FIELD = "project";
public static final String PROJECT_ID_FIELD = "projectId";

@PersistenceContext(unitName = "kthfsPU")
protected EntityManager em;
Expand Down Expand Up @@ -108,7 +108,7 @@ protected void setParam(Query q, AbstractFacade.FilterBy filterBy) throws Comman
try {
if(filterBy.getField().equals(STATUS_FIELD)) {
q.setParameter(filterBy.getField(), CommandStatus.valueOf(filterBy.getParam()));
} else if(filterBy.getField().equals(PROJECT_FIELD)){
} else if(filterBy.getField().equals(PROJECT_ID_FIELD)){
q.setParameter(filterBy.getField(), Integer.parseInt(filterBy.getParam()));
} else {
String msg = "invalid filter:" + filterBy.toString();
Expand All @@ -124,8 +124,7 @@ protected void setParam(Query q, AbstractFacade.FilterBy filterBy) throws Comman
public enum Filters implements CommandFilter {
STATUS_EQ(STATUS_FIELD, "c.status = :status ", "NEW"),
STATUS_NEQ(STATUS_FIELD,"c.status != :status ", "NEW"),
PROJECT_ID_EQ(PROJECT_FIELD, "c.project.id = :project", "0"),
PROJECT_IS_NULL(PROJECT_FIELD, "c.project IS NULL", null);
PROJECT_ID_EQ(PROJECT_ID_FIELD, "c." + PROJECT_ID_FIELD + " = :" + PROJECT_ID_FIELD, "0");

private final String sql;
private final String field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.hops.hopsworks.persistence.entity.commands.CommandStatus;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommand;
import io.hops.hopsworks.persistence.entity.commands.search.SearchFSCommandOp;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;

import javax.ejb.Stateless;
Expand Down Expand Up @@ -51,15 +50,15 @@ protected String getTableName() {
return SearchFSCommand.TABLE_NAME;
}

public List<SearchFSCommand> findByQuery(QueryParam queryParam, Set<Project> excludeProjects, Set<Long> excludeDocs)
public List<SearchFSCommand> findByQuery(QueryParam queryParam, Set<Integer> excludeProjects, Set<Long> excludeDocs)
throws CommandException {
if(queryParam == null) {
throw new CommandException(RESTCodes.CommandErrorCode.INVALID_SQL_QUERY, Level.INFO, "query param is null");
}
String queryStrPrefix = "SELECT c FROM " + getTableName() + " c ";
String queryStr = buildQuery(queryStrPrefix, queryParam.getFilters(), queryParam.getSorts(), "");
if(!excludeProjects.isEmpty()) {
queryStr += " AND c." + PROJECT_FIELD + " NOT IN :exclude_" + PROJECT_FIELD;
queryStr += " AND c." + PROJECT_ID_FIELD + " NOT IN :exclude_" + PROJECT_ID_FIELD;
}
if(!excludeDocs.isEmpty()) {
queryStr += " AND c." + DOC_ID_FIELD + " NOT IN :exclude_" + DOC_ID_FIELD;
Expand All @@ -70,15 +69,15 @@ public List<SearchFSCommand> findByQuery(QueryParam queryParam, Set<Project> exc
q.setMaxResults(queryParam.getLimit());
}
if(!excludeProjects.isEmpty()) {
q.setParameter("exclude_" + PROJECT_FIELD, excludeProjects);
q.setParameter("exclude_" + PROJECT_ID_FIELD, excludeProjects);
}
if(!excludeDocs.isEmpty()) {
q.setParameter("exclude_" + DOC_ID_FIELD, excludeDocs);
}
return q.getResultList();
}

public List<SearchFSCommand> findToProcess(Set<Project> excludeProjects, Set<Long> excludeDocs, int limit) {
public List<SearchFSCommand> findToProcess(Set<Integer> excludeProjects, Set<Long> excludeDocs, int limit) {
UnaryOperator<String> filterFoLive = tableName -> {
String filter = "(";
filter += tableName + "." + FEATURE_GROUP_FIELD + " IS NOT NULL OR ";
Expand All @@ -90,7 +89,7 @@ public List<SearchFSCommand> findToProcess(Set<Project> excludeProjects, Set<Lon
return findToProcessInt(excludeProjects, excludeDocs, limit, filterFoLive);
}

public List<SearchFSCommand> findDeleteCascaded(Set<Project> excludeProjects, Set<Long> excludeDocs, int limit) {
public List<SearchFSCommand> findDeleteCascaded(Set<Integer> excludeProjects, Set<Long> excludeDocs, int limit) {
UnaryOperator<String> filterForDeleteCascaded = tableName -> {
String filter = "(";
filter += tableName + "." + FEATURE_GROUP_FIELD + " IS NULL AND ";
Expand All @@ -102,7 +101,7 @@ public List<SearchFSCommand> findDeleteCascaded(Set<Project> excludeProjects, Se
return findToProcessInt(excludeProjects, excludeDocs, limit, filterForDeleteCascaded);
}

private List<SearchFSCommand> findToProcessInt(Set<Project> excludeProjects, Set<Long> excludeDocs, int limit,
private List<SearchFSCommand> findToProcessInt(Set<Integer> excludeProjects, Set<Long> excludeDocs, int limit,
UnaryOperator<String> queryAppendFilter) {
String queryStr = "";
queryStr += "SELECT jc FROM " + getTableName() + " jc WHERE jc.id IN (";
Expand All @@ -112,7 +111,7 @@ private List<SearchFSCommand> findToProcessInt(Set<Project> excludeProjects, Set
queryStr += " WHERE c.status = :status";
queryStr += " AND " + queryAppendFilter.apply("c");
if(!excludeProjects.isEmpty()) {
queryStr += " AND c." + PROJECT_FIELD + " NOT IN :exclude_" + PROJECT_FIELD;
queryStr += " AND c." + PROJECT_ID_FIELD + " NOT IN :exclude_" + PROJECT_ID_FIELD;
}
if(!excludeDocs.isEmpty()) {
queryStr += " AND c." + DOC_ID_FIELD + " NOT IN :exclude_" + DOC_ID_FIELD;
Expand All @@ -124,7 +123,7 @@ private List<SearchFSCommand> findToProcessInt(Set<Project> excludeProjects, Set
TypedQuery<SearchFSCommand> query = em.createQuery(queryStr, entityClass);
query.setParameter("status", CommandStatus.NEW);
if (!excludeProjects.isEmpty()) {
query.setParameter("exclude_" + PROJECT_FIELD, excludeProjects);
query.setParameter("exclude_" + PROJECT_ID_FIELD, excludeProjects);
}
if (!excludeDocs.isEmpty()) {
query.setParameter("exclude_" + DOC_ID_FIELD, excludeDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public abstract class Command {
@JoinColumn(name = "project_id", referencedColumnName = "id")
@ManyToOne(optional = false)
private Project project;
@Column(name = "project_id", insertable = false, updatable = false)
private Integer projectId;
@Basic(optional = false)
@NotNull
@Size(min = 1, max = 20)
Expand All @@ -67,6 +69,10 @@ public Project getProject() {
return project;
}

public Integer getProjectId() {
return projectId;
}

public CommandStatus getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public SearchFSCommandHistory() {

public SearchFSCommandHistory(SearchFSCommand command) {
this.id = command.getId();
this.projectId = command.getProject() != null ? command.getProject().getId() : null;
this.projectId = command.getProjectId();
this.status = command.getStatus();
this.errorMsg = command.getErrorMsg();
this.featureGroupId = command.getFeatureGroup() != null ? command.getFeatureGroup().getId() : null;
Expand Down

0 comments on commit f4cf4df

Please sign in to comment.