Skip to content

Commit

Permalink
Merge pull request #84 from SpringMT/feature/unlock-gvl-simple-compre…
Browse files Browse the repository at this point in the history
…ss-decompress

feat: unlock GVL for simple compression and decompression
  • Loading branch information
SpringMT authored Apr 25, 2024
2 parents 0d6de0f + a61c625 commit 3eedc15
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 21 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ jobs:
with:
ruby-version: ${{ matrix.ruby-version }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- name: Install dependencies
run: bundle install
- name: Compile
run: bundle exec rake compile
- name: Run tests
run: bundle exec rspec
- name: Run benchmarks
working-directory: benchmarks
run: |
bundle install
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_comporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_decomporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_streaming_comporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_streaming_decomporess.rb city.json
bundle exec ruby large_bytes.rb
11 changes: 4 additions & 7 deletions benchmarks/large_bytes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
require 'zstd-ruby'
require 'securerandom'

source_data = ""
512.times { source_data += SecureRandom.uuid }
# 1<<17 だと GitHub ActionsでOOMになる
source_data = Random.bytes(1<<16)

puts "source_data.size:#{source_data.size}"

# Test compressing and de-compressing our source data 100,000 times. The cycles
# are intended to exercise the library and reproduce a memory leak.
100_000.times do |i|
10.times do |i|
compressed_data = Zstd.compress(source_data)
expanded_data = Zstd.decompress(compressed_data)
unless expanded_data == source_data
puts "Error: expanded data does not match source data"
end
if i % 1000 == 0
puts " - #{i}: c:#{compressed_data.size} e:#{expanded_data.size} memory:#{`ps -o rss= -p #{Process.pid}`.to_i}"
end

puts " - #{i}: c:#{compressed_data.size} e:#{expanded_data.size} memory:#{`ps -o rss= -p #{Process.pid}`.to_i}"
end
22 changes: 22 additions & 0 deletions benchmarks/multi_thread_comporess.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
$LOAD_PATH.unshift '../lib'
require 'zstd-ruby'
require 'thread'

# Number of compression jobs to enqueue; override with the GUESSES env var.
GUESSES = (ENV['GUESSES'] || 1000).to_i
# Number of worker threads to start; override with the THREADS env var.
THREADS = (ENV['THREADS'] || 1).to_i

p GUESSES: GUESSES, THREADS: THREADS

# The first command-line argument names a file under ./samples.
sample_file_name = ARGV[0]
json_string = File.read("./samples/#{sample_file_name}")

# Enqueue one payload per job, then one nil per worker: a popped nil ends
# that worker's loop.
queue = Queue.new
GUESSES.times { queue << json_string }
THREADS.times { queue << nil }
THREADS.times.map {
  Thread.new {
    while str = queue.pop
      # Compress the item taken from the queue (not the outer variable), so
      # the worker really consumes what it dequeues.
      Zstd.compress(str)
    end
  }
}.each(&:join)
23 changes: 23 additions & 0 deletions benchmarks/multi_thread_decomporess.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
$LOAD_PATH.unshift '../lib'
require 'zstd-ruby'
require 'thread'

# Job count and worker count, both overridable through the environment.
GUESSES = ENV.fetch('GUESSES', 1000).to_i
THREADS = ENV.fetch('THREADS', 1).to_i

p GUESSES: GUESSES, THREADS: THREADS

# Read the sample named by the first argument and compress it once up front;
# the workers below only decompress.
sample_path = "./samples/#{ARGV[0]}"
target = Zstd.compress(File.read(sample_path))

# Queue layout: GUESSES compressed payloads, then one nil per worker that
# ends that worker's loop.
queue = Queue.new
GUESSES.times { queue.push(target) }
THREADS.times { queue.push(nil) }

workers = THREADS.times.map do
  Thread.new do
    while (payload = queue.pop)
      Zstd.decompress(payload)
    end
  end
end
workers.each(&:join)
Binary file modified benchmarks/results/city.json.gzip
Binary file not shown.
74 changes: 68 additions & 6 deletions ext/zstdruby/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,37 @@ static size_t zstd_stream_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output,
#endif
}

/*
 * Argument bundle for compress_wrapper(): carries the arguments of one
 * ZSTD_compress2() call through a single void* and receives its result.
 * zstd_compress() fills it with a positional initializer, so the field
 * order matters.
 */
struct compress_params {
/* Compression context, passed as the first argument of ZSTD_compress2(). */
ZSTD_CCtx* ctx;
/* Destination buffer and its capacity in bytes. */
char* output_data;
size_t output_size;
/* Source buffer and its length in bytes. */
char* input_data;
size_t input_size;
/* Out: return value of ZSTD_compress2(); callers test it with ZSTD_isError(). */
size_t ret;
};

/*
 * Callback with the void*(void*) shape expected by
 * rb_thread_call_without_gvl(). `args` must point to a struct
 * compress_params; the ZSTD_compress2() result is stored in its `ret`
 * field. Always returns NULL.
 */
static void* compress_wrapper(void* args)
{
  struct compress_params* const job = (struct compress_params*)args;
  size_t const result = ZSTD_compress2(job->ctx,
                                       job->output_data, job->output_size,
                                       job->input_data, job->input_size);
  job->ret = result;
  return NULL;
}

/*
 * One-shot compression of input_data[0..input_size) into
 * output_data[0..output_size) using ctx.
 *
 * When HAVE_RUBY_THREAD_H is defined and `gvl` is false, the call is routed
 * through rb_thread_call_without_gvl(); in every other case ZSTD_compress2()
 * is called directly on the current thread.
 *
 * Returns whatever ZSTD_compress2() returned (check with ZSTD_isError()).
 */
