diff --git a/lib/neography/connection.rb b/lib/neography/connection.rb index 5e30c06..b74f366 100644 --- a/lib/neography/connection.rb +++ b/lib/neography/connection.rb @@ -58,10 +58,24 @@ def merge_options(options) path = "/db/data" + path if reserved.include?(partial_path) is_batch = (partial_path == "batch") query_body = options[:body] - log path, query_body do - headers = merge_options(options)[:headers] - evaluate_response(@client.request(:method => action.to_sym, :path => path, :body => query_body, :headers => headers), - path, query_body, headers && (headers['X-Stream'] == true), is_batch) + stream = "" + streamer = lambda do |chunk, remaining_bytes, total_bytes| + stream += chunk + end + headers = merge_options(options)[:headers] + is_streaming = headers && (headers['X-Stream'] == true) + if is_batch && is_streaming + log path, query_body do + response = @client.request(:persistent => false, :method => action.to_sym, :path => path, :body => query_body, + :headers => headers, :response_block => streamer, :read_timeout => 100000000, :write_timeout => 100000000) + evaluate_response(response, path, query_body, is_streaming, is_batch, stream) + end + else + log path, query_body do + evaluate_response(@client.request(:method => action.to_sym, :path => path, :body => query_body, :headers => headers), + path, query_body, is_streaming, is_batch) + end + end end end @@ -139,9 +153,9 @@ def evaluate_chunk_response(response, result) return_result(code, result) end - def evaluate_response(response, path, query_body, streaming, batching) + def evaluate_response(response, path, query_body, streaming, batching, stream = nil) if streaming && batching - code, body, parsed = handle_batch(response) + code, body, parsed = handle_batch(stream) else code = response.status body = response.body.force_encoding("UTF-8") @@ -150,9 +164,9 @@ def evaluate_response(response, path, query_body, streaming, batching) return_result(response, code, body, parsed, path, query_body) end - def handle_batch(response) + def handle_batch(stream) code = 200 - body = @parser.json(response.body.force_encoding("UTF-8")) + body = @parser.json(stream.force_encoding("UTF-8")) body.each do |result| if result["status"] >= 400 code = result["status"] diff --git a/spec/integration/rest_batch_streaming_spec.rb b/spec/integration/rest_batch_streaming_spec.rb index a62a101..ce04560 100644 --- a/spec/integration/rest_batch_streaming_spec.rb +++ b/spec/integration/rest_batch_streaming_spec.rb @@ -27,16 +27,25 @@ batch_result.last["body"]["self"].split('/').last.should == "0" end - # fails in batch streaming - #it "can send a 20000 item batch" do - # commands = [] - # 20000.times do |x| - # commands << [:create_node, {"name" => "Max " + x.to_s}] - # end - # batch_result = @neo.batch *commands - # batch_result.first["body"]["data"]["name"].should == "Max 0" - # batch_result.last["body"]["data"]["name"].should == "Max 19999" - #end - end + it "can send a 20000 get item batch" do + commands = [] + 20000.times do |x| + commands << [:get_node, 0] + end + batch_result = @neo.batch *commands + batch_result.first["body"]["self"].split('/').last.should == "0" + batch_result.last["body"]["self"].split('/').last.should == "0" + end + # it "can send a 20000 create item batch" do + # commands = [] + # 20000.times do |x| + # commands << [:create_node, {"name" => "Max " + x.to_s}] + # end + # batch_result = @neo.batch *commands + # batch_result.first["body"]["data"]["name"].should == "Max 0" + # batch_result.last["body"]["data"]["name"].should == "Max 19999" + # end + + end end