Skip to content

Commit

Permalink
[FSTORE-743] Add online support for External Feature Groups (logicalc…
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister authored and moritzmeister committed Apr 6, 2023
1 parent 14dd156 commit a06ddb0
Show file tree
Hide file tree
Showing 26 changed files with 690 additions and 483 deletions.
278 changes: 278 additions & 0 deletions hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1949,6 +1949,284 @@
expect(parsed_json["errorCode"]).to eql(270114)
end

it "should be able to create an online enabled on-demand/external feature group" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true

topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
expect_status_details(200)
end

it "should be able to enable an on-demand/external feature group online after it was created offline only" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: false)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be false

parsed_json["onlineEnabled"] = true

update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s +
"/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s +
"?enableOnline=true"

json_data = parsed_json.to_json
json_result = put update_featuregroup_metadata_endpoint, json_data
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json["onlineEnabled"]).to be true

topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
expect_status_details(200)
end

it "should be able to disable an online on-demand/external feature group after it was created online" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, featuregroup_name = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true

parsed_json["onlineEnabled"] = false

update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s +
"/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s +
"?disableOnline=true"

json_data = parsed_json.to_json
json_result = put update_featuregroup_metadata_endpoint, json_data
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json["onlineEnabled"]).to be false

# topic should still be there as we currently don't delete it
topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
expect_status_details(200)
end

it "should be possible to preview from online storage of an on-demand/external feature group" do
project = create_project(validate_session: false)
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
name: "online_fg", online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true
featuregroup_id = parsed_json["id"]

# add sample ros
OnlineFg.db_name = project[:projectname]
OnlineFg.create(testfeature: 1).save
OnlineFg.create(testfeature: 2).save

get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/preview?storage=online&limit=1"
expect_status_details(200)
parsed_json = JSON.parse(response.body)
expect(parsed_json['items'].length).to eql 1

# should fetch the online feature data if the fg is online enabled and storage not specified
get "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/featuregroups/#{featuregroup_id}/preview?&limit=1"
expect_status_details(200)
parsed_json = JSON.parse(response.body)
expect(parsed_json['items'].length).to eql 1
expect(parsed_json['items'][0]['storage']).to eql "ONLINE"
end

it "should not be possible to preview from offline storage for an on-demand/external feature group" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: false)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be false
featuregroup_id = parsed_json["id"]

get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s +
"/featuregroups/" + featuregroup_id.to_s + "/preview?storage=online&limit=1"
expect_status_details(400)
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s +
"/featuregroups/" + featuregroup_id.to_s + "/preview?storage=offline&limit=1"
expect_status_details(400)
end

it "should be possible to generate a online query with online on-demand/external feature groups" do
featurestore_id = get_featurestore_id(@project[:id])
featurestore_name = get_featurestore_name(@project.id)
connector_id = get_jdbc_connector_id
features = [{type: "INT", name: "testfeature", description: "testfeaturedescription", primary: true},
{type: "TIMESTAMP", name: "event_time"}]
json_result, fg_name_on_demand = create_on_demand_featuregroup(@project[:id], featurestore_id, connector_id,
features: features, event_time: "event_time", online_enabled: true)
expect_status_details(201)
parsed_json = JSON.parse(json_result)
fg_ond_id = parsed_json["id"]
fg_ond_type = parsed_json["type"]

features = [{type: "INT", name: "testfeature", description: "testfeaturedescription", primary: true},
{type: "INT", name: "anotherfeature", primary: false},
{type: "TIMESTAMP", name: "event_time"}]
json_result, fg_name = create_cached_featuregroup(@project[:id], featurestore_id, features: features,
event_time: "event_time", online: true )
parsed_json = JSON.parse(json_result)
fg_cached_id = parsed_json["id"]
fg_cached_type = parsed_json["type"]

query = {
leftFeatureGroup: {id: fg_cached_id, type: fg_cached_type, eventTime: "event_time"},
leftFeatures: [{name: 'anotherfeature'}, {name: "event_time"}],
joins: [{query: {
leftFeatureGroup: {id: fg_ond_id, type: fg_ond_type, eventTime: "event_time"},
leftFeatures: [{name: 'testfeature'}]
}}]}
json_result = put "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/featurestores/query", query
expect_status_details(200)
query = JSON.parse(json_result)
expect(query.key?("onDemandFeatureGroups")).to be true

expect(query['query']).to eql("SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg0`.`testfeature` `testfeature`\n" +
"FROM `#{featurestore_name}`.`#{fg_name}_1` `fg1`\n" +
"INNER JOIN `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature`")

