Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-743] Online support for external feature groups #1323

Merged
merged 2 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 279 additions & 1 deletion hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ def backfill_stream_featuregroup(featurestore_id, featuregroup_id, featuregroup_
end

def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, name: nil, version: 1, query: nil,
features: nil, data_format: nil, options: nil, event_time: nil)
features: nil, data_format: nil, options: nil, event_time: nil,
online_enabled: false)
type = "onDemandFeaturegroupDTO"
featuregroupType = "ON_DEMAND_FEATURE_GROUP"
create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/featuregroups"
Expand All @@ -224,7 +225,8 @@ def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId,
},
query: query,
featuregroupType: featuregroupType,
eventTime: event_time
eventTime: event_time,
onlineEnabled: online_enabled
}

unless data_format == nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import io.hops.hopsworks.jwt.annotation.JWTRequired;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiScope;
import io.hops.hopsworks.restutils.RESTCodes;
import io.swagger.annotations.ApiOperation;

import javax.ejb.EJB;
Expand All @@ -49,7 +47,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import java.util.logging.Level;

@RequestScoped
@TransactionAttribute(TransactionAttributeType.NEVER)
Expand Down Expand Up @@ -102,19 +99,11 @@ public Response getPreview(@BeanParam FeatureGroupPreviewBeanParam featureGroupP
"Row limit should greater than 0 and lower than: " + settings.getFGPreviewLimit());
}

// validate feature group type (we can only return data preview for cached feature groups)
if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS,
Level.FINE, "featuregroupId: " + featuregroup.getId());
}

