Skip to content

Commit

Permalink
feat: unlock GVL for compress
Browse files Browse the repository at this point in the history
  • Loading branch information
SpringMT committed Apr 12, 2024
1 parent 788f4f5 commit d128f75
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 11 deletions.
9 changes: 8 additions & 1 deletion benchmarks/multi_thread_comporess.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
json_string = File.read("./samples/#{sample_file_name}")

queue = Queue.new
# queue = []
GUESSES.times { queue << json_string }
# stream = Zstd::StreamingCompress.new(thread_num: THREADS)
THREADS.times { queue << nil }
THREADS.times.map {
Thread.new {
while str = queue.pop
Zstd.compress(str)
# stream = Zstd::StreamingCompress.new(thread_num: THREADS)
#stream << str
#stream << str
#stream << str
#stream.flush
Zstd.compress(str, thread_num: 1)
end
}
}.each(&:join)
2 changes: 1 addition & 1 deletion benchmarks/zstd_compress_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

sample_file_name = ARGV[0]

json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
json_string = json_data.to_json

i = 0
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/zstd_decompress_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
p "#{ObjectSpace.memsize_of_all/1000} #{ObjectSpace.count_objects} #{`ps -o rss= -p #{Process.pid}`.to_i}"

sample_file_name = ARGV[0]
json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
json_string = json_data.to_json

i = 0
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/zstd_streaming_compress_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

sample_file_name = ARGV[0]

json_string = IO.read("./samples/#{sample_file_name}")
json_string = File.read("./samples/#{sample_file_name}")

i = 0
start_time = Time.now
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/zstd_streaming_decompress_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

sample_file_name = ARGV[0]

cstr = IO.read("./results/#{sample_file_name}.zstd")
cstr = File.read("./results/#{sample_file_name}.zstd")
i = 0
start_time = Time.now
while true do
Expand Down
2 changes: 1 addition & 1 deletion examples/sinatra/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ../..
specs:
zstd-ruby (1.5.6.1)
zstd-ruby (1.5.6.2)

GEM
remote: https://rubygems.org/
Expand Down
42 changes: 38 additions & 4 deletions ext/zstdruby/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
#define ZSTD_RUBY_H 1

#include <ruby.h>
#ifdef HAVE_RUBY_THREAD_H
#include <ruby/thread.h>
#endif
#include "./libzstd/zstd.h"

static int convert_compression_level(VALUE compression_level_value)
Expand All @@ -12,18 +15,40 @@ static int convert_compression_level(VALUE compression_level_value)
return NUM2INT(compression_level_value);
}

struct compress_params {
ZSTD_CCtx* ctx;
ZSTD_outBuffer* output;
ZSTD_inBuffer* input;
ZSTD_EndDirective endOp;
size_t ret;
};

static void* compress_wrapper(void* args)
{
struct compress_params* params = args;
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
return NULL;
}

static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
{
return ZSTD_compressStream2(ctx, output, input, endOp);
#ifdef HAVE_RUBY_THREAD_H
struct compress_params params = { ctx, output, input, endOp };
rb_thread_call_without_gvl(compress_wrapper, &params, RUBY_UBF_IO, NULL);
return params.ret;
#else
return ZSTD_compressStream2(ctx, output, input, endOp);
#endif
}

static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs)
{
ID kwargs_keys[2];
ID kwargs_keys[3];
kwargs_keys[0] = rb_intern("level");
kwargs_keys[1] = rb_intern("dict");
VALUE kwargs_values[2];
rb_get_kwargs(kwargs, kwargs_keys, 0, 2, kwargs_values);
kwargs_keys[2] = rb_intern("thread_num");
VALUE kwargs_values[3];
rb_get_kwargs(kwargs, kwargs_keys, 0, 3, kwargs_values);

int compression_level = ZSTD_CLEVEL_DEFAULT;
if (kwargs_values[0] != Qundef && kwargs_values[0] != Qnil) {
Expand All @@ -43,6 +68,15 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL
rb_raise(rb_eRuntimeError, "%s", "ZSTD_CCtx_loadDictionary failed");
}
}

if (kwargs_values[2] != Qundef && kwargs_values[2] != Qnil) {
int thread_num = NUM2INT(kwargs_values[2]);
size_t const r = ZSTD_CCtx_setParameter(ctx, ZSTD_c_nbWorkers, thread_num);
if (ZSTD_isError(r)) {
rb_warn("Note: the linked libzstd library doesn't support multithreading.Reverting to single-thread mode. \n");
}
// ZSTD_CCtx_setParameter(ctx, ZSTD_c_jobSize, thread_num);
}
}

static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
Expand Down
2 changes: 1 addition & 1 deletion ext/zstdruby/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

have_func('rb_gc_mark_movable')

$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY'
$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread'
$CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/

Dir.chdir File.expand_path('..', __FILE__) do
Expand Down

0 comments on commit d128f75

Please sign in to comment.