diff --git a/ext/zstdruby/streaming_compress.c b/ext/zstdruby/streaming_compress.c index 7f37f38..fcfde44 100644 --- a/ext/zstdruby/streaming_compress.c +++ b/ext/zstdruby/streaming_compress.c @@ -1,12 +1,10 @@ #include #include -#include struct streaming_compress_t { ZSTD_CCtx* ctx; VALUE buf; size_t buf_size; - char nogvl; }; static void @@ -54,18 +52,11 @@ static VALUE rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj) { VALUE compression_level_value; - VALUE kwargs; - rb_scan_args(argc, argv, "01:", &compression_level_value, &kwargs); + rb_scan_args(argc, argv, "01", &compression_level_value); int compression_level = convert_compression_level(compression_level_value); - ID kwargs_keys[1]; - kwargs_keys[0] = rb_intern("no_gvl"); - VALUE kwargs_values[1]; - rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values); - struct streaming_compress_t* sc; TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc); - sc->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]); size_t const buffOutSize = ZSTD_CStreamOutSize(); ZSTD_CCtx* ctx = ZSTD_createCCtx(); @@ -85,35 +76,6 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj) : (FIX2INT((val)))) #define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue) -struct compress_stream_nogvl_t { - ZSTD_CCtx* ctx; - ZSTD_outBuffer* output; - ZSTD_inBuffer* input; - ZSTD_EndDirective endOp; - size_t ret; -}; - -static void* -compressStream2_nogvl(void* arg) -{ - struct compress_stream_nogvl_t* params = arg; - params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp); - return NULL; -} - -static size_t -compressStream2(char nogvl, ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) -{ - struct compress_stream_nogvl_t params = { ctx, output, input, endOp, 0 }; - if (nogvl) { - rb_thread_call_without_gvl(compressStream2_nogvl, ¶ms, NULL, NULL); - } - else { - compressStream2_nogvl(¶ms); - } - return params.ret; -} - static VALUE no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp) { @@ -124,7 +86,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp) do { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, endOp); + size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret)); } @@ -147,7 +109,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src) VALUE result = rb_str_new(0, 0); while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue); + size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret)); } @@ -170,7 +132,7 @@ rb_streaming_compress_addstr(VALUE obj, VALUE src) while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const result = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue); + size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue); if (ZSTD_isError(result)) { rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result)); } diff --git a/ext/zstdruby/streaming_decompress.c b/ext/zstdruby/streaming_decompress.c index f55ec4b..fd3c7b7 100644 --- a/ext/zstdruby/streaming_decompress.c +++ b/ext/zstdruby/streaming_decompress.c @@ -1,11 +1,9 @@ #include -#include struct streaming_decompress_t { ZSTD_DCtx* ctx; VALUE buf; size_t buf_size; - char nogvl; }; static void @@ -50,19 +48,10 @@ rb_streaming_decompress_allocate(VALUE klass) } static VALUE -rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj) +rb_streaming_decompress_initialize(VALUE obj) { - VALUE kwargs; - rb_scan_args(argc, argv, "00:", &kwargs); - - ID kwargs_keys[1]; - kwargs_keys[0] = rb_intern("no_gvl"); - VALUE kwargs_values[1]; - rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values); - struct streaming_decompress_t* sd; TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd); - sd->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]); size_t const buffOutSize = ZSTD_DStreamOutSize(); ZSTD_DCtx* ctx = ZSTD_createDCtx(); @@ -76,34 +65,6 @@ rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj) return obj; } -struct decompress_stream_nogvl_t { - ZSTD_DCtx* ctx; - ZSTD_outBuffer* output; - ZSTD_inBuffer* input; - size_t ret; -}; - -static void* -decompressStream_nogvl(void* args) -{ - struct decompress_stream_nogvl_t* params = args; - params->ret = ZSTD_decompressStream(params->ctx, params->output, params->input); - return NULL; -} - -static size_t -decompressStream(char nogvl, ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) -{ - struct decompress_stream_nogvl_t params = { ctx, output, input, 0 }; - if (nogvl) { - rb_thread_call_without_gvl(decompressStream_nogvl, ¶ms, NULL, NULL); - } - else { - decompressStream_nogvl(¶ms); - } - return params.ret; -} - static VALUE rb_streaming_decompress_decompress(VALUE obj, VALUE src) { @@ -118,7 +79,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src) VALUE result = rb_str_new(0, 0); while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 }; - size_t const ret = decompressStream(sd->nogvl, sd->ctx, &output, &input); + size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret)); } @@ -141,7 +102,7 @@ rb_streaming_decompress_addstr(VALUE obj, VALUE src) while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 }; - size_t const result = decompressStream(sd->nogvl, sd->ctx, &output, &input); + size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input); if (ZSTD_isError(result)) { rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result)); } @@ -155,7 +116,7 @@ zstd_ruby_streaming_decompress_init(void) { VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject); rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate); - rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, -1); + rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0); rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1); rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1); } diff --git a/spec/zstd-ruby-streaming-compress_spec.rb b/spec/zstd-ruby-streaming-compress_spec.rb index 335e62e..5c3f3b4 100644 --- a/spec/zstd-ruby-streaming-compress_spec.rb +++ b/spec/zstd-ruby-streaming-compress_spec.rb @@ -1,10 +1,10 @@ require "spec_helper" require 'zstd-ruby' -shared_examples "a streaming compressor" do +RSpec.describe Zstd::StreamingCompress do describe '<<' do it 'shoud work' do - stream = Zstd::StreamingCompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingCompress.new stream << "abc" << "def" res = stream.finish expect(Zstd.decompress(res)).to eq('abcdef') @@ -13,7 +13,7 @@ describe '<< + GC.compat' do it 'shoud work' do - stream = Zstd::StreamingCompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingCompress.new stream << "abc" << "def" GC.compact stream << "ghi" @@ -24,7 +24,7 @@ describe '<< + flush' do it 'shoud work' do - stream = Zstd::StreamingCompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingCompress.new stream << "abc" << "def" res = stream.flush stream << "ghi" @@ -35,7 +35,7 @@ describe 'compress + flush' do it 'shoud work' do - stream = Zstd::StreamingCompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingCompress.new res = stream.compress("abc") res << stream.flush res << stream.compress("def") @@ -46,7 +46,7 @@ describe 'compression level' do it 'shoud work' do - stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl) + stream = Zstd::StreamingCompress.new(5) stream << "abc" << "def" res = stream.finish expect(Zstd.decompress(res)).to eq('abcdef') @@ -56,26 +56,14 @@ if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0') describe 'Ractor' do it 'should be supported' do - r = Ractor.new(no_gvl) do |no_gvl| - stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl) + r = Ractor.new { + stream = Zstd::StreamingCompress.new(5) stream << "abc" << "def" res = stream.finish - end + } expect(Zstd.decompress(r.take)).to eq('abcdef') end end end end -RSpec.describe Zstd::StreamingCompress do - describe "with the global lock" do - let(:no_gvl) { false } - it_behaves_like "a streaming compressor" - end - - describe "without the global lock" do - let(:no_gvl) { true } - it_behaves_like "a streaming compressor" - end -end - diff --git a/spec/zstd-ruby-streaming-decompress_spec.rb b/spec/zstd-ruby-streaming-decompress_spec.rb index 28ed1f5..cddc119 100644 --- a/spec/zstd-ruby-streaming-decompress_spec.rb +++ b/spec/zstd-ruby-streaming-decompress_spec.rb @@ -2,7 +2,7 @@ require 'zstd-ruby' require 'securerandom' -shared_examples "a streaming decompressor" do +RSpec.describe Zstd::StreamingDecompress do describe 'streaming decompress' do it 'shoud work' do # str = SecureRandom.hex(150) @@ -22,7 +22,7 @@ # str = SecureRandom.hex(150) str = "foo bar buzz" * 100 cstr = Zstd.compress(str) - stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingDecompress.new result = '' result << stream.decompress(cstr[0, 5]) result << stream.decompress(cstr[5, 5]) @@ -35,30 +35,18 @@ if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0') describe 'Ractor' do it 'should be supported' do - r = Ractor.new(no_gvl) do |no_gvl| + r = Ractor.new { cstr = Zstd.compress('foo bar buzz') - stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl) + stream = Zstd::StreamingDecompress.new result = '' result << stream.decompress(cstr[0, 5]) result << stream.decompress(cstr[5, 5]) result << stream.decompress(cstr[10..-1]) result - end + } expect(r.take).to eq('foo bar buzz') end end end end -RSpec.describe Zstd::StreamingDecompress do - describe "with the gvl" do - let(:no_gvl) { false } - it_behaves_like "a streaming decompressor" - end - - describe "without the gvl" do - let(:no_gvl) { true } - it_behaves_like "a streaming decompressor" - end -end -