expect(query['pitQuery']).to eql("WITH right_fg0 AS " +
"(SELECT *\nFROM " +
"(SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg1`.`testfeature` `join_pk_testfeature`, `fg1`.`event_time` `join_evt_event_time`, `fg0`.`testfeature` `testfeature`, RANK() OVER (PARTITION BY `fg0`.`testfeature`, `fg1`.`event_time` ORDER BY `fg0`.`event_time` DESC) pit_rank_hopsworks\n" +
"FROM `#{featurestore_name}`.`#{fg_name}_1` `fg1`\n" +
"INNER JOIN `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature` AND `fg1`.`event_time` >= `fg0`.`event_time`) NA\n" +
"WHERE `pit_rank_hopsworks` = 1) (SELECT `right_fg0`.`anotherfeature` `anotherfeature`, `right_fg0`.`event_time` `event_time`, `right_fg0`.`testfeature` `testfeature`\nFROM right_fg0)")
expect(query["queryOnline"]).to eql("SELECT `fg1`.`anotherfeature` `anotherfeature`, `fg1`.`event_time` `event_time`, `fg0`.`testfeature` `testfeature`\nFROM `#{@project['projectname']}`.`#{fg_name}_1` `fg1`\nINNER JOIN `#{@project['projectname']}`.`#{fg_name_on_demand}_1` `fg0` ON `fg1`.`testfeature` = `fg0`.`testfeature`")
end

it "should be possible to overwrite and clear an online on-demand/external feature group" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true
featuregroup_id = parsed_json["id"]
clear_featuregroup_contents_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/clear"
post clear_featuregroup_contents_endpoint
expect_status_details(200)
end

it "should be possible to append features to an online on-demand/external feature group" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true
parsed_json["description"] = "changed description"
parsed_json["features"] = [
{
type: "INT",
name: "testfeature",
description: "testfeaturedescription",
primary: true,
onlineType: "INT",
partition: false
},
{
type: "DOUBLE",
name: "testfeature2",
description: "testfeaturedescription",
primary: false,
onlineType: "DOUBLE",
partition: false,
defaultValue: nil
},
]
update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s +
"/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s +
"?updateMetadata=true"

json_data = parsed_json.to_json
json_result = put update_featuregroup_metadata_endpoint, json_data
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json["features"].length).to be 2
expect(parsed_json["description"]).to eql("changed description")
expect(parsed_json["features"].select{ |f| f["name"] == "testfeature"}.first["defaultValue"]).to be nil
expect(parsed_json["features"].select{ |f| f["name"] == "testfeature2"}.first["defaultValue"]).to be nil
expect(parsed_json["onlineEnabled"]).to be true
end

it "should not be possible to append features to an online on-demand/external feature group with default value" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_id = get_jdbc_connector_id
json_result, _ = create_on_demand_featuregroup(project.id, featurestore_id, connector_id,
online_enabled: true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true
parsed_json["description"] = "changed description"
parsed_json["features"] = [
{
type: "INT",
name: "testfeature",
description: "testfeaturedescription",
primary: true,
onlineType: "INT",
partition: false
},
{
type: "DOUBLE",
name: "testfeature2",
description: "testfeaturedescription",
primary: false,
onlineType: "DOUBLE",
partition: false,
defaultValue: "10.0"
},
]
update_featuregroup_metadata_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s +
"/featurestores/" + featurestore_id.to_s + "/featuregroups/" + parsed_json["id"].to_s +
"?updateMetadata=true"

json_data = parsed_json.to_json
json_result = put update_featuregroup_metadata_endpoint, json_data
parsed_json = JSON.parse(json_result)
expect_status_details(200)
end

it "should be able to generate a query with only on-demand feature group" do
featurestore_id = get_featurestore_id(@project[:id])
connector_id = get_jdbc_connector_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ def backfill_stream_featuregroup(featurestore_id, featuregroup_id, featuregroup_
end

def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, name: nil, version: 1, query: nil,
features: nil, data_format: nil, options: nil, event_time: nil)
features: nil, data_format: nil, options: nil, event_time: nil,
online_enabled: false)
type = "onDemandFeaturegroupDTO"
featuregroupType = "ON_DEMAND_FEATURE_GROUP"
create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/featuregroups"
Expand All @@ -224,7 +225,8 @@ def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId,
},
query: query,
featuregroupType: featuregroupType,
eventTime: event_time
eventTime: event_time,
onlineEnabled: online_enabled
}

unless data_format == nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import io.hops.hopsworks.jwt.annotation.JWTRequired;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiScope;
import io.hops.hopsworks.restutils.RESTCodes;
import io.swagger.annotations.ApiOperation;

import javax.ejb.EJB;
Expand All @@ -49,7 +47,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import java.util.logging.Level;

@RequestScoped
@TransactionAttribute(TransactionAttributeType.NEVER)
Expand Down Expand Up @@ -102,19 +99,11 @@ public Response getPreview(@BeanParam FeatureGroupPreviewBeanParam featureGroupP
"Row limit should greater than 0 and lower than: " + settings.getFGPreviewLimit());
}

// validate feature group type (we can only return data preview for cached feature groups)
if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS,
Level.FINE, "featuregroupId: " + featuregroup.getId());
}

// set online flag. if the user doesn't provide the storage flag and the feature group
// is available online, return the data from the online feature store as it's faster.
boolean online;
if (featureGroupPreviewBeanParam.getStorage() == null) {
online = (featuregroup.getStreamFeatureGroup() != null && featuregroup.getStreamFeatureGroup().isOnlineEnabled())
|| (featuregroup.getCachedFeaturegroup() != null && featuregroup.getCachedFeaturegroup().isOnlineEnabled());
online = featuregroup.isOnlineEnabled();
} else {
online = featureGroupPreviewBeanParam.getStorage().equals(FeatureGroupStorage.ONLINE);
}
Expand Down
Loading

0 comments on commit a06ddb0

Please sign in to comment.