Skip to content

Commit

Permalink
[FSTORE-1263] Add pagination to backend endpoints for FeatureGroups a…
Browse files Browse the repository at this point in the history
…nd StorageConnectors (#1732)

* FG pagination

* SC pagination

* FeatureGroup pagination

* fg

* sc pagination

* minor changes

* add setOffsetAndLim

* some fixes

* filter fix

* fg test

* fv test

* fv test fix

* sc tests

* test fix

* fix fg statistics test

* undo changes in FeaturestoreConnector

* test fixes
  • Loading branch information
bubriks committed Apr 4, 2024
1 parent 5ca666a commit 8adf966
Show file tree
Hide file tree
Showing 20 changed files with 996 additions and 108 deletions.
155 changes: 135 additions & 20 deletions hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@
expect(parsed_json["items"][1]["commitID"]).to eql(1603577485000)
end

it "should be able to retrieve commit timeline from existing hudi enabled offline cached featuregroup without providing any parameters" do
it "should be able to retrieve commit timeline from existing hudi enabled offline cached featuregroup without providing any parameters" do
featurestore_id = get_featurestore_id(@project[:id])
json_result, featuregroup_name = create_cached_featuregroup_with_partition(@project[:id], featurestore_id, time_travel_format: "HUDI")
parsed_json = JSON.parse(json_result)
Expand All @@ -1076,7 +1076,7 @@

json_result = get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/featurestores/#{featurestore_id}/featuregroups/#{featuregroup_id}/commits"
expect_status_details(200)
parsed_json = JSON.parse(json_result)
parsed_json = JSON.parse(json_result)
expect(parsed_json["items"][0]["commitID"]).to eql(1603577485000)
end

Expand Down Expand Up @@ -1208,32 +1208,33 @@
system_time_1st_statistic_commit = (Time.now.to_f * 1000).to_i
json_data = {
items:[],
windowEndTime: nil,
commitTime:system_time_1st_statistic_commit,
windowEndCommitTime: nil,
computationTime:system_time_1st_statistic_commit,
featureDescriptiveStatistics: feature_descriptive_statistics,
rowPercentage: 1.0
}

json_data_str = json_data.to_json
_ = post create_statistic_endpoint, json_data_str
post create_statistic_endpoint, json_data_str
expect_status_details(200)

json_result = get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/featurestores/#{featurestore_id}/featuregroups/#{featuregroup_id}/statistics?fields=content&sort_by=commit_time%3Adesc&offset=0&limit=1"
parsed_json = JSON.parse(json_result)
expect(parsed_json["items"].first["windowStartTime"]).to eql(1603577485000) # first fg commit
expect(parsed_json["items"].first["windowEndTime"]).to eql(1603650176000)
expect(parsed_json["items"].first["commitTime"]).to eql(system_time_1st_statistic_commit)
expect(parsed_json["items"].first["windowStartCommitTime"]).to eql(0) # first fg commit
expect(parsed_json["items"].first["windowEndCommitTime"]).to eql(1603650176000)
expect(parsed_json["items"].first["computationTime"]).to eql(system_time_1st_statistic_commit)

system_time_2nd_statistic_commit = (Time.now.to_f * 1000).to_i
json_data[:windowEndTime] = 1603650176000
json_data[:commitTime] = system_time_2nd_statistic_commit
json_data[:windowEndCommitTime] = 1603650176000
json_data[:computationTime] = system_time_2nd_statistic_commit
json_data_str = json_data.to_json
_ = post create_statistic_endpoint, json_data_str

json_result = get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/featurestores/#{featurestore_id}/featuregroups/#{featuregroup_id}/statistics?fields=content&sort_by=commit_time%3Adesc&offset=0&limit=1"
parsed_json = JSON.parse(json_result)
expect(parsed_json["items"].first["windowStartTime"]).to eql(1603577485000) # first fg commit
expect(parsed_json["items"].first["windowEndTime"]).to eql(1603650176000) # same commit, no stats are recomputed
expect(parsed_json["items"].first["commitTime"]).to eql(system_time_1st_statistic_commit) # therefore, system_time_1st_statistic_commit
expect(parsed_json["items"].first["windowStartCommitTime"]).to eql(0) # first fg commit
expect(parsed_json["items"].first["windowEndCommitTime"]).to eql(1603650176000) # same commit, no stats are recomputed
expect(parsed_json["items"].first["computationTime"]).to eql(system_time_1st_statistic_commit) # therefore, system_time_1st_statistic_commit
end

it "should be able to delete a feature group with 500 commits" do
Expand Down Expand Up @@ -1873,7 +1874,7 @@
data_format: "CSV")
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json.key?("location")).to be true
expect(parsed_json.key?("location")).to be true
expect(parsed_json["query"]).to eql("")
expect(parsed_json["dataFormat"]).to eql("CSV")
end
Expand Down Expand Up @@ -3141,41 +3142,155 @@
context 'with valid project, featurestore service enabled' do
before :all do
with_valid_project

