From e5b75f5a9a304d80ced4bd4ccb222fce411e1b02 Mon Sep 17 00:00:00 2001 From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com> Date: Tue, 4 Apr 2023 20:16:22 +0200 Subject: [PATCH] [FSTORE-743] Add online support for External Feature Groups (#1312) --- .../src/test/ruby/spec/featuregroup_spec.rb | 278 ++++++++++++++++++ .../ruby/spec/helpers/featurestore_helper.rb | 6 +- .../FeatureGroupPreviewResource.java | 13 +- .../featuregroup/FeaturegroupService.java | 18 +- .../featuregroup/PreviewBuilder.java | 18 +- .../PreparedStatementBuilder.java | 3 +- .../FeatureGroupInputValidation.java | 42 ++- .../featuregroup/FeaturegroupController.java | 155 ++++++---- .../featuregroup/FeaturegroupDTO.java | 12 + .../cached/CachedFeaturegroupController.java | 115 ++------ .../cached/CachedFeaturegroupDTO.java | 21 +- .../OnDemandFeaturegroupController.java | 99 ++++++- .../ondemand/OnDemandFeaturegroupDTO.java | 13 +- .../online/OnlineFeaturegroupController.java | 9 +- .../stream/StreamFeatureGroupController.java | 42 +-- .../stream/StreamFeatureGroupDTO.java | 28 +- .../query/ConstructorController.java | 21 +- .../common/security/QuotasEnforcement.java | 19 +- .../TestFeatureGroupInputValidation.java | 58 ---- .../TestCachedFeatureGroupController.java | 33 --- .../TestFeatureGroupInputValidation.java | 49 +++ .../security/TestQuotasEnforcement.java | 74 ++--- .../featuregroup/Featuregroup.java | 12 + .../cached/CachedFeaturegroup.java | 10 - .../ondemand/OnDemandFeature.java | 15 +- .../stream/StreamFeatureGroup.java | 10 - 26 files changed, 690 insertions(+), 483 deletions(-) delete mode 100644 hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupInputValidation.java diff --git a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb index 2969ba7b0c..91e9a4895b 100644 --- a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb @@ -1928,6 +1928,284 @@ expect(parsed_json["errorCode"]).to eql(270114) end + it "should be able to create an online enabled on-demand/external feature group" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + + topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + + parsed_json["version"].to_s + "_onlinefs" + get_project_topics(project.id) + expect_status_details(200) + topic = json_body[:items].select{|topic| topic[:name] == topic_name} + expect(topic.length).to eq(1) + get_subject_schema(project, topic[0][:name], 1) + expect_status_details(200) + end + + it "should be able to enable an on-demand/external feature group online after it was created offline only" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: false) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be false + + parsed_json["onlineEnabled"] = true + + update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s + + "?enableOnline=true" + + json_data = parsed_json.to_json + json_result = put update_featuregroup_metadata_endpoint, json_data + parsed_json = JSON.parse(json_result) + expect_status_details(200) + expect(parsed_json["onlineEnabled"]).to be true + + topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + + parsed_json["version"].to_s + "_onlinefs" + get_project_topics(project.id) + expect_status_details(200) + topic = json_body[:items].select{|topic| topic[:name] == topic_name} + expect(topic.length).to eq(1) + get_subject_schema(project, topic[0][:name], 1) + expect_status_details(200) + end + + it "should be able to disable an online on-demand/external feature group after it was created online" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + + parsed_json["onlineEnabled"] = false + + update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s + + "?disableOnline=true" + + json_data = parsed_json.to_json + json_result = put update_featuregroup_metadata_endpoint, json_data + parsed_json = JSON.parse(json_result) + expect_status_details(200) + expect(parsed_json["onlineEnabled"]).to be false + + # topic should still be there as we currently don't delete it + topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + + parsed_json["version"].to_s + "_onlinefs" + get_project_topics(project.id) + expect_status_details(200) + topic = json_body[:items].select{|topic| topic[:name] == topic_name} + expect(topic.length).to eq(1) + get_subject_schema(project, topic[0][:name], 1) + expect_status_details(200) + end + + it "should be possible to preview from online storage of an on-demand/external feature group" do + project = create_project(validate_session: false) + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + name: "online_fg", online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + featuregroup_id = parsed_json["id"] + + # add sample ros + OnlineFg.db_name = project[:projectname] + OnlineFg.create(testfeature: 1).save + OnlineFg.create(testfeature: 2).save + + get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/preview?storage=online&limit=1" + expect_status_details(200) + parsed_json = JSON.parse(response.body) + expect(parsed_json['items'].length).to eql 1 + + # should fetch the online feature data if the fg is online enabled and storage not specified + get "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/featuregroups/#{featuregroup_id}/preview?&limit=1" + expect_status_details(200) + parsed_json = JSON.parse(response.body) + expect(parsed_json['items'].length).to eql 1 + expect(parsed_json['items'][0]['storage']).to eql "ONLINE" + end + + it "should not be possible to preview from offline storage for an on-demand/external feature group" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: false) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be false + featuregroup_id = parsed_json["id"] + + get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + + "/featuregroups/" + featuregroup_id.to_s + "/preview?storage=online&limit=1" + expect_status_details(400) + get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + + "/featuregroups/" + featuregroup_id.to_s + "/preview?storage=offline&limit=1" + expect_status_details(400) + end + + it "should be possible to generate a online query with online on-demand/external feature groups" do + featurestore_id = get_featurestore_id(@project[:id]) + featurestore_name = get_featurestore_name(@project.id) + connector_id = get_jdbc_connector_id + features = [{type: "INT", name: "testfeature", description: "testfeaturedescription", primary: true}, + {type: "TIMESTAMP", name: "event_time"}] + json_result, fg_name_on_demand = create_on_demand_featuregroup(@project[:id], featurestore_id, connector_id, + features: features, event_time: "event_time", online_enabled: true) + expect_status_details(201) + parsed_json = JSON.parse(json_result) + fg_ond_id = parsed_json["id"] + fg_ond_type = parsed_json["type"] + + features = [{type: "INT", name: "testfeature", description: "testfeaturedescription", primary: true}, + {type: "INT", name: "anotherfeature", primary: false}, + {type: "TIMESTAMP", name: "event_time"}] + json_result, fg_name = create_cached_featuregroup(@project[:id], featurestore_id, features: features, + event_time: "event_time", online: true ) + parsed_json = JSON.parse(json_result) + fg_cached_id = parsed_json["id"] + fg_cached_type = parsed_json["type"] + + query = { + leftFeatureGroup: {id: fg_cached_id, type: fg_cached_type, eventTime: "event_time"}, + leftFeatures: [{name: 'anotherfeature'}, {name: "event_time"}], + joins: [{query: { + leftFeatureGroup: {id: fg_ond_id, type: fg_ond_type, eventTime: "event_time"}, + leftFeatures: [{name: 'testfeature'}] + }}]} + json_result = put "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/featurestores/query", query + expect_status_details(200) + query = JSON.parse(json_result) + expect(query.key?("onDemandFeatureGroups")).to be true + + expect(query['query']).to eql("SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg0`.`testfeature` `testfeature`\n" + + "FROM `#{featurestore_name}`.`#{fg_name}_1` `fg1`\n" + + "INNER JOIN `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature`") + + expect(query['pitQuery']).to eql("WITH right_fg0 AS " + + "(SELECT *\nFROM " + + "(SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg1`.`testfeature` `join_pk_testfeature`, `fg1`.`event_time` `join_evt_event_time`, `fg0`.`testfeature` `testfeature`, RANK() OVER (PARTITION BY `fg0`.`testfeature`, `fg1`.`event_time` ORDER BY `fg0`.`event_time` DESC) pit_rank_hopsworks\n" + + "FROM `#{featurestore_name}`.`#{fg_name}_1` `fg1`\n" + + "INNER JOIN `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature` AND `fg1`.`event_time` >= `fg0`.`event_time`) NA\n" + + "WHERE `pit_rank_hopsworks` = 1) (SELECT `right_fg0`.`anotherfeature` `anotherfeature`, `right_fg0`.`event_time` `event_time`, `right_fg0`.`testfeature` `testfeature`\nFROM right_fg0)") + expect(query["queryOnline"]).to eql("SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg0`.`testfeature` `testfeature`\nFROM `#{@project['projectname']}`.`#{fg_name}_1` `fg1`\nINNER JOIN `#{@project['projectname']}`.`#{fg_name_on_demand}_1` `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature`") + end + + it "should be possible to overwrite and clear an online on-demand/external feature group" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + featuregroup_id = parsed_json["id"] + clear_featuregroup_contents_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/clear" + post clear_featuregroup_contents_endpoint + expect_status_details(200) + end + + it "should be possible to append features to an online on-demand/external feature group" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + parsed_json["description"] = "changed description" + parsed_json["features"] = [ + { + type: "INT", + name: "testfeature", + description: "testfeaturedescription", + primary: true, + onlineType: "INT", + partition: false + }, + { + type: "DOUBLE", + name: "testfeature2", + description: "testfeaturedescription", + primary: false, + onlineType: "DOUBLE", + partition: false, + defaultValue: nil + }, + ] + update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s + + "?updateMetadata=true" + + json_data = parsed_json.to_json + json_result = put update_featuregroup_metadata_endpoint, json_data + parsed_json = JSON.parse(json_result) + expect_status_details(200) + expect(parsed_json["features"].length).to be 2 + expect(parsed_json["description"]).to eql("changed description") + expect(parsed_json["features"].select{ |f| f["name"] == "testfeature"}.first["defaultValue"]).to be nil + expect(parsed_json["features"].select{ |f| f["name"] == "testfeature2"}.first["defaultValue"]).to be nil + expect(parsed_json["onlineEnabled"]).to be true + end + + it "should not be possible to append features to an online on-demand/external feature group with default value" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + connector_id = get_jdbc_connector_id + json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id, + online_enabled: true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json["onlineEnabled"]).to be true + parsed_json["description"] = "changed description" + parsed_json["features"] = [ + { + type: "INT", + name: "testfeature", + description: "testfeaturedescription", + primary: true, + onlineType: "INT", + partition: false + }, + { + type: "DOUBLE", + name: "testfeature2", + description: "testfeaturedescription", + primary: false, + onlineType: "DOUBLE", + partition: false, + defaultValue: "10.0" + }, + ] + update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s + + "?updateMetadata=true" + + json_data = parsed_json.to_json + json_result = put update_featuregroup_metadata_endpoint, json_data + parsed_json = JSON.parse(json_result) + expect_status_details(200) + end + it "should be able to generate a query with only on-demand feature group" do featurestore_id = get_featurestore_id(@project[:id]) connector_id = get_jdbc_connector_id diff --git a/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb b/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb index 0821d27cd8..d40f904af4 100644 --- a/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb +++ b/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb @@ -203,7 +203,8 @@ def backfill_stream_featuregroup(featurestore_id, featuregroup_id, featuregroup_ end def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, name: nil, version: 1, query: nil, - features: nil, data_format: nil, options: nil, event_time: nil) + features: nil, data_format: nil, options: nil, event_time: nil, + online_enabled: false) type = "onDemandFeaturegroupDTO" featuregroupType = "ON_DEMAND_FEATURE_GROUP" create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/featuregroups" @@ -224,7 +225,8 @@ def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, }, query: query, featuregroupType: featuregroupType, - eventTime: event_time + eventTime: event_time, + onlineEnabled: online_enabled } unless data_format == nil diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java index fa3c8b2ce8..20587eb7f6 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java @@ -31,11 +31,9 @@ import io.hops.hopsworks.jwt.annotation.JWTRequired; import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType; import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiScope; -import io.hops.hopsworks.restutils.RESTCodes; import io.swagger.annotations.ApiOperation; import javax.ejb.EJB; @@ -51,7 +49,6 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; -import java.util.logging.Level; @Logged @RequestScoped @@ -108,19 +105,11 @@ public Response getPreview(@BeanParam FeatureGroupPreviewBeanParam featureGroupP "Row limit should greater than 0 and lower than: " + settings.getFGPreviewLimit()); } - // validate feature group type (we can only return data preview for cached feature groups) - if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, - Level.FINE, "featuregroupId: " + featuregroup.getId()); - } - // set online flag. if the user doesn't provide the storage flag and the feature group // is available online, return the data from the online feature store as it's faster. boolean online; if (featureGroupPreviewBeanParam.getStorage() == null) { - online = (featuregroup.getStreamFeatureGroup() != null && featuregroup.getStreamFeatureGroup().isOnlineEnabled()) - || (featuregroup.getCachedFeaturegroup() != null && featuregroup.getCachedFeaturegroup().isOnlineEnabled()); + online = featuregroup.isOnlineEnabled(); } else { online = featureGroupPreviewBeanParam.getStorage().equals(FeatureGroupStorage.ONLINE); } diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java index 4b7644be8f..06b1bdd3b3 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java @@ -462,24 +462,26 @@ public Response updateFeaturegroup(@Context SecurityContext sc, FeaturegroupDTO updatedFeaturegroupDTO = null; if(updateMetadata) { updatedFeaturegroupDTO = featuregroupController.updateFeaturegroupMetadata(project, user, featurestore, - featuregroupDTO); + featuregroup, featuregroupDTO); } - if(enableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP && - !(featuregroup.getCachedFeaturegroup().isOnlineEnabled())) { + if(enableOnline && !featuregroup.isOnlineEnabled() && + (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || + featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)) { updatedFeaturegroupDTO = - featuregroupController.enableFeaturegroupOnline(featurestore, featuregroupDTO, project, user); + featuregroupController.enableFeaturegroupOnline(featuregroup, project, user); } - if(disableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP && - featuregroup.getCachedFeaturegroup().isOnlineEnabled()){ + if(disableOnline && featuregroup.isOnlineEnabled() && + (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || + featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)){ updatedFeaturegroupDTO = featuregroupController.disableFeaturegroupOnline(featuregroup, project, user); } if (enableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP && - !featuregroup.getStreamFeatureGroup().isOnlineEnabled()) { + !featuregroup.isOnlineEnabled()) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, Level.FINE, "Please create a new version of the feature group to enable online storage."); } if (disableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP && - featuregroup.getStreamFeatureGroup().isOnlineEnabled()) { + featuregroup.isOnlineEnabled()) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, Level.FINE, "Please create a new version of the feature group to disable online storage."); } diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java index 1bfd8d4d6f..fd56c4cdac 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java @@ -17,10 +17,9 @@ package io.hops.hopsworks.api.featurestore.featuregroup; import io.hops.hopsworks.common.api.ResourceRequest; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController; +import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeatureGroupStorage; import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; -import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupController; import io.hops.hopsworks.exceptions.FeaturestoreException; import io.hops.hopsworks.exceptions.HopsSecurityException; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; @@ -44,10 +43,7 @@ public class PreviewBuilder { @EJB - private CachedFeaturegroupController cachedFeaturegroupController; - - @EJB - private StreamFeatureGroupController streamFeaturegroupController; + private FeaturegroupController featuregroupController; private URI uri(UriInfo uriInfo, Project project, Featuregroup featuregroup) { return uriInfo.getBaseUriBuilder().path(ResourceRequest.Name.PROJECT.toString().toLowerCase()) @@ -64,15 +60,9 @@ public PreviewDTO build(UriInfo uriInfo, Users user, Project project, Featuregro String partition, boolean online, int limit) throws FeaturestoreException, HopsSecurityException { - FeaturegroupPreview preview = null; + FeaturegroupPreview preview; try { - if (featuregroup.getStreamFeatureGroup() != null) { - preview = streamFeaturegroupController - .getFeaturegroupPreview(featuregroup, project, user, partition, online, limit); - } else { - preview = cachedFeaturegroupController - .getFeaturegroupPreview(featuregroup, project, user, partition, online, limit); - } + preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit); } catch (SQLException e) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_PREVIEW_FEATUREGROUP, Level.SEVERE, "Feature Group id: " + featuregroup.getId(), e.getMessage(), e); diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/trainingdataset/PreparedStatementBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/trainingdataset/PreparedStatementBuilder.java index 86c445e501..47fc5a501f 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/trainingdataset/PreparedStatementBuilder.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/trainingdataset/PreparedStatementBuilder.java @@ -187,8 +187,7 @@ private List createServingPreparedStatementDTOS( for (TrainingDatasetJoin join : joins) { Featuregroup featuregroup = join.getFeatureGroup(); - if ((featuregroup.getStreamFeatureGroup() != null && !featuregroup.getStreamFeatureGroup().isOnlineEnabled()) - || (featuregroup.getCachedFeaturegroup() != null && !featuregroup.getCachedFeaturegroup().isOnlineEnabled())){ + if (!featuregroup.isOnlineEnabled()){ throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Inference vector is only available for training datasets generated by online enabled " + "feature groups. Feature group `" + featuregroup.getName() + "` is not online enabled."); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeatureGroupInputValidation.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeatureGroupInputValidation.java index d6be48fdf9..b08a58cf5d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeatureGroupInputValidation.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeatureGroupInputValidation.java @@ -32,6 +32,7 @@ import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.logging.Level; @@ -135,11 +136,7 @@ public void verifySchemaProvided(FeaturegroupDTO featuregroupDTO) throws Feature * @throws FeaturestoreException */ public void verifyOnlineOfflineTypeMatch(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{ - if ((featuregroupDTO instanceof CachedFeaturegroupDTO - && ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) || - (featuregroupDTO instanceof StreamFeatureGroupDTO - && ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) { - + if (featuregroupDTO.getOnlineEnabled()) { for (FeatureGroupFeatureDTO feature : featuregroupDTO.getFeatures()) { String offlineType = feature.getType().toLowerCase().replace(" ", ""); String onlineType = @@ -190,11 +187,7 @@ public void verifyOnlineOfflineTypeMatch(FeaturegroupDTO featuregroupDTO) throws * @throws FeaturestoreException */ public void verifyOnlineSchemaValid(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{ - if ((featuregroupDTO instanceof CachedFeaturegroupDTO - && ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) || - (featuregroupDTO instanceof StreamFeatureGroupDTO - && ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) { - + if (featuregroupDTO.getOnlineEnabled()) { if (featuregroupDTO.getFeatures().size() > FeaturestoreConstants.MAX_MYSQL_COLUMNS) { throw new FeaturestoreException( COULD_NOT_CREATE_ONLINE_FEATUREGROUP, @@ -229,10 +222,7 @@ public void verifyOnlineSchemaValid(FeaturegroupDTO featuregroupDTO) throws Feat * @throws FeaturestoreException */ public void verifyPrimaryKeySupported(FeaturegroupDTO featuregroupDTO) throws FeaturestoreException{ - if ((featuregroupDTO instanceof CachedFeaturegroupDTO - && ((CachedFeaturegroupDTO) featuregroupDTO).getOnlineEnabled()) || - (featuregroupDTO instanceof StreamFeatureGroupDTO - && ((StreamFeatureGroupDTO) featuregroupDTO).getOnlineEnabled())) { + if (featuregroupDTO.getOnlineEnabled()) { Integer totalBytes = 0; for (FeatureGroupFeatureDTO feature : featuregroupDTO.getFeatures()) { if (feature.getPrimary()) { @@ -338,4 +328,28 @@ public void verifyPartitionKeySupported(FeaturegroupDTO featuregroupDTO) throws } } } + + public List verifyAndGetNewFeatures(List previousSchema, + List newSchema) + throws FeaturestoreException { + List newFeatures = new ArrayList<>(); + for (FeatureGroupFeatureDTO newFeature : newSchema) { + boolean isNew = + !previousSchema.stream().anyMatch(previousFeature -> previousFeature.getName().equals(newFeature.getName())); + if (isNew) { + newFeatures.add(newFeature); + if (newFeature.getPrimary() || newFeature.getPartition()) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, + "Appended feature `" + newFeature.getName() + "` is specified as primary or partition key. Primary key and " + + "partition key cannot be changed when appending features."); + } + if (newFeature.getType() == null) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, + "Appended feature `" + newFeature.getName() + "` is missing type information. Type information is " + + "mandatory when appending features to a feature group."); + } + } + } + return newFeatures; + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java index 1127c5fab2..fc0d145869 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java @@ -27,12 +27,14 @@ import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO; import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupFacade; +import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupDTO; import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupController; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupFacade; +import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController; import io.hops.hopsworks.common.featurestore.statistics.StatisticsController; import io.hops.hopsworks.common.featurestore.statistics.columns.StatisticColumnController; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController; @@ -123,6 +125,8 @@ public class FeaturegroupController { @EJB private InodeController inodeController; @EJB + private OnlineFeaturestoreController onlineFeaturestoreController; + @EJB private OnlineFeaturegroupController onlineFeaturegroupController; @EJB private FeaturestoreActivityFacade fsActivityFacade; @@ -181,13 +185,10 @@ public FeaturegroupDTO clearFeaturegroup(Featuregroup featuregroup, Project proj switch (featuregroup.getFeaturegroupType()) { case CACHED_FEATURE_GROUP: case STREAM_FEATURE_GROUP: + case ON_DEMAND_FEATURE_GROUP: FeaturegroupDTO featuregroupDTO = convertFeaturegrouptoDTO(featuregroup, project, user); deleteFeaturegroup(featuregroup, project, user); return createFeaturegroupNoValidation(featuregroup.getFeaturestore(), featuregroupDTO, project, user); - case ON_DEMAND_FEATURE_GROUP: - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.CLEAR_OPERATION_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, - Level.FINE, "featuregroupId: " + featuregroup.getId()); default: throw new IllegalArgumentException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_TYPE.getMessage() + ", Recognized Feature group types are: " + FeaturegroupType.ON_DEMAND_FEATURE_GROUP + ", and: " + @@ -254,16 +255,13 @@ public FeaturegroupDTO createFeaturegroupNoValidation(Featurestore featurestore, CachedFeaturegroup cachedFeaturegroup = null; StreamFeatureGroup streamFeatureGroup = null; - List featuresNoHudi = null; + // make copy of schema without hudi columns + List featuresNoHudi = new ArrayList<>(featuregroupDTO.getFeatures());; if (featuregroupDTO instanceof CachedFeaturegroupDTO) { - // make copy of schema without hudi columns - featuresNoHudi = new ArrayList<>(featuregroupDTO.getFeatures()); cachedFeaturegroup = cachedFeaturegroupController.createCachedFeaturegroup(featurestore, (CachedFeaturegroupDTO) featuregroupDTO, project, user); } else if (featuregroupDTO instanceof StreamFeatureGroupDTO){ - // make copy of schema without hudi columns - featuresNoHudi = new ArrayList<>(featuregroupDTO.getFeatures()); streamFeatureGroup = streamFeatureGroupController.createStreamFeatureGroup(featurestore, (StreamFeatureGroupDTO) featuregroupDTO, project, user); } else { @@ -276,17 +274,10 @@ public FeaturegroupDTO createFeaturegroupNoValidation(Featurestore featurestore, cachedFeaturegroup, streamFeatureGroup, onDemandFeaturegroup); // online feature group needs to be set up after persisting metadata in order to get feature group id - if (featuregroupDTO instanceof CachedFeaturegroupDTO && settings.isOnlineFeaturestore() - && featuregroup.getCachedFeaturegroup().isOnlineEnabled()){ + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, featuresNoHudi, project, user); - } else if (featuregroupDTO instanceof StreamFeatureGroupDTO) { - if (settings.isOnlineFeaturestore() && featuregroup.getStreamFeatureGroup().isOnlineEnabled()) { - // setup kafka topic and online feature group in rondb - onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, featuresNoHudi, project, user); - } else { - // not online enabled so set up only kafka topic - streamFeatureGroupController.setupOfflineStreamFeatureGroup(project, featuregroup, featuresNoHudi); - } + } else if (featuregroupDTO instanceof StreamFeatureGroupDTO && !featuregroupDTO.getOnlineEnabled()) { + streamFeatureGroupController.setupOfflineStreamFeatureGroup(project, featuregroup, featuresNoHudi); } FeaturegroupDTO completeFeaturegroupDTO = convertFeaturegrouptoDTO(featuregroup, project, user); @@ -339,7 +330,8 @@ public FeaturegroupDTO convertFeaturegrouptoDTO(Featuregroup featuregroup, Proje featuregroup.getOnDemandFeaturegroup().getFeaturestoreConnector()); OnDemandFeaturegroupDTO onDemandFeaturegroupDTO = - new OnDemandFeaturegroupDTO(featurestoreName, featuregroup, storageConnectorDTO); + onDemandFeaturegroupController.convertOnDemandFeatureGroupToDTO(featurestoreName, featuregroup, + storageConnectorDTO); try { String path = getFeatureGroupLocation(featuregroup); @@ -412,11 +404,10 @@ public FeaturegroupDTO getFeaturegroupWithIdAndFeaturestore(Featurestore feature * @throws FeaturestoreException */ public FeaturegroupDTO updateFeaturegroupMetadata(Project project, Users user, Featurestore featurestore, + Featuregroup featuregroup, FeaturegroupDTO featuregroupDTO) throws FeaturestoreException, SQLException, ProvenanceException, ServiceException, SchemaException, KafkaException { - Featuregroup featuregroup = getFeaturegroupById(featurestore, featuregroupDTO.getId()); - featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featuregroup.getFeaturestore(), FeaturestoreUtils.ActionMessage.UPDATE_FEATURE_GROUP_METADATA); @@ -440,7 +431,7 @@ public FeaturegroupDTO updateFeaturegroupMetadata(Project project, Users user, F streamFeatureGroupController.updateMetadata(project, user, featuregroup, (StreamFeatureGroupDTO) featuregroupDTO); } else if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { - onDemandFeaturegroupController.updateOnDemandFeaturegroupMetadata(featuregroup.getOnDemandFeaturegroup(), + onDemandFeaturegroupController.updateOnDemandFeaturegroupMetadata(project, user, featuregroup, (OnDemandFeaturegroupDTO) featuregroupDTO); } @@ -464,26 +455,33 @@ public FeaturegroupDTO updateFeaturegroupMetadata(Project project, Users user, F /** * Enable online feature serving of a feature group that is currently only offline * - * @param featurestore the featurestore where the featuregroup resides - * @param featuregroupDTO the updated featuregroup metadata + * @param featuregroup the updated featuregroup metadata + * @param project + * @param user * @return DTO of the updated feature group * @throws FeaturestoreException */ - public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, FeaturegroupDTO featuregroupDTO, - Project project, Users user) + public FeaturegroupDTO enableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, UserException, IOException, HopsSecurityException { - Featuregroup featuregroup = getFeaturegroupById(featurestore, featuregroupDTO.getId()); - featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featuregroup.getFeaturestore(), + Featurestore featurestore = featuregroup.getFeaturestore(); + if(!settings.isOnlineFeaturestore()) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, + Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster."); + } + if (!onlineFeaturestoreController.checkIfDatabaseExists( + onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, + Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an " + + "administrator."); + } + featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featurestore, FeaturestoreUtils.ActionMessage.ENABLE_FEATURE_GROUP_ONLINE); - if(featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP){ - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.ONLINE_FEATURE_SERVING_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, Level.FINE, - ", Online feature serving is only supported for featuregroups of type: " - + FeaturegroupType.CACHED_FEATURE_GROUP + ", and the user requested to enable feature serving on a " + - "featuregroup with type:" + FeaturegroupType.ON_DEMAND_FEATURE_GROUP); + if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP){ + onDemandFeaturegroupController.enableFeatureGroupOnline(featurestore, featuregroup, project, user); + } else { + cachedFeaturegroupController.enableFeaturegroupOnline(featurestore, featuregroup, project, user); } - cachedFeaturegroupController.enableFeaturegroupOnline(featurestore, featuregroup, project, user); // Log activity fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.ONLINE_ENABLED, null); @@ -499,17 +497,22 @@ public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featu * @return DTO of the updated feature group * @throws FeaturestoreException */ - public FeaturegroupDTO disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) + public FeaturegroupDTO disableFeaturegroupOnline(Featuregroup featuregroup, + Project project, Users user) throws FeaturestoreException, SQLException, ServiceException, SchemaException, KafkaException { - featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featuregroup.getFeaturestore(), - FeaturestoreUtils.ActionMessage.DISABLE_FEATURE_GROUP_ONLINE); - if(featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.ONLINE_FEATURE_SERVING_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, Level.FINE, - ", Online feature serving is only supported for featuregroups of type: " - + FeaturegroupType.CACHED_FEATURE_GROUP + ", and the user requested to a feature serving operation on a " + - "featuregroup with type:" + FeaturegroupType.ON_DEMAND_FEATURE_GROUP); + Featurestore featurestore = featuregroup.getFeaturestore(); + if(!settings.isOnlineFeaturestore()) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, + Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster."); } + if (!onlineFeaturestoreController.checkIfDatabaseExists( + onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, + Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an " + + "administrator."); + } + featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featurestore, + FeaturestoreUtils.ActionMessage.DISABLE_FEATURE_GROUP_ONLINE); cachedFeaturegroupController.disableFeaturegroupOnline(featuregroup, project, user); // Log activity @@ -594,18 +597,18 @@ public void deleteFeaturegroup(Featuregroup featuregroup, Project project, Users } switch (featuregroup.getFeaturegroupType()) { case CACHED_FEATURE_GROUP: - //Delete hive_table will cascade to cached_featuregroup_table which will cascade to feature_group table + // Delete hive_table will cascade to cached_featuregroup_table which will cascade to feature_group table cachedFeaturegroupController.dropHiveFeaturegroup(featuregroup, project, user); - //Delete mysql table and metadata - if(settings.isOnlineFeaturestore() && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) { + // Delete mysql table and metadata + if(settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user); } break; case STREAM_FEATURE_GROUP: - //Delete hive_table will cascade to stream_featuregroup_table which will cascade to feature_group table + // Delete hive_table will cascade to stream_featuregroup_table which will cascade to feature_group table cachedFeaturegroupController.dropHiveFeaturegroup(featuregroup, project, user); - //Delete mysql table and metadata - if (settings.isOnlineFeaturestore() && featuregroup.getStreamFeatureGroup().isOnlineEnabled()) { + // Delete mysql table and metadata + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user); } else { // only topics need to be deleted, but no RonDB table @@ -613,9 +616,13 @@ public void deleteFeaturegroup(Featuregroup featuregroup, Project project, Users } break; case ON_DEMAND_FEATURE_GROUP: - //Delete on_demand_feature_group will cascade will cascade to feature_group table + // Delete on_demand_feature_group will cascade to feature_group table onDemandFeaturegroupController .removeOnDemandFeaturegroup(featuregroup.getFeaturestore(), featuregroup, project, user); + // Delete mysql table and metadata + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { + onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user); + } break; default: throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_TYPE, Level.FINE, @@ -754,6 +761,7 @@ private Featuregroup persistFeaturegroupMetadata(Featurestore featurestore, Proj featuregroup.setStreamFeatureGroup(streamFeatureGroup); featuregroup.setOnDemandFeaturegroup(onDemandFeaturegroup); featuregroup.setEventTime(featuregroupDTO.getEventTime()); + featuregroup.setOnlineEnabled(settings.isOnlineFeaturestore() && featuregroupDTO.getOnlineEnabled()); StatisticsConfig statisticsConfig = new StatisticsConfig(featuregroupDTO.getStatisticsConfig().getEnabled(), featuregroupDTO.getStatisticsConfig().getCorrelations(), featuregroupDTO.getStatisticsConfig().getHistograms(), @@ -788,7 +796,8 @@ public List getFeatures(Featuregroup featuregroup, Proje featuregroup.getFeaturestore(), project, user); case ON_DEMAND_FEATURE_GROUP: return featuregroup.getOnDemandFeaturegroup().getFeatures().stream() - .map(f -> new FeatureGroupFeatureDTO(f.getName(), f.getType(), f.getPrimary(), null, featuregroup.getId())) + .map(f -> new FeatureGroupFeatureDTO( + f.getName(), f.getType(), f.getPrimary(), f.getDefaultValue(), featuregroup.getId())) .collect(Collectors.toList()); } return new ArrayList<>(); @@ -874,16 +883,42 @@ void verifyFeaturesNoDefaultValue(List features) private void enforceFeaturegroupQuotas(Featurestore featurestore, FeaturegroupDTO featuregroup) throws FeaturestoreException { try { - boolean onlineEnabled = false; - if (featuregroup instanceof CachedFeaturegroupDTO) { - onlineEnabled = ((CachedFeaturegroupDTO) featuregroup).getOnlineEnabled(); - } else if (featuregroup instanceof StreamFeatureGroupDTO) { - onlineEnabled = ((StreamFeatureGroupDTO) featuregroup).getOnlineEnabled(); - } - quotasEnforcement.enforceFeaturegroupsQuota(featurestore, onlineEnabled); + quotasEnforcement.enforceFeaturegroupsQuota(featurestore, featuregroup.getOnlineEnabled()); } catch (QuotaEnforcementException ex) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.SEVERE, ex.getMessage(), ex.getMessage(), ex); } } + + /** + * Previews a given featuregroup by doing a SELECT LIMIT query on the Hive Table (offline feature data) + * and the MySQL table (online feature data) + * + * @param featuregroup of the featuregroup to preview + * @param project the project the user is operating from, in case of shared feature store + * @param user the user making the request + * @param partition the selected partition if any as represented in the PARTITIONS_METASTORE + * @param online whether to show preview from the online feature store + * @param limit the number of rows to visualize + * @return A DTO with the first 20 feature rows of the online and offline tables. + * @throws SQLException + * @throws FeaturestoreException + * @throws HopsSecurityException + */ + public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, + Users user, String partition, boolean online, int limit) + throws SQLException, FeaturestoreException, HopsSecurityException { + if (online && featuregroup.isOnlineEnabled()) { + return onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit); + } else if (online) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE); + } else if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + throw new FeaturestoreException( + RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, + Level.FINE, "Preview for offline storage of external feature groups is not supported", + "featuregroupId: " + featuregroup.getId()); + } else { + return cachedFeaturegroupController.getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit); + } + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java index db38bc2bb9..4f8bbe057f 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java @@ -17,9 +17,11 @@ package io.hops.hopsworks.common.featurestore.featuregroup; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonSetter; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.Nulls; import io.hops.hopsworks.common.featurestore.FeaturestoreEntityDTO; import io.hops.hopsworks.common.featurestore.datavalidationv2.suites.ExpectationSuiteDTO; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; @@ -52,6 +54,8 @@ public class FeaturegroupDTO extends FeaturestoreEntityDTO { private ExpectationSuiteDTO expectationSuite = null; private String onlineTopicName = null; private String eventTime = null; + @JsonSetter(nulls = Nulls.SKIP) + private Boolean onlineEnabled = false; public FeaturegroupDTO() { } @@ -121,6 +125,14 @@ public String getEventTime() { public void setEventTime(String eventTime) { this.eventTime = eventTime; } + + public Boolean getOnlineEnabled() { + return onlineEnabled; + } + + public void setOnlineEnabled(Boolean onlineEnabled) { + this.onlineEnabled = onlineEnabled; + } @Override public String toString() { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java index 6de0d9ee1d..4c9f5bce19 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java @@ -22,11 +22,12 @@ import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade; import io.hops.hopsworks.common.featurestore.datavalidationv2.suites.ExpectationSuiteController; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation; import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade; import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO; -import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController; import io.hops.hopsworks.common.featurestore.query.ConstructorController; import io.hops.hopsworks.common.featurestore.query.Feature; import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils; @@ -102,6 +103,8 @@ public class CachedFeaturegroupController { @EJB private CachedFeaturegroupFacade cachedFeatureGroupFacade; @EJB + private FeaturegroupFacade featureGroupFacade; + @EJB private CertificateMaterializer certificateMaterializer; @EJB private Settings settings; @@ -110,8 +113,6 @@ public class CachedFeaturegroupController { @EJB private OnlineFeaturegroupController onlineFeaturegroupController; @EJB - private OnlineFeaturestoreController onlineFeaturestoreController; - @EJB private OfflineFeatureGroupController offlineFeatureGroupController; @EJB private HiveController hiveController; @@ -125,6 +126,8 @@ public class CachedFeaturegroupController { private ExpectationSuiteController expectationSuiteController; @EJB private FeaturegroupController featuregroupController; + @EJB + private FeatureGroupInputValidation featureGroupInputValidation; private static final Logger LOGGER = Logger.getLogger(CachedFeaturegroupController.class.getName()); private static final List HUDI_SPEC_FEATURE_NAMES = Arrays.asList("_hoodie_record_key", @@ -177,33 +180,6 @@ private Connection initConnection(String databaseName, Project project, Users us "project: " + project.getName() + ", hive database: " + databaseName, e.getMessage(), e); } } - - /** - * Previews a given featuregroup by doing a SELECT LIMIT query on the Hive Table (offline feature data) - * and the MySQL table (online feature data) - * - * @param featuregroup of the featuregroup to preview - * @param project the project the user is operating from, in case of shared feature store - * @param user the user making the request - * @param partition the selected partition if any as represented in the PARTITIONS_METASTORE - * @param online whether to show preview from the online feature store - * @param limit the number of rows to visualize - * @return A DTO with the first 20 feature rows of the online and offline tables. - * @throws SQLException - * @throws FeaturestoreException - * @throws HopsSecurityException - */ - public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, - Users user, String partition, boolean online, int limit) - throws SQLException, FeaturestoreException, HopsSecurityException { - if (online && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) { - return onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit); - } else if (online) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE); - } else { - return getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit); - } - } /** * Previews the offline data of a given featuregroup by doing a SELECT LIMIT query on the Hive Table @@ -310,15 +286,13 @@ public CachedFeaturegroup createCachedFeaturegroup(Featurestore featurestore, cachedFeaturegroupDTO.getFeatures(), project, user, getTableFormat(cachedFeaturegroupDTO.getTimeTravelFormat())); - boolean onlineEnabled = settings.isOnlineFeaturestore() && cachedFeaturegroupDTO.getOnlineEnabled(); - //Get HiveTblId of the newly created table from the metastore HiveTbls hiveTbls = cachedFeatureGroupFacade.getHiveTableByNameAndDB(tbl, featurestore.getHiveDbId()) .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.WARNING, "", "Table created correctly but not in the metastore")); //Persist cached feature group - return persistCachedFeaturegroupMetadata(hiveTbls, onlineEnabled, cachedFeaturegroupDTO.getTimeTravelFormat(), + return persistCachedFeaturegroupMetadata(hiveTbls, cachedFeaturegroupDTO.getTimeTravelFormat(), cachedFeaturegroupDTO.getFeatures()); } @@ -338,7 +312,7 @@ public CachedFeaturegroupDTO convertCachedFeaturegroupToDTO(Featuregroup feature List featureGroupFeatureDTOS = getFeaturesDTO(featuregroup.getCachedFeaturegroup(), featuregroup.getId(), featuregroup.getFeaturestore(), project, user); - if (settings.isOnlineFeaturestore() && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) { + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { cachedFeaturegroupDTO.setOnlineEnabled(true); cachedFeaturegroupDTO.setOnlineTopicName(onlineFeaturegroupController .onlineFeatureGroupTopicName(project.getId(), featuregroup.getId(), @@ -589,12 +563,10 @@ private void closeConnection(Connection conn, Users user, Project project) { * @param featureGroupFeatureDTOS the list of the feature group feature DTOs * @return Entity of the created cached feature group */ - private CachedFeaturegroup persistCachedFeaturegroupMetadata(HiveTbls hiveTable, boolean onlineEnabled, - TimeTravelFormat timeTravelFormat, + private CachedFeaturegroup persistCachedFeaturegroupMetadata(HiveTbls hiveTable, TimeTravelFormat timeTravelFormat, List featureGroupFeatureDTOS) { CachedFeaturegroup cachedFeaturegroup = new CachedFeaturegroup(); cachedFeaturegroup.setHiveTbls(hiveTable); - cachedFeaturegroup.setOnlineEnabled(onlineEnabled); cachedFeaturegroup.setTimeTravelFormat(timeTravelFormat); cachedFeaturegroup.setFeaturesExtraConstraints( buildFeatureExtraConstrains(featureGroupFeatureDTOS, cachedFeaturegroup, null)); @@ -619,20 +591,10 @@ private CachedFeaturegroup persistCachedFeaturegroupMetadata(HiveTbls hiveTable, * @throws FeaturestoreException * @throws SQLException */ - public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featuregroup featuregroup, + public void enableFeaturegroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, UserException, IOException, HopsSecurityException { - if(!settings.isOnlineFeaturestore()) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster."); - } - if (!onlineFeaturestoreController.checkIfDatabaseExists( - onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an " + - "administrator."); - } CachedFeaturegroup cachedFeaturegroup = featuregroup.getCachedFeaturegroup(); List features = getFeaturesDTO(featuregroup.getCachedFeaturegroup(), featuregroup.getId(), @@ -641,13 +603,12 @@ public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featu features = dropHudiSpecFeatureGroupFeature(features); } - if(!cachedFeaturegroup.isOnlineEnabled()) { + if(!featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, user); } //Set foreign key of the cached feature group to the new online feature group - cachedFeaturegroup.setOnlineEnabled(true); - cachedFeatureGroupFacade.updateMetadata(cachedFeaturegroup); - return convertCachedFeaturegroupToDTO(featuregroup, project, user); + featuregroup.setOnlineEnabled(true); + featureGroupFacade.updateFeaturegroupMetadata(featuregroup); } /** @@ -660,25 +621,13 @@ public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featu * @throws FeaturestoreException * @throws SQLException */ - public FeaturegroupDTO disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) - throws FeaturestoreException, SQLException, ServiceException, SchemaException, KafkaException { - if(!settings.isOnlineFeaturestore()) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster."); - } - if (!onlineFeaturestoreController.checkIfDatabaseExists( - onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()))) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an " + - "administrator."); - } - CachedFeaturegroup cachedFeaturegroup = featuregroup.getCachedFeaturegroup(); - if (settings.isOnlineFeaturestore() && cachedFeaturegroup.isOnlineEnabled()) { + public void disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) + throws FeaturestoreException, SQLException, SchemaException, KafkaException { + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user); - cachedFeaturegroup.setOnlineEnabled(false); - cachedFeatureGroupFacade.persist(cachedFeaturegroup); + featuregroup.setOnlineEnabled(false); + featureGroupFacade.updateFeaturegroupMetadata(featuregroup); } - return convertCachedFeaturegroupToDTO(featuregroup, project, user); } public void updateMetadata(Project project, Users user, Featuregroup featuregroup, @@ -694,7 +643,7 @@ public void updateMetadata(Project project, Users user, Featuregroup featuregrou List newFeatures = new ArrayList<>(); if (featuregroupDTO.getFeatures() != null) { verifyPreviousSchemaUnchanged(previousSchema, featuregroupDTO.getFeatures()); - newFeatures = verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures()); + newFeatures = featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures()); } // change table description @@ -712,7 +661,7 @@ public void updateMetadata(Project project, Users user, Featuregroup featuregrou featuregroup.getFeaturestore(), tableName, newFeatures, project, user); // if online feature group - if (settings.isOnlineFeaturestore() && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) { + if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.alterOnlineFeatureGroupSchema( featuregroup, newFeatures, featuregroupDTO.getFeatures(), project, user); } @@ -793,30 +742,6 @@ public void verifyPreviousSchemaUnchanged(List previousS } } - public List verifyAndGetNewFeatures(List previousSchema, - List newSchema) - throws FeaturestoreException { - List newFeatures = new ArrayList<>(); - for (FeatureGroupFeatureDTO newFeature : newSchema) { - boolean isNew = - !previousSchema.stream().anyMatch(previousFeature -> previousFeature.getName().equals(newFeature.getName())); - if (isNew) { - newFeatures.add(newFeature); - if (newFeature.getPrimary() || newFeature.getPartition()) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, - "Appended feature `" + newFeature.getName() + "` is specified as primary or partition key. Primary key and " - + "partition key cannot be changed when appending features."); - } - if (newFeature.getType() == null) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, - "Appended feature `" + newFeature.getName() + "` is missing type information. Type information is " + - "mandatory when appending features to a feature group."); - } - } - } - return newFeatures; - } - public void verifyPrimaryKey(FeaturegroupDTO featuregroupDTO, TimeTravelFormat timeTravelFormat) throws FeaturestoreException { // Currently the Hudi implementation requires having at last one primary key diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupDTO.java index d040320905..6a98e69e95 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupDTO.java @@ -35,31 +35,21 @@ @JsonTypeName("cachedFeaturegroupDTO") public class CachedFeaturegroupDTO extends FeaturegroupDTO { - @JsonSetter(nulls = Nulls.SKIP) - private Boolean onlineEnabled = false; @JsonSetter(nulls = Nulls.SKIP) private TimeTravelFormat timeTravelFormat = TimeTravelFormat.NONE; - + private List parents; public CachedFeaturegroupDTO() { super(); } - + public CachedFeaturegroupDTO(Featuregroup featuregroup) { super(featuregroup); } - public Boolean getOnlineEnabled() { - return onlineEnabled; - } - - public void setOnlineEnabled(Boolean onlineEnabled) { - this.onlineEnabled = onlineEnabled; - } - public TimeTravelFormat getTimeTravelFormat () { return timeTravelFormat; } - + @JsonSetter(nulls = Nulls.SKIP) public void setTimeTravelFormat (TimeTravelFormat timeTravelFormat ) { this.timeTravelFormat = timeTravelFormat; @@ -68,16 +58,15 @@ public void setTimeTravelFormat (TimeTravelFormat timeTravelFormat ) { public List getParents() { return parents; } - + public void setParents( List parents) { this.parents = parents; } - + @Override public String toString() { return "CachedFeaturegroupDTO{" + - ", onlineEnabled=" + onlineEnabled + ", timeTravelFormat =" + timeTravelFormat + ", parentFeatureGroups =" + Objects.toString(parents) + '}'; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java index 88c04db7b9..dd3beb58d6 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java @@ -18,14 +18,27 @@ import com.google.common.base.Strings; import io.hops.hopsworks.common.featurestore.FeaturestoreFacade; +import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation; +import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade; +import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade; +import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO; import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps; import io.hops.hopsworks.common.hdfs.DistributedFsService; import io.hops.hopsworks.common.hdfs.HdfsUsersController; +import io.hops.hopsworks.common.hdfs.Utils; import io.hops.hopsworks.common.hdfs.inode.InodeController; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.exceptions.HopsSecurityException; +import io.hops.hopsworks.exceptions.KafkaException; +import io.hops.hopsworks.exceptions.ProjectException; +import io.hops.hopsworks.exceptions.SchemaException; +import io.hops.hopsworks.exceptions.ServiceException; +import io.hops.hopsworks.exceptions.UserException; import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; +import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeature; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup; @@ -45,8 +58,10 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.sql.SQLException; import java.util.Collection; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.logging.Level; @@ -70,6 +85,14 @@ public class OnDemandFeaturegroupController { private HdfsUsersController hdfsUsersController; @EJB private InodeController inodeController; + @EJB + private OnlineFeaturegroupController onlineFeatureGroupController; + @EJB + private FeatureGroupInputValidation featureGroupInputValidation; + @EJB + private FeaturegroupFacade featureGroupFacade; + @EJB + private FeaturestoreActivityFacade fsActivityFacade; /** * Persists an on demand feature group @@ -131,18 +154,43 @@ public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore return onDemandFeaturegroup; } + public OnDemandFeaturegroupDTO convertOnDemandFeatureGroupToDTO(String featureStoreName, Featuregroup featureGroup, + FeaturestoreStorageConnectorDTO storageConnectorDTO) throws FeaturestoreException { + List features = getFeaturesDTO(featureGroup); + String onlineTopicName = onlineFeatureGroupController.onlineFeatureGroupTopicName( + featureGroup.getFeaturestore().getProject().getId(), + featureGroup.getId(), Utils.getFeaturegroupName(featureGroup)); + return new OnDemandFeaturegroupDTO(featureStoreName, featureGroup, storageConnectorDTO, features, onlineTopicName); + } + + private List getFeaturesDTO(Featuregroup featureGroup) + throws FeaturestoreException { + List features = featureGroup.getOnDemandFeaturegroup().getFeatures().stream() + .sorted(Comparator.comparing(OnDemandFeature::getIdx)) + .map(fgFeature -> + new FeatureGroupFeatureDTO(fgFeature.getName(), fgFeature.getType(), fgFeature.getDescription(), + fgFeature.getPrimary(), fgFeature.getDefaultValue(), featureGroup.getId())).collect(Collectors.toList()); + return onlineFeatureGroupController.getFeaturegroupFeatures(featureGroup, features); + } + /** * Updates metadata of an on demand feature group in the feature store * - * @param onDemandFeaturegroup the on-demand feature group to update + * @param featuregroup the on-demand feature group to update * @param onDemandFeaturegroupDTO the metadata DTO */ - public void updateOnDemandFeaturegroupMetadata(OnDemandFeaturegroup onDemandFeaturegroup, + public void updateOnDemandFeaturegroupMetadata(Project project, Users user, Featuregroup featuregroup, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) - throws FeaturestoreException { + throws FeaturestoreException, SchemaException, SQLException, KafkaException { + OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup(); + List previousSchema = getFeaturesDTO(featuregroup); + // verify previous schema unchanged and valid verifySchemaUnchangedAndValid(onDemandFeaturegroup.getFeatures(), onDemandFeaturegroupDTO.getFeatures()); + List newFeatures = featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, + onDemandFeaturegroupDTO.getFeatures()); + // Update metadata in entity if (onDemandFeaturegroupDTO.getDescription() != null) { onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription()); @@ -150,9 +198,23 @@ public void updateOnDemandFeaturegroupMetadata(OnDemandFeaturegroup onDemandFeat // append new features and update existing ones updateOnDemandFeatures(onDemandFeaturegroup, onDemandFeaturegroupDTO.getFeatures()); + + // alter table for new additional features + if (!newFeatures.isEmpty()) { + if (featuregroup.isOnlineEnabled()) { + onlineFeatureGroupController.alterOnlineFeatureGroupSchema( + featuregroup, newFeatures, onDemandFeaturegroupDTO.getFeatures(), project, user); + } + // Log schema change + String newFeaturesStr = "New features: " + newFeatures.stream().map(FeatureGroupFeatureDTO::getName) + .collect(Collectors.joining(",")); + fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, newFeaturesStr); + } // finally merge in database onDemandFeaturegroupFacade.updateMetadata(onDemandFeaturegroup); + + } private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup, @@ -166,7 +228,7 @@ private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup, } else { onDemandFeaturegroup.getFeatures().add(new OnDemandFeature(onDemandFeaturegroup, feature.getName(), feature.getType(), feature.getDescription(), feature.getPrimary(), - onDemandFeaturegroup.getFeatures().size())); + onDemandFeaturegroup.getFeatures().size(), feature.getDefaultValue())); } } } @@ -189,6 +251,11 @@ public void verifySchemaUnchangedAndValid(Collection previousSc "Primary key or type information of feature " + feature.getName() + " changed. Primary key" + " and type cannot be changed when updating features."); } + if (newFeature.getDefaultValue() != null) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, + "Default values for features appended to external feature groups are not supported. Feature: " + + feature.getName() + ", provided default value: " + newFeature.getDefaultValue() + " must be null instead."); + } } } @@ -224,7 +291,7 @@ private List convertOnDemandFeatures(OnDemandFeaturegroupDTO on List features = new ArrayList<>(); for (FeatureGroupFeatureDTO f : onDemandFeaturegroupDTO.getFeatures()) { features.add(new OnDemandFeature(onDemandFeaturegroup, f.getName(), f.getType(), f.getDescription(), - f.getPrimary(), i++)); + f.getPrimary(), i++, f.getDefaultValue())); } return features; } @@ -265,4 +332,26 @@ private FeaturestoreConnector getStorageConnector(Integer connectorId) throws Fe .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND, Level.FINE, "Connector with id: " + connectorId + " was not found")); } + + /** + * Update an external feature group that currently does not support online feature serving, to support it. + * + * @param featurestore the featurestore where the featuregroup resides + * @param featuregroup the featuregroup entity to update + * @param user the user making the request + * @return a DTO of the updated featuregroup + * @throws FeaturestoreException + * @throws SQLException + */ + public void enableFeatureGroupOnline(Featurestore featurestore, Featuregroup featuregroup, + Project project, Users user) + throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, + UserException, IOException, HopsSecurityException { + List features = getFeaturesDTO(featuregroup); + if(!featuregroup.isOnlineEnabled()) { + onlineFeatureGroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, user); + } + featuregroup.setOnlineEnabled(true); + featureGroupFacade.updateFeaturegroupMetadata(featuregroup); + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupDTO.java index 0af740efa3..3f3eb93fe9 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupDTO.java @@ -22,10 +22,8 @@ import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandDataFormat; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeature; import javax.xml.bind.annotation.XmlRootElement; -import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -54,7 +52,8 @@ public OnDemandFeaturegroupDTO() { } public OnDemandFeaturegroupDTO(String featureStoreName, Featuregroup featuregroup, - FeaturestoreStorageConnectorDTO storageConnectorDTO) { + FeaturestoreStorageConnectorDTO storageConnectorDTO, + List features, String onlineTopicName) { super(featuregroup); this.query = featuregroup.getOnDemandFeaturegroup().getQuery(); this.storageConnector = storageConnectorDTO; @@ -67,11 +66,9 @@ public OnDemandFeaturegroupDTO(String featureStoreName, Featuregroup featuregrou setFeaturestoreName(featureStoreName); setDescription(featuregroup.getOnDemandFeaturegroup().getDescription()); - setFeatures(featuregroup.getOnDemandFeaturegroup().getFeatures().stream() - .sorted(Comparator.comparing(OnDemandFeature::getIdx)) - .map(fgFeature -> - new FeatureGroupFeatureDTO(fgFeature.getName(), fgFeature.getType(), fgFeature.getDescription(), - featuregroup.getId(), fgFeature.getPrimary())).collect(Collectors.toList())); + setFeatures(features); + setOnlineEnabled(featuregroup.isOnlineEnabled()); + setOnlineTopicName(onlineTopicName); } public OnDemandFeaturegroupDTO(Featuregroup featuregroup, FeaturestoreStorageConnectorDTO storageConnectorDTO) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java index 68119e1c94..c452373678 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java @@ -151,7 +151,7 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup feat public void setupOnlineFeatureGroup(Featurestore featureStore, Integer featureGroupId, String featureGroupName, Integer featureGroupVersion, List features, Project project, Users user) - throws KafkaException, SchemaException, ProjectException, UserException, FeaturestoreException, SQLException, + throws KafkaException, SchemaException, ProjectException, FeaturestoreException, SQLException, IOException, HopsSecurityException, ServiceException { // check if onlinefs user is part of project if (project.getProjectTeamCollection().stream().noneMatch(pt -> @@ -177,7 +177,7 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Integer featureGr // The topic schema is also registered so it's available both for the hsfs library and for the collector public void createFeatureGroupKafkaTopic(Project project, String featureGroupEntityName, String topicName, List features) - throws KafkaException, SchemaException, ProjectException, UserException, FeaturestoreException { + throws KafkaException, SchemaException, FeaturestoreException { String avroSchema = avroSchemaConstructorController .constructSchema(featureGroupEntityName, Utils.getFeaturestoreName(project), features); @@ -220,7 +220,8 @@ public void alterFeatureGroupSchema(Featuregroup featureGroup, List newFeatures = new ArrayList<>(); if (featuregroupDTO.getFeatures() != null) { cachedFeaturegroupController.verifyPreviousSchemaUnchanged(previousSchema, featuregroupDTO.getFeatures()); - newFeatures = cachedFeaturegroupController.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures()); + newFeatures = featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures()); } // change table description @@ -242,7 +214,7 @@ public void updateMetadata(Project project, Users user, Featuregroup featuregrou if (!newFeatures.isEmpty()) { offlineFeatureGroupController.alterHiveTableFeatures( featuregroup.getFeaturestore(), tableName, newFeatures, project, user); - if (featuregroup.getStreamFeatureGroup().isOnlineEnabled()) { + if (featuregroup.isOnlineEnabled()) { onlineFeaturegroupController.alterOnlineFeatureGroupSchema( featuregroup, newFeatures, featuregroupDTO.getFeatures(), project, user); } else { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupDTO.java index 019ebf6dfc..9b5eef1c4f 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupDTO.java @@ -30,10 +30,9 @@ public class StreamFeatureGroupDTO extends FeaturegroupDTO { private TimeTravelFormat timeTravelFormat = TimeTravelFormat.HUDI; private DeltaStreamerJobConf deltaStreamerJobConf; - private Boolean onlineEnabled = true; - + private List parents; - + public StreamFeatureGroupDTO() { super(); } @@ -41,47 +40,38 @@ public StreamFeatureGroupDTO() { public StreamFeatureGroupDTO(Featuregroup featuregroup) { super(featuregroup); } - + public DeltaStreamerJobConf getDeltaStreamerJobConf() { return deltaStreamerJobConf; } - + public void setDeltaStreamerJobConf( DeltaStreamerJobConf deltaStreamerJobConf) { this.deltaStreamerJobConf = deltaStreamerJobConf; } - - public Boolean getOnlineEnabled() { - return onlineEnabled; - } - - public void setOnlineEnabled(Boolean onlineEnabled) { - this.onlineEnabled = onlineEnabled; - } - + public TimeTravelFormat getTimeTravelFormat() { return timeTravelFormat; } - + public void setTimeTravelFormat( TimeTravelFormat timeTravelFormat) { this.timeTravelFormat = timeTravelFormat; } - + public List getParents() { return parents; } - + public void setParents(List parents) { this.parents = parents; } - + @Override public String toString() { return "StreamFeatureGroupDTO{" + "timeTravelFormat=" + timeTravelFormat + ", deltaStreamerJobConf=" + deltaStreamerJobConf + - ", onlineEnabled=" + onlineEnabled + ", parentFeatureGroups =" + Objects.toString(parents) + '}'; } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java index bf987b4683..2ddec6368c 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java @@ -113,12 +113,8 @@ public FsQueryDTO construct(Query query, boolean pitEnabled, boolean isTrainingD fsQueryDTO.setQuery(makeOfflineQuery(query)); fsQueryDTO.setHudiCachedFeatureGroups(getHudiAliases(query)); fsQueryDTO.setOnDemandFeatureGroups(getOnDemandAliases(user, project, query)); - - // if on-demand feature groups are involved in the query, we don't support online queries - if (fsQueryDTO.getOnDemandFeatureGroups().isEmpty()) { - fsQueryDTO.setQueryOnline( - generateSQL(query, true).toSqlString(new SparkSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql()); - } + fsQueryDTO.setQueryOnline( + generateSQL(query, true).toSqlString(new SparkSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql()); if (pitEnabled) { fsQueryDTO.setPitQuery(makePitQuery(query, isTrainingDataset)); @@ -277,7 +273,16 @@ private SqlNode generateCachedTableNode(Query query, boolean online) { return SqlStdOperatorTable.AS.createCall(asNodeList); } - private SqlNode generateOnDemandTableNode(Query query) { + private SqlNode generateOnDemandTableNode(Query query, boolean online) { + if (online) { + List tableIdentifierStr = new ArrayList<>(); + tableIdentifierStr.add("`" + query.getProject() + "`"); + tableIdentifierStr.add("`" + query.getFeaturegroup().getName() + "_" + query.getFeaturegroup().getVersion() + + "`"); + SqlNodeList asNodeList = new SqlNodeList(Arrays.asList(new SqlIdentifier(tableIdentifierStr, SqlParserPos.ZERO), + new SqlIdentifier("`" + query.getAs() + "`", SqlParserPos.ZERO)), SqlParserPos.ZERO); + return SqlStdOperatorTable.AS.createCall(asNodeList); + } return new SqlIdentifier("`" + query.getAs() + "`", SqlParserPos.ZERO); } @@ -292,7 +297,7 @@ public SqlNode generateTableNode(Query query, boolean online) { if (query.getFeaturegroup().getFeaturegroupType() != FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { return generateCachedTableNode(query, online); } else { - return generateOnDemandTableNode(query); + return generateOnDemandTableNode(query, online); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/QuotasEnforcement.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/QuotasEnforcement.java index 5020eddaf3..5ded8a8b2d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/QuotasEnforcement.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/QuotasEnforcement.java @@ -29,9 +29,6 @@ import io.hops.hopsworks.exceptions.ServingException; import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeaturegroup; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.stream.StreamFeatureGroup; import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDataset; import io.hops.hopsworks.persistence.entity.jobs.history.Execution; import io.hops.hopsworks.persistence.entity.project.Project; @@ -200,21 +197,11 @@ private void enforceFeaturegroupsQuotaInternal(Featurestore featurestore, List { - CachedFeaturegroup cfg = fg.getCachedFeaturegroup(); - if (cfg != null) { - return cfg.isOnlineEnabled() == online; - } - StreamFeatureGroup sfg = fg.getStreamFeatureGroup(); - if (sfg != null) { - return sfg.isOnlineEnabled() == online; - } - OnDemandFeaturegroup dfg = fg.getOnDemandFeaturegroup(); - if (dfg != null) { - // External Feature Groups do not count + if (!online && fg.getOnDemandFeaturegroup() != null) { + // for backwards compatability we don't count online disabled on-demand feature group return false; } - // Failsafe, anything else (?) will count regardless - return true; + return fg.isOnlineEnabled() == online; }).count(); LOGGER.log(Level.FINE, "Enforcing quotas for online " + typeForException + " feature groups. Current number of feature groups:" + diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupInputValidation.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupInputValidation.java deleted file mode 100644 index e7d64542a3..0000000000 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupInputValidation.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * This file is part of Hopsworks - * Copyright (C) 2022, Hopsworks AB. All rights reserved - * - * Hopsworks is free software: you can redistribute it and/or modify it under the terms of - * the GNU Affero General Public License as published by the Free Software Foundation, - * either version 3 of the License, or (at your option) any later version. - * - * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR - * PURPOSE. See the GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License along with this program. - * If not, see . - */ - -package io.hops.hopsworks.common.featurestore.featuregroup; - -import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO; -import io.hops.hopsworks.exceptions.FeaturestoreException; -import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.ArrayList; -import java.util.List; - -public class TestFeatureGroupInputValidation { - - private FeatureGroupInputValidation featureGroupInputValidation; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Before - public void setup() { - featureGroupInputValidation = new FeatureGroupInputValidation(); - } - - @Test - public void testVerifyUserInputFeatureGroup() throws Exception { - CachedFeaturegroupDTO featuregroupDTO = new CachedFeaturegroupDTO(); - featuregroupDTO.setTimeTravelFormat(TimeTravelFormat.HUDI); - - // timestamp type camel case - List newSchema = new ArrayList<>(); - newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param3", "Timestamp", "", false , true)); - featuregroupDTO.setFeatures(newSchema); - thrown.expect(FeaturestoreException.class); - featureGroupInputValidation.verifyPartitionKeySupported(featuregroupDTO); - } - -} diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java index 0da92cb3fb..1ee85ddb44 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java @@ -137,39 +137,6 @@ public void testVerifyPreviousSchemaUnchangedIfTypeChanged() throws Exception { cachedFeaturegroupController.verifyPreviousSchemaUnchanged(features, newSchema); } - @Test - public void testVerifyAndGetNewFeaturesIfPrimary() throws Exception { - List newSchema = new ArrayList<>(); - newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param3", "String", "", true , false)); - - thrown.expect(FeaturestoreException.class); - cachedFeaturegroupController.verifyAndGetNewFeatures(features, newSchema); - } - - @Test - public void testVerifyAndGetNewFeaturesIfPartition() throws Exception { - List newSchema = new ArrayList<>(); - newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param3", "String", "", false , true)); - - thrown.expect(FeaturestoreException.class); - cachedFeaturegroupController.verifyAndGetNewFeatures(features, newSchema); - } - - @Test - public void testVerifyAndGetNewFeaturesIfMissingType() throws Exception { - List newSchema = new ArrayList<>(); - newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); - newSchema.add(new FeatureGroupFeatureDTO("part_param3", null, "", false , false)); - - thrown.expect(FeaturestoreException.class); - cachedFeaturegroupController.verifyAndGetNewFeatures(features, newSchema); - } - @Test public void testVerifyPrimaryKeyHudi_cachedFeatureGroup() throws Exception { List newSchema = new ArrayList<>(); diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureGroupInputValidation.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureGroupInputValidation.java index 34cdf10305..18e96f3714 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureGroupInputValidation.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureGroupInputValidation.java @@ -20,6 +20,7 @@ import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation; import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat; import org.apache.commons.lang.StringUtils; import org.junit.Before; import org.junit.Rule; @@ -125,4 +126,52 @@ public void testVerifyFeatureGroupFeatureList_type() throws Exception { featureGroupInputValidation.verifyFeatureGroupFeatureList(featureList); } + + @Test + public void testVerifyUserInputFeatureGroup() throws Exception { + CachedFeaturegroupDTO featuregroupDTO = new CachedFeaturegroupDTO(); + featuregroupDTO.setTimeTravelFormat(TimeTravelFormat.HUDI); + + // timestamp type camel case + List newSchema = new ArrayList<>(); + newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param3", "Timestamp", "", false , true)); + featuregroupDTO.setFeatures(newSchema); + thrown.expect(FeaturestoreException.class); + featureGroupInputValidation.verifyPartitionKeySupported(featuregroupDTO); + } + + @Test + public void testVerifyAndGetNewFeaturesIfPrimary() throws Exception { + List newSchema = new ArrayList<>(); + newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param3", "String", "", true , false)); + + thrown.expect(FeaturestoreException.class); + featureGroupInputValidation.verifyAndGetNewFeatures(features, newSchema); + } + + @Test + public void testVerifyAndGetNewFeaturesIfPartition() throws Exception { + List newSchema = new ArrayList<>(); + newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param3", "String", "", false , true)); + + thrown.expect(FeaturestoreException.class); + featureGroupInputValidation.verifyAndGetNewFeatures(features, newSchema); + } + + @Test + public void testVerifyAndGetNewFeaturesIfMissingType() throws Exception { + List newSchema = new ArrayList<>(); + newSchema.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", true, false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false , false)); + newSchema.add(new FeatureGroupFeatureDTO("part_param3", null, "", false , false)); + + thrown.expect(FeaturestoreException.class); + featureGroupInputValidation.verifyAndGetNewFeatures(features, newSchema); + } } diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/security/TestQuotasEnforcement.java b/hopsworks-common/src/test/io/hops/hopsworks/common/security/TestQuotasEnforcement.java index d072147478..720f7652dd 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/security/TestQuotasEnforcement.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/security/TestQuotasEnforcement.java @@ -51,16 +51,21 @@ public void testQuotasOnlineEnabledFeaturegroups() throws Exception { FeaturegroupFacade featuregroupFacade = Mockito.mock(FeaturegroupFacade.class); List mockedOnlineEnabledFeaturegroups = new ArrayList<>(); CachedFeaturegroup onlineCachedFG = new CachedFeaturegroup(); - onlineCachedFG.setOnlineEnabled(true); StreamFeatureGroup onlineStreamFG = new StreamFeatureGroup(); - onlineStreamFG.setOnlineEnabled(true); + OnDemandFeaturegroup onlineOnDemandFG = new OnDemandFeaturegroup(); Featuregroup fg0 = new Featuregroup(); fg0.setCachedFeaturegroup(onlineCachedFG); + fg0.setOnlineEnabled(true); mockedOnlineEnabledFeaturegroups.add(fg0); Featuregroup fg1 = new Featuregroup(); fg1.setStreamFeatureGroup(onlineStreamFG); + fg1.setOnlineEnabled(true); mockedOnlineEnabledFeaturegroups.add(fg1); + Featuregroup fg2 = new Featuregroup(); + fg2.setOnDemandFeaturegroup(onlineOnDemandFG); + fg2.setOnlineEnabled(true); + mockedOnlineEnabledFeaturegroups.add(fg2); Mockito.when(featuregroupFacade.findByFeaturestore(Mockito.any())).thenReturn(mockedOnlineEnabledFeaturegroups); @@ -77,12 +82,12 @@ public void testQuotasOnlineEnabledFeaturegroups() throws Exception { // Test Online enabled Mockito.when(settings.getQuotasOnlineDisabledFeaturegroups()).thenReturn(100L); - // This time it should go through current: 2 max: 3 - Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(3L); + // This time it should go through current: 3 max: 4 + Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(4L); qe.enforceFeaturegroupsQuota(fs, true); - // This time it should throw an exception current: 2 max: 2 - Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(2L); + // This time it should throw an exception current: 3 max: 3 + Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(3L); thrown.expect(QuotaEnforcementException.class); thrown.expectMessage("Online enabled feature groups quota reached"); qe.enforceFeaturegroupsQuota(fs, true); @@ -93,16 +98,21 @@ public void testQuotasOnlineDisabledFeaturegroups() throws Exception { FeaturegroupFacade featuregroupFacade = Mockito.mock(FeaturegroupFacade.class); List mockedOnlineDisabledFeaturegroups = new ArrayList<>(); CachedFeaturegroup offlineCachedFG = new CachedFeaturegroup(); - offlineCachedFG.setOnlineEnabled(false); StreamFeatureGroup offlineStreamFG = new StreamFeatureGroup(); - offlineStreamFG.setOnlineEnabled(false); + OnDemandFeaturegroup offlineOnDemandFG = new OnDemandFeaturegroup(); Featuregroup fg0 = new Featuregroup(); fg0.setCachedFeaturegroup(offlineCachedFG); + fg0.setOnlineEnabled(false); mockedOnlineDisabledFeaturegroups.add(fg0); Featuregroup fg1 = new Featuregroup(); fg1.setStreamFeatureGroup(offlineStreamFG); + fg1.setOnlineEnabled(false); mockedOnlineDisabledFeaturegroups.add(fg1); + Featuregroup fg2 = new Featuregroup(); + fg2.setOnDemandFeaturegroup(offlineOnDemandFG); + fg2.setOnlineEnabled(false); + mockedOnlineDisabledFeaturegroups.add(fg2); Mockito.when(featuregroupFacade.findByFeaturestore(Mockito.any())).thenReturn(mockedOnlineDisabledFeaturegroups); @@ -119,60 +129,18 @@ public void testQuotasOnlineDisabledFeaturegroups() throws Exception { // Test Online disabled Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(100L); - // This time it should go through current: 2 max: 3 + // We make sure On-demand/External online disabled FGs are not counted + // It should pass with max: 3 even though current: 3 already Mockito.when(settings.getQuotasOnlineDisabledFeaturegroups()).thenReturn(3L); qe.enforceFeaturegroupsQuota(fs, false); - // This time it should throw an exception current: 2 max: 2 + // This time it should throw an exception current: 3 max: 2 Mockito.when(settings.getQuotasOnlineDisabledFeaturegroups()).thenReturn(2L); thrown.expect(QuotaEnforcementException.class); thrown.expectMessage("Online disabled feature groups quota reached"); qe.enforceFeaturegroupsQuota(fs, false); } - @Test - public void testQuotasIgnoreExternalFeaturegroups() { - boolean noException = true; - try { - FeaturegroupFacade featuregroupFacade = Mockito.mock(FeaturegroupFacade.class); - List mockedOnlineDisabledFeaturegroups = new ArrayList<>(); - CachedFeaturegroup offlineFG = new CachedFeaturegroup(); - offlineFG.setOnlineEnabled(false); - OnDemandFeaturegroup ondemandFG = new OnDemandFeaturegroup(); - - Featuregroup fg0 = new Featuregroup(); - fg0.setCachedFeaturegroup(offlineFG); - mockedOnlineDisabledFeaturegroups.add(fg0); - Featuregroup fg1 = new Featuregroup(); - fg1.setOnDemandFeaturegroup(ondemandFG); - mockedOnlineDisabledFeaturegroups.add(fg1); - - Mockito.when(featuregroupFacade.findByFeaturestore(Mockito.any())).thenReturn(mockedOnlineDisabledFeaturegroups); - - Settings settings = Mockito.mock(Settings.class); - - QuotasEnforcement qe = new QuotasEnforcement(); - qe.setFeaturegroupFacade(featuregroupFacade); - qe.setSettings(settings); - - Featurestore fs = new Featurestore(); - Project project = new Project(); - project.setName("ProjectName"); - fs.setProject(project); - - // Test Online disabled - Mockito.when(settings.getQuotasOnlineEnabledFeaturegroups()).thenReturn(100L); - - // Although quotas has been set to 2 and we do have 2 Feature Groups, - // it should go through because one is External and we should ignore it - Mockito.when(settings.getQuotasOnlineDisabledFeaturegroups()).thenReturn(2L); - qe.enforceFeaturegroupsQuota(fs, false); - } catch (Exception e) { - noException = false; - } - Assert.assertTrue(noException); - } - @Test public void testIgnoreFeaturegroupQuotas() throws Exception { FeaturegroupFacade featuregroupFacade = Mockito.mock(FeaturegroupFacade.class); diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java index 25b9991577..80641bd6e2 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java @@ -123,6 +123,8 @@ public class Featuregroup implements Serializable { private Integer version; @Column(name = "event_time") private String eventTime; + @Column(name = "online_enabled") + private boolean onlineEnabled; @NotNull @Enumerated(EnumType.ORDINAL) @Column(name = "feature_group_type") @@ -286,6 +288,14 @@ public Collection getValidationReports() { public void setValidationReports(Collection validationReports) { this.validationReports = validationReports; } + + public boolean isOnlineEnabled() { + return onlineEnabled; + } + + public void setOnlineEnabled(boolean onlineEnabled) { + this.onlineEnabled = onlineEnabled; + } @Override public boolean equals(Object o) { @@ -305,6 +315,7 @@ public boolean equals(Object o) { if (!Objects.equals(cachedFeaturegroup, that.cachedFeaturegroup)) return false; if (!Objects.equals(streamFeatureGroup, that.streamFeatureGroup)) return false; if (!Objects.equals(eventTime, that.eventTime)) return false; + if (!Objects.equals(onlineEnabled, that.onlineEnabled)) return false; if (!Objects.equals(expectationSuite, that.expectationSuite)) return false; return Objects.equals(statisticsConfig, that.statisticsConfig); } @@ -323,6 +334,7 @@ public int hashCode() { result = 31 * result + (streamFeatureGroup != null ? streamFeatureGroup.hashCode() : 0); result = 31 * result + (statisticsConfig != null ? statisticsConfig.hashCode() : 0); result = 31 * result + (eventTime != null ? eventTime.hashCode() : 0); + result = 31 * result + (onlineEnabled ? 1: 0); result = 31 * result + (expectationSuite != null ? expectationSuite.hashCode(): 0); return result; } diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/cached/CachedFeaturegroup.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/cached/CachedFeaturegroup.java index b57e7a0c67..0905fb0596 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/cached/CachedFeaturegroup.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/cached/CachedFeaturegroup.java @@ -60,8 +60,6 @@ public class CachedFeaturegroup implements Serializable { private Integer id; @JoinColumn(name = "offline_feature_group", referencedColumnName = "TBL_ID") private HiveTbls hiveTbls; - @Column(name = "online_enabled") - private boolean onlineEnabled; @Basic(optional = false) @NotNull @Enumerated(EnumType.ORDINAL) @@ -90,14 +88,6 @@ public void setId(Integer id) { this.id = id; } - public boolean isOnlineEnabled() { - return onlineEnabled; - } - - public void setOnlineEnabled(boolean onlineEnabled) { - this.onlineEnabled = onlineEnabled; - } - public TimeTravelFormat getTimeTravelFormat() { return timeTravelFormat; } diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/ondemand/OnDemandFeature.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/ondemand/OnDemandFeature.java index e83cd6c5c5..cb90df51a0 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/ondemand/OnDemandFeature.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/ondemand/OnDemandFeature.java @@ -59,6 +59,8 @@ public class OnDemandFeature implements Serializable { @Basic(optional = false) @Column(name = "idx") private Integer idx; + @Column(name = "default_value") + private String defaultValue; public static long getSerialVersionUID() { return serialVersionUID; @@ -67,13 +69,14 @@ public static long getSerialVersionUID() { public OnDemandFeature() {} public OnDemandFeature(OnDemandFeaturegroup onDemandFeaturegroup, String name, String type, - String description, Boolean primary, Integer idx) { + String description, Boolean primary, Integer idx, String defaultValue) { this.onDemandFeaturegroup = onDemandFeaturegroup; this.name = name; this.type = type; this.description = description; this.primary = primary; this.idx = idx; + this.defaultValue = defaultValue; } public Integer getId() { @@ -132,6 +135,14 @@ public void setIdx(Integer idx) { this.idx = idx; } + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -144,6 +155,7 @@ public boolean equals(Object o) { if (!Objects.equals(name, that.name)) return false; if (!Objects.equals(type, that.type)) return false; if (!Objects.equals(idx, that.idx)) return false; + if (!Objects.equals(defaultValue, that.defaultValue)) return false; return Objects.equals(primary, that.primary); } @@ -155,6 +167,7 @@ public int hashCode() { result = 31 * result + (type != null ? type.hashCode() : 0); result = 31 * result + (primary != null ? primary.hashCode() : 0); result = 31 * result + (idx != null ? idx.hashCode() : 0); + result = 31 * result + (defaultValue != null ? defaultValue.hashCode() : 0); return result; } } diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/stream/StreamFeatureGroup.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/stream/StreamFeatureGroup.java index 2a28c9edbb..adb9065e30 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/stream/StreamFeatureGroup.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/stream/StreamFeatureGroup.java @@ -62,8 +62,6 @@ public class StreamFeatureGroup implements Serializable { private Collection featuresExtraConstraints; @OneToMany(cascade = CascadeType.ALL, mappedBy = "streamFeatureGroup") private Collection cachedFeatures; - @Column(name = "online_enabled") - private boolean onlineEnabled; public StreamFeatureGroup() {}; @@ -99,14 +97,6 @@ public void setFeaturesExtraConstraints(Collection