Skip to content

Commit

Permalink
adding streaming cypher support, using two types of JSON parsing sinc…
Browse files Browse the repository at this point in the history
…e Crack was breaking on large cypher results, stream is twice as fast as not when pulling in 20k nodes
  • Loading branch information
maxdemarzi committed Jun 3, 2012
1 parent c6075fb commit fcf7c3b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
3 changes: 1 addition & 2 deletions lib/neography/config.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Neography
class Config
class << self; attr_accessor :protocol, :server, :port, :directory, :cypher_path, :gremlin_path, :log_file, :log_enabled, :logger, :max_threads, :authentication, :username, :password, :parser end
class << self; attr_accessor :protocol, :server, :port, :directory, :cypher_path, :gremlin_path, :log_file, :log_enabled, :logger, :max_threads, :authentication, :username, :password end

@protocol = 'http://'
@server = 'localhost'
Expand All @@ -15,6 +15,5 @@ class << self; attr_accessor :protocol, :server, :port, :directory, :cypher_path
@authentication = {}
@username = nil
@password = nil
@parser = {:parser => CrackParser}
end
end
42 changes: 25 additions & 17 deletions lib/neography/rest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Neography
class Rest
include HTTParty

attr_accessor :protocol, :server, :port, :directory, :cypher_path, :gremlin_path, :log_file, :log_enabled, :logger, :max_threads, :authentication, :username, :password, :parser
attr_accessor :protocol, :server, :port, :directory, :cypher_path, :gremlin_path, :log_file, :log_enabled, :logger, :max_threads, :authentication, :username, :password

def initialize(options=ENV['NEO4J_URL'] || {})
init = {:protocol => Neography::Config.protocol,
Expand All @@ -17,8 +17,7 @@ def initialize(options=ENV['NEO4J_URL'] || {})
:max_threads => Neography::Config.max_threads,
:authentication => Neography::Config.authentication,
:username => Neography::Config.username,
:password => Neography::Config.password,
:parser => Neography::Config.parser
:password => Neography::Config.password
}

unless options.respond_to?(:each_pair)
Expand Down Expand Up @@ -141,12 +140,13 @@ def reset_node_properties(id, properties)
end

def get_node_properties(id, properties = nil)
options = {:parser => CrackParser}
if properties.nil?
get("/node/#{get_id(id)}/properties")
get("/node/#{get_id(id)}/properties", options)
else
node_properties = Hash.new
Array(properties).each do |property|
value = get("/node/#{get_id(id)}/properties/#{property}")
value = get("/node/#{get_id(id)}/properties/#{property}", options)
node_properties[property] = value unless value.nil?
end
return nil if node_properties.empty?
Expand All @@ -155,18 +155,19 @@ def get_node_properties(id, properties = nil)
end

def remove_node_properties(id, properties = nil)
options = {:parser => CrackParser}
if properties.nil?
delete("/node/#{get_id(id)}/properties")
delete("/node/#{get_id(id)}/properties", options)
else
Array(properties).each do |property|
delete("/node/#{get_id(id)}/properties/#{property}")
delete("/node/#{get_id(id)}/properties/#{property}", options)
end
end
end

def set_node_properties(id, properties)
properties.each do |key, value|
options = { :body => value.to_json, :headers => {'Content-Type' => 'application/json'} }
options = { :body => value.to_json, :headers => {'Content-Type' => 'application/json'}, :parser => CrackParser}
put("/node/#{get_id(id)}/properties/#{key}", options)
end
end
Expand Down Expand Up @@ -206,12 +207,13 @@ def reset_relationship_properties(id, properties)
end

def get_relationship_properties(id, properties = nil)
options = {:parser => CrackParser}
if properties.nil?
get("/relationship/#{get_id(id)}/properties")
get("/relationship/#{get_id(id)}/properties", options)
else
relationship_properties = Hash.new
Array(properties).each do |property|
value = get("/relationship/#{get_id(id)}/properties/#{property}")
value = get("/relationship/#{get_id(id)}/properties/#{property}", options)
relationship_properties[property] = value unless value.nil?
end
return nil if relationship_properties.empty?
Expand All @@ -220,11 +222,12 @@ def get_relationship_properties(id, properties = nil)
end

def remove_relationship_properties(id, properties = nil)
options = {:parser => CrackParser}
if properties.nil?
delete("/relationship/#{get_id(id)}/properties")
delete("/relationship/#{get_id(id)}/properties", options)
else
Array(properties).each do |property|
delete("/relationship/#{get_id(id)}/properties/#{property}")
delete("/relationship/#{get_id(id)}/properties/#{property}", options)
end
end
end
Expand Down Expand Up @@ -368,10 +371,15 @@ def get_shortest_weighted_path(from, to, relationships, weight_attr='weight', de
end

def execute_query(query, params = {})
options = { :body => {:query => query, :params => params}.to_json, :headers => {'Content-Type' => 'application/json'} }
options = { :body => {:query => query, :params => params}.to_json, :headers => {'Content-Type' => 'application/json', 'accept' => 'application/json;stream=true'} , :parser => HTTParty::Parser}
result = post(@cypher_path, options)
end

def execute_query_not_streaming(query, params = {})
options = { :body => {:query => query, :params => params}.to_json, :headers => {'Content-Type' => 'application/json'}, :parser => HTTParty::Parser }
result = post(@cypher_path, options)
end

def execute_script(script, params = {})
options = { :body => {:script => script, :params => params}.to_json , :headers => {'Content-Type' => 'application/json'} }
result = post(@gremlin_path, options)
Expand Down Expand Up @@ -463,19 +471,19 @@ def evaluate_response(response)
end

def get(path,options={})
evaluate_response(HTTParty.get(configuration + URI.encode(path), options.merge!(@authentication).merge!(@parser)))
evaluate_response(HTTParty.get(configuration + URI.encode(path), options.merge!(@authentication)))
end

def post(path,options={})
evaluate_response(HTTParty.post(configuration + URI.encode(path), options.merge!(@authentication).merge!(@parser)))
evaluate_response(HTTParty.post(configuration + URI.encode(path), options.merge!(@authentication)))
end

def put(path,options={})
evaluate_response(HTTParty.put(configuration + URI.encode(path), options.merge!(@authentication).merge!(@parser)))
evaluate_response(HTTParty.put(configuration + URI.encode(path), options.merge!(@authentication)))
end

def delete(path,options={})
evaluate_response(HTTParty.delete(configuration + URI.encode(path), options.merge!(@authentication).merge!(@parser)))
evaluate_response(HTTParty.delete(configuration + URI.encode(path), options.merge!(@authentication)))
end

def get_id(id)
Expand Down
15 changes: 15 additions & 0 deletions spec/integration/rest_plugin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
existing_node.should have_key("self")
existing_node["self"].split('/').last.should == id
end

#it "can create a ton of nodes" do
# ton_nodes = @neo.execute_script("5000.times { g.addVertex();}")
# ton_nodes.should be_nil
#end

end

describe "execute cypher query" do
Expand Down Expand Up @@ -62,6 +68,15 @@
existing_node["data"][0][0]["self"].split('/').last.should == id
end

it "can get the a bunch of nodes streaming", :slow => true do
Benchmark.bm do |x|
x.report("cypher ") { @existing_nodes = @neo.execute_query_not_streaming("start n=node(*) return n") }
x.report("streaming cypher ") { @existing_nodes_streaming = @neo.execute_query("start n=node(*) return n") }
end
@existing_nodes.should_not be_nil
@existing_nodes_streaming.should_not be_nil
end

end

end

0 comments on commit fcf7c3b

Please sign in to comment.