project = get_project
featurestore_id = get_featurestore_id(project.id)

# create some feature groups
create_cached_featuregroup(project.id, featurestore_id)
expect_status_details(201)

create_cached_featuregroup(project.id, featurestore_id)
expect_status_details(201)

create_cached_featuregroup(project.id, featurestore_id, featuregroup_name:"featuregroup1", version:1)
expect_status_details(201)

create_cached_featuregroup(project.id, featurestore_id, featuregroup_name:"featuregroup1", version:2)
expect_status_details(201)

create_cached_featuregroup(project.id, featurestore_id, featuregroup_name:"featuregroup1", version:3)
expect_status_details(201)
end

it "should be able to list all featuregroups of the project's featurestore" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# no ordering guarantee
get_featuregroups_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups"

# get fgs
get get_featuregroups_endpoint
parsed_json = JSON.parse(response.body)
expect_status_details(200)
expect(parsed_json.length == 0).to be true
old_count = parsed_json.length

# create fg
json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id)
expect_status_details(201)

# get fgs
get get_featuregroups_endpoint
parsed_json = JSON.parse(response.body)
expect_status_details(200)
expect(parsed_json.length == 1).to be true
count = parsed_json.length

# ensure that the created fg is in returned fgs
expect(count).to eq(old_count + 1)
expect(parsed_json[0].key?("id")).to be true
expect(parsed_json[0].key?("featurestoreName")).to be true
expect(parsed_json[0].key?("name")).to be true
expect(parsed_json[0]["featurestoreName"] == project.projectname.downcase + "_featurestore").to be true
expect(parsed_json[0]["name"] == featuregroup_name).to be true
expect(parsed_json.any? { |fg| fg["name"] == featuregroup_name }).to be true
end

it "should be able to list all featuregroups of the project's featurestore sorted by name" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?sort_by=NAME:asc"
expect_status_details(200)
names = json_body.map { |o| "#{o[:name]}" }
sorted_names = names.sort_by(&:downcase)

expect(json_body.length).to eq(6)
expect(names).to eq(sorted_names)
end

it "should be able to list all featuregroups of the project's featurestore sorted by name and version" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?sort_by=NAME:asc,VERSION:asc"
expect_status_details(200)
names_versions = json_body.map { |o| "#{o[:name]}_#{o[:version]}" }
sorted_names_versions = names_versions.sort_by(&:downcase)

expect(json_body.length).to eq(6)
expect(names_versions).to eq(sorted_names_versions)
end

it "should be able to list all featuregroups of the project's featurestore filtered by name" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?filter_by=NAME:featuregroup1"
expect_status_details(200)

expect(json_body.length).to eq(3)
expect(json_body.all? { |fg| fg[:name] == "featuregroup1" }).to be true
end

it "should be able to list all featuregroups of the project's featurestore filtered by name and version" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?filter_by=NAME:featuregroup1&filter_by=VERSION:2"
expect_status_details(200)

expect(json_body.length).to eq(1)
expect(json_body.all? { |fg| fg[:name] == "featuregroup1" }).to be true
expect(json_body.all? { |fg| fg[:version] == 2 }).to be true
end

it "should be able to list all featuregroups of the project's featurestore limit" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?sort_by=NAME:asc&limit=2"
expect_status_details(200)

expect(json_body.length).to eq(2)
end

it "should be able to list all featuregroups of the project's featurestore offset" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

# get fgs
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?sort_by=NAME:asc"
expect_status_details(200)
names = json_body.map { |o| "#{o[:name]}" }

get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups?sort_by=NAME:asc&offset=1"
expect_status_details(200)
names_with_offset = json_body.map { |o| "#{o[:name]}" }

expect(json_body.length).to eq(5)
for i in 0..json_body.length-1 do
expect(names[i+1]).to eq(names_with_offset[i])
end
end

it "should be able to get a featuregroup with a particular id" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id)
expect_status_details(201)
parsed_json = JSON.parse(json_result)
expect_status_details(201)

featuregroup_id = parsed_json["id"]
get_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s
get get_featuregroup_endpoint
get "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s
parsed_json = JSON.parse(response.body)
expect_status_details(200)

expect(parsed_json.key?("id")).to be true
expect(parsed_json.key?("featurestoreName")).to be true
expect(parsed_json.key?("featurestoreId")).to be true
Expand Down
Loading

0 comments on commit 8adf966

Please sign in to comment.