
Implement stream-based input record decoders instead of batch-based #52

Merged
syucream merged 6 commits into master from decoder on Aug 17, 2020

Conversation

syucream
Contributor

@syucream syucream commented Aug 6, 2020

It should reduce memory usage in most cases! I checked RSS memory usage with a 270 MB msgpack-encoded file: the peak usage was 341,624 kB, versus 2,373,248 kB at the latest release, v0.0.4. It's a dramatic reduction; the new peak is only about 14% of the old one!

Additionally, it may accelerate Parquet encoding (though I think that might just be a side effect of the memory reduction):

# v0.0.4
$ time ./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType msgpack ./tmp.msgpack > /dev/null
./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType   >   53.01s user 2.15s system 132% cpu 41.541 total

# with this patch
$ time ./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType msgpack ./tmp.msgpack > /dev/null
./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType   >   38.34s user 0.98s system 108% cpu 36.281 total

NOTE it has a breaking change; the Columnifier interface stops supporting io.WriteCloser
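(For context, here is a minimal sketch of the stream-based idea; Decoder and forEach are illustrative names, not necessarily the identifiers in this patch. Each Decode call yields one record, so only the current record has to stay resident instead of the whole input being materialized as a batch.)

package record

import "io"

// Decoder is a hypothetical stream-based decoder: each Decode call reads
// the next record from the underlying input and returns io.EOF at the end.
type Decoder interface {
	Decode(r *map[string]interface{}) error
}

// forEach drains a Decoder one record at a time; each decoded record can
// be garbage-collected as soon as fn has consumed it.
func forEach(dec Decoder, fn func(map[string]interface{}) error) error {
	for {
		var r map[string]interface{}
		if err := dec.Decode(&r); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if err := fn(r); err != nil {
			return err
		}
	}
}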

@syucream syucream requested review from okkez, t2y and abicky August 6, 2020 14:05
@syucream syucream self-assigned this Aug 6, 2020
@syucream
Contributor Author

syucream commented Aug 6, 2020

It's from #43

@codecov-commenter

codecov-commenter commented Aug 6, 2020

Codecov Report

Merging #52 into master will increase coverage by 12.71%.
The diff coverage is 75.73%.

Impacted file tree graph

@@             Coverage Diff             @@
##           master      #52       +/-   ##
===========================================
+ Coverage   70.05%   82.77%   +12.71%     
===========================================
  Files          19       15        -4     
  Lines         875      505      -370     
===========================================
- Hits          613      418      -195     
+ Misses        203       65      -138     
+ Partials       59       22       -37     
Flag         Coverage Δ
#unittests   82.77% <75.73%> (+12.71%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files               Coverage Δ
columnifier/columnifier.go    0.00% <ø> (ø)
parquet/stdio.go             80.00% <ø> (ø)
record/record.go             19.35% <27.27%> (-5.65%) ⬇️
record/avro.go               77.14% <63.63%> (+4.41%) ⬆️
columnifier/parquet.go       83.60% <78.57%> (-1.58%) ⬇️
record/jsonl.go              83.33% <83.33%> (+27.08%) ⬆️
record/ltsv.go               92.85% <90.00%> (+10.71%) ⬆️
record/csv.go                90.00% <94.28%> (+10.45%) ⬆️
record/msgpack.go           100.00% <100.00%> (+36.84%) ⬆️

Continue to review the full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update f6d5b93...d2d8591.

@syucream syucream mentioned this pull request Aug 6, 2020
@@ -103,11 +102,12 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) {
 	var n int

 	for _, p := range paths {
-		data, err := ioutil.ReadFile(p)
+		f, err := os.Open(p)
Contributor

I guess this PR makes the iteration process read blocks of the file instead of reading the whole file. That's good.
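(To make the difference concrete — a hedged sketch, not code from the patch: ioutil.ReadFile loads the entire file into memory at once, while os.Open plus a buffered reader keeps the resident set roughly constant regardless of input size. The file path is hypothetical.)

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
)

func main() {
	f, err := os.Open("tmp.msgpack") // hypothetical input path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r := bufio.NewReader(f)
	buf := make([]byte, 64*1024) // a 64 KiB window instead of the whole file
	total := 0
	for {
		n, err := r.Read(buf)
		total += n
		if err != nil { // io.EOF terminates the loop
			break
		}
	}
	fmt.Println("bytes read:", total)
}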

Contributor

@t2y t2y left a comment

> NOTE it has a breaking change; the Columnifier interface stops supporting io.WriteCloser

I agree with it, but it's better to write a note about the breaking change in the README.

@t2y
Contributor

t2y commented Aug 7, 2020

Let's wait to merge this until the other reviewers approve, since the patch is large.

@syucream
Contributor Author

syucream commented Aug 7, 2020

@t2y I added a CHANGELOG.md file to record the change history. :) (It's inspired by fluentd's changelog file.)

@syucream
Contributor Author

@abicky @okkez any comment?

@okkez
Contributor

okkez commented Aug 12, 2020

Converting 223 MB of msgpack to Parquet, same as #43: columnify consumes around 330 MB of memory (the ps command's RSS).

@syucream
Contributor Author

@okkez It looks like a similar result to mine. It's still not ideal, but it's better than the latest release anyway, so why don't we apply this change? If you're OK with it, I'd like to publish it as an improved v0.1.0 release. For the long term, I guess we need further improvements, including encoding to Parquet in parts.
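(As an illustration of "encoding to Parquet in parts" — a sketch continuing the hypothetical Decoder above; chunkSize, flush, and writeInParts are illustrative names, not code from this PR. The idea: buffer a bounded number of records, flush them to the Parquet writer, and reuse the buffer, so peak memory tracks the chunk size rather than the input size.)

// Continues the record package sketch above (same Decoder type and "io" import).
const chunkSize = 10000

// writeInParts buffers at most chunkSize records before handing them to
// flush, so the whole input is never resident at once.
func writeInParts(dec Decoder, flush func([]map[string]interface{}) error) error {
	buf := make([]map[string]interface{}, 0, chunkSize)
	for {
		var r map[string]interface{}
		err := dec.Decode(&r)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		buf = append(buf, r)
		if len(buf) == chunkSize {
			if err := flush(buf); err != nil {
				return err
			}
			buf = buf[:0] // reuse the backing array for the next chunk
		}
	}
	if len(buf) > 0 {
		return flush(buf) // flush the final partial chunk
	}
	return nil
}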

@abicky
Member

abicky commented Aug 12, 2020

This change will reduce memory consumption a lot!

Before

% gtime -v columnify -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl > out.parquet
        Command being timed: "columnify -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl"
        User time (seconds): 26.14
        System time (seconds): 1.63
        Percent of CPU this job got: 120%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:23.00
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 1542252
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 1179
        Minor (reclaiming a frame) page faults: 445384
        Voluntary context switches: 419
        Involuntary context switches: 121936
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 2505
        Page size (bytes): 4096
        Exit status: 0

After

% gtime -v columnify-latest -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl > out.parquet
        Command being timed: "columnify-latest -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl"
        User time (seconds): 19.35
        System time (seconds): 0.71
        Percent of CPU this job got: 115%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:17.39
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 281776
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 70743
        Voluntary context switches: 388
        Involuntary context switches: 48274
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 2538
        Page size (bytes): 4096
        Exit status: 0

Here is docker_log.avsc:

{
  "type": "record",
  "name": "DockerLog",
  "fields" : [
    {"name": "log_time",       "type": "long"},
    {"name": "container_id",   "type": "string"},
    {"name": "container_name", "type": "string"},
    {"name": "source",         "type": "string"},
    {"name": "log",            "type": "string"}
  ]
}

I created the file input.jsonl using the following script:

# frozen_string_literal: true

require 'json'
require 'logger'
require 'securerandom' # SecureRandom is used below and isn't loaded by the other requires

require 'faker'

N = 711150

CONTAINER_IDS = 100.times.map { SecureRandom.hex(32) }
PATHS = %w[/v1/foo /v1/bar /v1/baz]

$stdout.sync = true
logger = Logger.new($stdout)

prng = Random.new(42)
Faker::Config.random = prng

logger.info 'Start sending events'

base_time = Time.now.yield_self do |t|
  Time.new(t.year, t.month, t.day + 1, 12)
end

delta = Rational(3600, 10_000_000)
File.open(ARGV[0], 'w') do |f|
  N.times do |i|
    now = base_time + delta * i
    event = {
      container_id: CONTAINER_IDS.sample(random: prng),
      container_name: '/test-container',
      source: 'stdout',
      log_time: now.to_i * 10**9 + now.nsec
    }
    if prng.rand(1000).zero?
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET /v2#{PATHS.sample(random: prng)} HTTP/1.1" 404 153 "-" "#{Faker::Internet.user_agent}" "-"|
    else
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET #{PATHS.sample(random: prng)} HTTP/1.1" 200 #{prng.rand(1000) + 100} "-" "#{Faker::Internet.user_agent}" "-"|
    end

    f.puts event.to_json
  end
end
logger.info 'Sending events is complete'

Member

@abicky abicky left a comment

This PR looks almost good! As I commented, I'm afraid that you forgot to check some errors. Can you take a look?

@@ -0,0 +1,34 @@
# v0.1.x
Member

Good changelog 👍

(Resolved review threads on columnifier/parquet.go, record/avro.go, and record/csv.go.)
if vv, err := strconv.ParseFloat(v, 64); err == nil {
	e[names[i]] = vv
// bool
if v != "0" && v != "1" {
Member

Why don't you use the schema information? I'm concerned that the string "t" is converted to true.
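(For reference — a runnable illustration of the concern; strconv.ParseBool accepts far more than "0" and "1".)

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// ParseBool accepts "1", "t", "T", "TRUE", "true", "True",
	// "0", "f", "F", "FALSE", "false", "False", so a CSV field holding
	// the literal string "t" silently becomes the boolean true.
	for _, s := range []string{"t", "T", "TRUE", "f", "2"} {
		v, err := strconv.ParseBool(s)
		fmt.Printf("%q -> %v (err: %v)\n", s, v, err)
	}
}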

Contributor Author

I'd prefer not to touch this part now; I'll treat it as a more general issue, separate from this PR.
Ref. #27 and an actual attempt in #47 (comment)

(Resolved review thread on record/jsonl.go.)
-	m := make(map[string]interface{})
-	for k, v := range v {
+	*r = make(map[string]interface{})
+	for k, v := range m {
// bool
if v != "0" && v != "1" {
	if vv, err := strconv.ParseBool(v); err == nil {
Member

Why don't you use the schema information? I'm concerned that the string "t" is converted to true.

Contributor Author

Same as #52 (comment).

(Resolved review threads on record/ltsv.go and record/record_test.go.)
@syucream
Contributor Author

Applied some feedback :)

Member

@abicky abicky left a comment

👍

@abicky
Member

abicky commented Aug 16, 2020

I made some suggestions, but they were based on my misunderstanding 💦 I've deleted them.

@syucream
Contributor Author

Thanks all! I'll merge it and release the next version!

@syucream syucream merged commit be6cb1e into master Aug 17, 2020
@syucream syucream deleted the decoder branch August 17, 2020 05:33
@syucream
Contributor Author

🎉 https://github.com/reproio/columnify/releases/tag/v0.1.0
