From d128f752d7c4c7a2fa62d4d532163f4690d466d3 Mon Sep 17 00:00:00 2001 From: SpringMT Date: Fri, 12 Apr 2024 21:55:13 +0900 Subject: [PATCH] feat: unlock GVL for compress --- benchmarks/multi_thread_comporess.rb | 9 +++- benchmarks/zstd_compress_memory.rb | 2 +- benchmarks/zstd_decompress_memory.rb | 2 +- benchmarks/zstd_streaming_compress_memory.rb | 2 +- .../zstd_streaming_decompress_memory.rb | 2 +- examples/sinatra/Gemfile.lock | 2 +- ext/zstdruby/common.h | 42 +++++++++++++++++-- ext/zstdruby/extconf.rb | 2 +- 8 files changed, 52 insertions(+), 11 deletions(-) diff --git a/benchmarks/multi_thread_comporess.rb b/benchmarks/multi_thread_comporess.rb index 50ccde2..42517e5 100644 --- a/benchmarks/multi_thread_comporess.rb +++ b/benchmarks/multi_thread_comporess.rb @@ -14,12 +14,19 @@ json_string = File.read("./samples/#{sample_file_name}") queue = Queue.new +# queue = [] GUESSES.times { queue << json_string } +# stream = Zstd::StreamingCompress.new(thread_num: THREADS) THREADS.times { queue << nil } THREADS.times.map { Thread.new { while str = queue.pop - Zstd.compress(str) + # stream = Zstd::StreamingCompress.new(thread_num: THREADS) + #stream << str + #stream << str + #stream << str + #stream.flush + Zstd.compress(str, thread_num: 1) end } }.each(&:join) diff --git a/benchmarks/zstd_compress_memory.rb b/benchmarks/zstd_compress_memory.rb index d18e6e9..7edfb20 100644 --- a/benchmarks/zstd_compress_memory.rb +++ b/benchmarks/zstd_compress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true) +json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true) json_string = json_data.to_json i = 0 diff --git a/benchmarks/zstd_decompress_memory.rb b/benchmarks/zstd_decompress_memory.rb index 3b35b4f..95aee0c 100644 --- a/benchmarks/zstd_decompress_memory.rb +++ b/benchmarks/zstd_decompress_memory.rb @@ -9,7 +9,7 @@ p "#{ObjectSpace.memsize_of_all/1000} #{ObjectSpace.count_objects} #{`ps -o rss= -p #{Process.pid}`.to_i}" sample_file_name = ARGV[0] -json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true) +json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true) json_string = json_data.to_json i = 0 diff --git a/benchmarks/zstd_streaming_compress_memory.rb b/benchmarks/zstd_streaming_compress_memory.rb index 3ae99cf..82306f4 100644 --- a/benchmarks/zstd_streaming_compress_memory.rb +++ b/benchmarks/zstd_streaming_compress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -json_string = IO.read("./samples/#{sample_file_name}") +json_string = File.read("./samples/#{sample_file_name}") i = 0 start_time = Time.now diff --git a/benchmarks/zstd_streaming_decompress_memory.rb b/benchmarks/zstd_streaming_decompress_memory.rb index 166bd98..1c791bc 100644 --- a/benchmarks/zstd_streaming_decompress_memory.rb +++ b/benchmarks/zstd_streaming_decompress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -cstr = IO.read("./results/#{sample_file_name}.zstd") +cstr = File.read("./results/#{sample_file_name}.zstd") i = 0 start_time = Time.now while true do diff --git a/examples/sinatra/Gemfile.lock b/examples/sinatra/Gemfile.lock index 8708d15..7f1fa19 100644 --- a/examples/sinatra/Gemfile.lock +++ b/examples/sinatra/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: ../.. specs: - zstd-ruby (1.5.6.1) + zstd-ruby (1.5.6.2) GEM remote: https://rubygems.org/ diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index 8bc953a..a2169e2 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -2,6 +2,9 @@ #define ZSTD_RUBY_H 1 #include +#ifdef HAVE_RUBY_THREAD_H +#include +#endif #include "./libzstd/zstd.h" static int convert_compression_level(VALUE compression_level_value) @@ -12,18 +15,40 @@ static int convert_compression_level(VALUE compression_level_value) return NUM2INT(compression_level_value); } +struct compress_params { + ZSTD_CCtx* ctx; + ZSTD_outBuffer* output; + ZSTD_inBuffer* input; + ZSTD_EndDirective endOp; + size_t ret; +}; + +static void* compress_wrapper(void* args) +{ + struct compress_params* params = args; + params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp); + return NULL; +} + static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - return ZSTD_compressStream2(ctx, output, input, endOp); +#ifdef HAVE_RUBY_THREAD_H + struct compress_params params = { ctx, output, input, endOp }; + rb_thread_call_without_gvl(compress_wrapper, ¶ms, RUBY_UBF_IO, NULL); + return params.ret; +#else + return ZSTD_compressStream2(ctx, output, input, endOp); +#endif } static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs) { - ID kwargs_keys[2]; + ID kwargs_keys[3]; kwargs_keys[0] = rb_intern("level"); kwargs_keys[1] = rb_intern("dict"); - VALUE kwargs_values[2]; - rb_get_kwargs(kwargs, kwargs_keys, 0, 2, kwargs_values); + kwargs_keys[2] = rb_intern("thread_num"); + VALUE kwargs_values[3]; + rb_get_kwargs(kwargs, kwargs_keys, 0, 3, kwargs_values); int compression_level = ZSTD_CLEVEL_DEFAULT; if (kwargs_values[0] != Qundef && kwargs_values[0] != Qnil) { @@ -43,6 +68,15 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL rb_raise(rb_eRuntimeError, "%s", "ZSTD_CCtx_loadDictionary failed"); } } + + if (kwargs_values[2] != Qundef && kwargs_values[2] != Qnil) { + int thread_num = NUM2INT(kwargs_values[2]); + size_t const r = ZSTD_CCtx_setParameter(ctx, ZSTD_c_nbWorkers, thread_num); + if (ZSTD_isError(r)) { + rb_warn("Note: the linked libzstd library doesn't support multithreading.Reverting to single-thread mode. \n"); + } + // ZSTD_CCtx_setParameter(ctx, ZSTD_c_jobSize, thread_num); + } } static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs) diff --git a/ext/zstdruby/extconf.rb b/ext/zstdruby/extconf.rb index 4575bd3..c470bfe 100644 --- a/ext/zstdruby/extconf.rb +++ b/ext/zstdruby/extconf.rb @@ -2,7 +2,7 @@ have_func('rb_gc_mark_movable') -$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY' +$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread' $CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/ Dir.chdir File.expand_path('..', __FILE__) do