Implement stream-based input record decoders instead of batch-based #52
It's from #43.
Codecov Report
@@             Coverage Diff             @@
##           master      #52       +/-   ##
===========================================
+ Coverage   70.05%   82.77%   +12.71%
===========================================
  Files          19       15        -4
  Lines         875      505      -370
===========================================
- Hits          613      418      -195
+ Misses        203       65      -138
+ Partials       59       22       -37
@@ -103,11 +102,12 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) {
	var n int

	for _, p := range paths {
		data, err := ioutil.ReadFile(p)
		f, err := os.Open(p)
As I understand it, this PR makes the processing iterate over the file in blocks instead of reading the whole file at once. That's good.
NOTE it has a breaking change; Columnifier interface stops supporting io.WriteCloser
I agree with it, but it would be better to add a note about the breaking change to the README.
Please wait to merge this until the other reviewers approve, since the patch is large.
@t2y I added |
Converting a 223 MB msgpack file to Parquet, the same as in #43.
@okkez It looks like a similar result to mine. It's still not ideal, but anyway better than the latest release. So why don't we apply this change? If you're ok, I would like to publish it as |
This change will reduce memory consumption a lot! Before
After
Here is docker_log.avsc:
I created the file input.jsonl using the following script:
# frozen_string_literal: true
require 'json'
require 'logger'
require 'securerandom'
require 'faker'

N = 711150
CONTAINER_IDS = 100.times.map { SecureRandom.hex(32) }
PATHS = %w[/v1/foo /v1/bar /v1/baz]

$stdout.sync = true
logger = Logger.new($stdout)
# Fixed seed so the generated fields are reproducible.
prng = Random.new(42)
Faker::Config.random = prng

logger.info 'Start sending events'
base_time = Time.now.yield_self do |t|
  # Noon tomorrow; adding seconds avoids an out-of-range day at month end.
  Time.new(t.year, t.month, t.day, 12) + 86_400
end
# Spread 10,000,000 events evenly over one hour.
delta = Rational(3600, 10_000_000)

File.open(ARGV[0], 'w') do |f|
  N.times do |i|
    now = base_time + delta * i
    event = {
      container_id: CONTAINER_IDS.sample(random: prng),
      container_name: '/test-container',
      source: 'stdout',
      log_time: now.to_i * 10**9 + now.nsec
    }
    # About 1 in 1000 events is a 404 on an unknown /v2 path.
    if prng.rand(1000).zero?
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET /v2#{PATHS.sample(random: prng)} HTTP/1.1" 404 153 "-" "#{Faker::Internet.user_agent}" "-"|
    else
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET #{PATHS.sample(random: prng)} HTTP/1.1" 200 #{prng.rand(1000) + 100} "-" "#{Faker::Internet.user_agent}" "-"|
    end
    f.puts event.to_json
  end
end
logger.info 'Sending events is complete'
This PR looks almost good! As I commented, I'm afraid that you forgot to check some errors. Can you take a look?
@@ -0,0 +1,34 @@
# v0.1.x
Good changelog 👍
		if vv, err := strconv.ParseFloat(v, 64); err == nil {
			e[names[i]] = vv
		// bool
		if v != "0" && v != "1" {
Why don't you use the schema information? I'm concerned that the string "t" is converted to true.
I'd prefer not to touch this part now; I'll address it as a more general issue, separate from this PR.
ref. #27 and an actual try #47 (comment)
	m := make(map[string]interface{})
	for k, v := range v {
	*r = make(map[string]interface{})
	for k, v := range m {
		// bool
		if v != "0" && v != "1" {
			if vv, err := strconv.ParseBool(v); err == nil {
Why don't you use the schema information? I'm concerned that the string "t" is converted to true.
same as #52 (comment)
Applied some of the feedback :)
👍
I made some suggestions, but they were based on my misunderstanding 💦 I deleted them.
Thanks, all! I'll merge it and release the next version!
It should reduce memory usage in most cases! I checked RSS memory usage with a 270 MB msgpack-encoded file: the peak usage was 341624 kB, compared with 2373248 kB for the latest release, v0.0.4. That is a dramatic reduction: the new peak is only about 0.14 times the old one (roughly 86% less).
Additionally, it may speed up the Parquet encoding (though I think that might just be a side effect of the memory reduction):
NOTE it has a breaking change; Columnifier interface stops supporting io.WriteCloser