static size_t zstd_compress(ZSTD_CCtx* const ctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    struct compress_params job = {
      .ctx = ctx,
      .output_data = output_data,
      .output_size = output_size,
      .input_data = input_data,
      .input_size = input_size,
    };
    rb_thread_call_without_gvl(compress_wrapper, &job, NULL, NULL);
    return job.ret;
  }
#endif
  /* Direct path: caller asked to keep the GVL, or no thread API available. */
  return ZSTD_compress2(ctx, output_data, output_size, input_data, input_size);
}

static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
{
ID kwargs_keys[1];
Expand All @@ -92,33 +123,64 @@ static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
}
}

struct decompress_params {
struct stream_decompress_params {
ZSTD_DCtx* dctx;
ZSTD_outBuffer* output;
ZSTD_inBuffer* input;
size_t ret;
};

static void* decompress_wrapper(void* args)
static void* stream_decompress_wrapper(void* args)
{
struct decompress_params* params = args;
struct stream_decompress_params* params = args;
params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input);
return NULL;
}

static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
static size_t zstd_stream_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
if (gvl) {
return ZSTD_decompressStream(dctx, output, input);
} else {
struct decompress_params params = { dctx, output, input };
rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL);
struct stream_decompress_params params = { dctx, output, input };
rb_thread_call_without_gvl(stream_decompress_wrapper, &params, NULL, NULL);
return params.ret;
}
#else
return ZSTD_decompressStream(dctx, output, input);
#endif
}

/*
 * Argument bundle for decompress_wrapper(): carries the arguments of one
 * ZSTD_decompressDCtx() call through a single void* and receives its result.
 * zstd_decompress() fills it with a positional initializer, so the field
 * order matters.
 */
struct decompress_params {
/* Decompression context, passed as the first argument of ZSTD_decompressDCtx(). */
ZSTD_DCtx* dctx;
/* Destination buffer and its capacity in bytes. */
char* output_data;
size_t output_size;
/* Compressed source buffer and its length in bytes. */
char* input_data;
size_t input_size;
/* Out: return value of ZSTD_decompressDCtx(); callers test it with ZSTD_isError(). */
size_t ret;
};

/*
 * Callback with the void*(void*) shape expected by
 * rb_thread_call_without_gvl(). `args` must point to a struct
 * decompress_params; the ZSTD_decompressDCtx() result is stored in its
 * `ret` field. Always returns NULL.
 */
static void* decompress_wrapper(void* args)
{
  struct decompress_params* const job = (struct decompress_params*)args;
  size_t const result = ZSTD_decompressDCtx(job->dctx,
                                            job->output_data, job->output_size,
                                            job->input_data, job->input_size);
  job->ret = result;
  return NULL;
}

/*
 * One-shot decompression of input_data[0..input_size) into
 * output_data[0..output_size) using dctx.
 *
 * When HAVE_RUBY_THREAD_H is defined and `gvl` is false, the call is routed
 * through rb_thread_call_without_gvl(); in every other case
 * ZSTD_decompressDCtx() is called directly on the current thread.
 *
 * Returns whatever ZSTD_decompressDCtx() returned (check with ZSTD_isError()).
 */
static size_t zstd_decompress(ZSTD_DCtx* const dctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    struct decompress_params job = {
      .dctx = dctx,
      .output_data = output_data,
      .output_size = output_size,
      .input_data = input_data,
      .input_size = input_size,
    };
    rb_thread_call_without_gvl(decompress_wrapper, &job, NULL, NULL);
    return job.ret;
  }
#endif
  /* Direct path: caller asked to keep the GVL, or no thread API available. */
  return ZSTD_decompressDCtx(dctx, output_data, output_size, input_data, input_size);
}

#endif /* ZSTD_RUBY_H */
2 changes: 1 addition & 1 deletion ext/zstdruby/streaming_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
VALUE result = rb_str_new(0, 0);
while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const ret = zstd_decompress(sd->dctx, &output, &input, false);
size_t const ret = zstd_stream_decompress(sd->dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
}
Expand Down
10 changes: 5 additions & 5 deletions ext/zstdruby/zstdruby.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self)
char* input_data = RSTRING_PTR(input_value);
size_t input_size = RSTRING_LEN(input_value);

size_t const max_compressed_size = ZSTD_compressBound(input_size);
size_t max_compressed_size = ZSTD_compressBound(input_size);
VALUE output = rb_str_new(NULL, max_compressed_size);
const char* output_data = RSTRING_PTR(output);
char* output_data = RSTRING_PTR(output);

size_t const ret = ZSTD_compress2(ctx,(void*)output_data, max_compressed_size, (void*)input_data, input_size);
size_t const ret = zstd_compress(ctx, output_data, max_compressed_size, input_data, input_size, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
}
Expand Down Expand Up @@ -96,7 +96,7 @@ static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t
rb_str_resize(output_string, output.size);
output.dst = RSTRING_PTR(output_string);

size_t ret = zstd_decompress(dctx, &output, &input, true);
size_t ret = zstd_stream_decompress(dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
ZSTD_freeDCtx(dctx);
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret));
Expand Down Expand Up @@ -134,7 +134,7 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self)
VALUE output = rb_str_new(NULL, uncompressed_size);
char* output_data = RSTRING_PTR(output);

size_t const decompress_size = ZSTD_decompressDCtx(dctx, output_data, uncompressed_size, input_data, input_size);
size_t const decompress_size = zstd_decompress(dctx, output_data, uncompressed_size, input_data, input_size, false);
if (ZSTD_isError(decompress_size)) {
rb_raise(rb_eRuntimeError, "%s: %s", "decompress error", ZSTD_getErrorName(decompress_size));
}
Expand Down

0 comments on commit 3eedc15

Please sign in to comment.