// set online flag. if the user doesn't provide the storage flag and the feature group
// is available online, return the data from the online feature store as it's faster.
boolean online;
if (featureGroupPreviewBeanParam.getStorage() == null) {
online = (featuregroup.getStreamFeatureGroup() != null && featuregroup.getStreamFeatureGroup().isOnlineEnabled())
|| (featuregroup.getCachedFeaturegroup() != null && featuregroup.getCachedFeaturegroup().isOnlineEnabled());
online = featuregroup.isOnlineEnabled();
} else {
online = featureGroupPreviewBeanParam.getStorage().equals(FeatureGroupStorage.ONLINE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,24 +453,26 @@ public Response updateFeaturegroup(@Context SecurityContext sc,
FeaturegroupDTO updatedFeaturegroupDTO = null;
if(updateMetadata) {
updatedFeaturegroupDTO = featuregroupController.updateFeaturegroupMetadata(project, user, featurestore,
featuregroupDTO);
featuregroup, featuregroupDTO);
}
if(enableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP &&
!(featuregroup.getCachedFeaturegroup().isOnlineEnabled())) {
if(enableOnline && !featuregroup.isOnlineEnabled() &&
(featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP ||
featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)) {
updatedFeaturegroupDTO =
featuregroupController.enableFeaturegroupOnline(featurestore, featuregroupDTO, project, user);
featuregroupController.enableFeaturegroupOnline(featuregroup, project, user);
}
if(disableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP &&
featuregroup.getCachedFeaturegroup().isOnlineEnabled()){
if(disableOnline && featuregroup.isOnlineEnabled() &&
(featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP ||
featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)){
updatedFeaturegroupDTO = featuregroupController.disableFeaturegroupOnline(featuregroup, project, user);
}
if (enableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP &&
!featuregroup.getStreamFeatureGroup().isOnlineEnabled()) {
!featuregroup.isOnlineEnabled()) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE,
Level.FINE, "Please create a new version of the feature group to enable online storage.");
}
if (disableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP &&
featuregroup.getStreamFeatureGroup().isOnlineEnabled()) {
featuregroup.isOnlineEnabled()) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE,
Level.FINE, "Please create a new version of the feature group to disable online storage.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package io.hops.hopsworks.api.featurestore.featuregroup;

import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeatureGroupStorage;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
Expand All @@ -44,10 +43,7 @@
public class PreviewBuilder {

@EJB
private CachedFeaturegroupController cachedFeaturegroupController;

@EJB
private StreamFeatureGroupController streamFeaturegroupController;
private FeaturegroupController featuregroupController;

private URI uri(UriInfo uriInfo, Project project, Featuregroup featuregroup) {
return uriInfo.getBaseUriBuilder().path(ResourceRequest.Name.PROJECT.toString().toLowerCase())
Expand All @@ -64,15 +60,9 @@ public PreviewDTO build(UriInfo uriInfo, Users user, Project project, Featuregro
String partition, boolean online, int limit)
throws FeaturestoreException, HopsSecurityException {

FeaturegroupPreview preview = null;
FeaturegroupPreview preview;
try {
if (featuregroup.getStreamFeatureGroup() != null) {
preview = streamFeaturegroupController
.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit);
} else {
preview = cachedFeaturegroupController
.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit);
}
preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit);
} catch (SQLException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_PREVIEW_FEATUREGROUP,
Level.SEVERE, "Feature Group id: " + featuregroup.getId(), e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ private List<ServingPreparedStatementDTO> createServingPreparedStatementDTOS(
for (TrainingDatasetJoin join : joins) {
Featuregroup featuregroup = join.getFeatureGroup();

if ((featuregroup.getStreamFeatureGroup() != null && !featuregroup.getStreamFeatureGroup().isOnlineEnabled())
|| (featuregroup.getCachedFeaturegroup() != null && !featuregroup.getCachedFeaturegroup().isOnlineEnabled())){
if (!featuregroup.isOnlineEnabled()){
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED,
Level.FINE, "Inference vector is only available for training datasets generated by online enabled " +
"feature groups. Feature group `" + featuregroup.getName() + "` is not online enabled.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
Expand Down Expand Up @@ -135,11 +136,7 @@ public void verifySchemaProvided(FeaturegroupDTO featuregroupDTO) throws Feature
* @throws FeaturestoreException
*/
public void verifyOnlineOfflineTypeMatch(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{
if ((featuregroupDTO instanceof CachedFeaturegroupDTO
&& ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) ||
(featuregroupDTO instanceof StreamFeatureGroupDTO
&& ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) {

if (featuregroupDTO.getOnlineEnabled()) {
for (FeatureGroupFeatureDTO feature : featuregroupDTO.getFeatures()) {
String offlineType = feature.getType().toLowerCase().replace(" ", "");
String onlineType =
Expand Down Expand Up @@ -190,11 +187,7 @@ public void verifyOnlineOfflineTypeMatch(FeaturegroupDTO featuregroupDTO) throws
* @throws FeaturestoreException
*/
public void verifyOnlineSchemaValid(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{
if ((featuregroupDTO instanceof CachedFeaturegroupDTO
&& ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) ||
(featuregroupDTO instanceof StreamFeatureGroupDTO
&& ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) {

if (featuregroupDTO.getOnlineEnabled()) {
if (featuregroupDTO.getFeatures().size() > FeaturestoreConstants.MAX_MYSQL_COLUMNS) {
throw new FeaturestoreException(
COULD_NOT_CREATE_ONLINE_FEATUREGROUP,
Expand Down Expand Up @@ -229,10 +222,7 @@ public void verifyOnlineSchemaValid(FeaturegroupDTO featuregroupDTO) throws Feat
* @throws FeaturestoreException
*/
public void verifyPrimaryKeySupported(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{
if ((featuregroupDTO instanceof CachedFeaturegroupDTO
&& ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) ||
(featuregroupDTO instanceof StreamFeatureGroupDTO
&& ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) {
if (featuregroupDTO.getOnlineEnabled()) {
Integer totalBytes = 0;
for (FeatureGroupFeatureDTO feature : featuregroupDTO.getFeatures()) {
if (feature.getPrimary()) {
Expand Down Expand Up @@ -338,4 +328,28 @@ public void verifyPartitionKeySupported(FeaturegroupDTO featuregroupDTO) throws
}
}
}

public List<FeatureGroupFeatureDTO> verifyAndGetNewFeatures(List<FeatureGroupFeatureDTO> previousSchema,
List<FeatureGroupFeatureDTO> newSchema)
throws FeaturestoreException {
List<FeatureGroupFeatureDTO> newFeatures = new ArrayList<>();
for (FeatureGroupFeatureDTO newFeature : newSchema) {
boolean isNew =
!previousSchema.stream().anyMatch(previousFeature -> previousFeature.getName().equals(newFeature.getName()));
if (isNew) {
newFeatures.add(newFeature);
if (newFeature.getPrimary() || newFeature.getPartition()) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE,
"Appended feature `" + newFeature.getName() + "` is specified as primary or partition key. Primary key and "
+ "partition key cannot be changed when appending features.");
}
if (newFeature.getType() == null) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE,
"Appended feature `" + newFeature.getName() + "` is missing type information. Type information is " +
"mandatory when appending features to a feature group.");
}
}
}
return newFeatures;
}
}
Loading