physical memory issues on EMR #18
Hello. I'm not aware of any memory leak in current 4mc. To be honest, I never tried it as the output of reduce, since we've been using it directly in Spark as the final output format; it's far faster than standard MapReduce, and it isn't leaking there either. Carlo
Thanks! It happens when I'm running on larger amounts of data (still not that large... just a few tens of GBs). I'm using 4mz (not 4mc) as the compression for the final output. I basically stream files in at one end and compress them out to another place at the other end. If you have any suggestions, that would be great!
@carlomedas still having this issue. Can you suggest a solution? It happens even when running with smaller amounts of data (a few hundred MBs). Note: we use this codec as the output format:
Does it fail only with 4mz, or also with 4mc?
@carlomedas thanks! We only tried 4mz because we needed a splittable compression that is better than Snappy. Can you suggest a different compression to use until a fix is ready? Maybe something that comes natively with Hadoop?
@carlomedas also tested FourMcMediumCodec and it's still failing :(
I guess it's not working properly with EMR. What version of Hadoop do you have there, and can you describe in more detail how your job is composed?
Hadoop distribution: Amazon 2.7.3 (probably based on Hadoop 2.7.3). This is the step configuration:
It's not Spark... it's only Hadoop Streaming on EMR. It must be something with the compression, because the moment I change back to the BZip2 codec it works fine. Of course I don't want to use BZip2 (VERY slow), and I can't find any other good compression codec that supports streaming.
Thank you very much for your help. I really want this to work, because it would be a pain for us to change compression again.
As far as I understand, you are using it to store the output of reduce, and each reducer writes a 4mc/4mz file. Is it failing with a standard OOM or with an OOM related to direct memory buffers?
Thanks for the info! Do you mean setting it for the mapper or the reducer?
Please try both; they won't do any harm. But if you are compressing with 4mz only at the final reduce stage, it can stay only on the reducer.
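For readers following along, these are the standard Hadoop 2.x property names under discussion. A minimal sketch setting them from a Java driver (in a Streaming job the same properties would be passed as `-D key=value` generic options); the fully qualified 4mc codec class name is an assumption about the library's package layout, not taken from this thread:

```java
import org.apache.hadoop.conf.Configuration;

public class CompressionConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Compress intermediate map output ("setting it for the mapper"):
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.set("mapreduce.map.output.compress.codec",
                "com.hadoop.compression.fourmc.FourMzMediumCodec"); // assumed FQCN

        // Compress the final job output ("setting it for the reducer"):
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "com.hadoop.compression.fourmc.FourMzMediumCodec"); // assumed FQCN

        System.out.println(conf.get("mapreduce.output.fileoutputformat.compress.codec"));
    }
}
```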
I tried setting it on both like this:
But I'm getting an error. I'm trying to figure out how it should be done in Hadoop Streaming.
I tried this. I don't understand why it doesn't start using virtual memory once physical memory is "filled up".
Can you please paste the full exception?
Container [pid=19884,containerID=container_1482229402435_0001_01_000035] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.4 GB of 20 GB virtual memory used. Killing container.
Dump of the process-tree for container_1482229402435_0001_01_000035 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 19892 19884 19884 19884 (java) 1551 216 6754443264 1052796 /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35
|- 19955 19892 19884 19884 (split_raw_reduc) 460 70 19734528 3565 /mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/./split_raw_reducer
|- 19960 19955 19884 19884 (split_raw_reduc) 0 0 12095488 1824 /mnt/yarn/usercache/hadoop/filecache/13/split_raw_reducer
|- 19884 19882 19884 19884 (bash) 0 0 115806208 673 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35 1>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stdout 2>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stderr
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
To be honest, I don't know how to help you on this, as I'm not familiar with Hadoop Streaming's PipeMapRed and don't know how EMR is tuned on AWS. The error you reported in the previous comment is from YARN, which automatically kills the container once it exceeds the maximum allowed memory. Did you already try increasing it, just to see if it helps? I'm talking about the 4 GB physical memory setting.
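For context, the "4 GB physical" limit in that log is the reduce-task container size. A hedged sketch of the usual knobs for raising it (the values are illustrative, not a recommendation from this thread):

```java
import org.apache.hadoop.conf.Configuration;

public class ContainerMemorySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Physical memory YARN grants to each reduce container, in MB.
        conf.setInt("mapreduce.reduce.memory.mb", 5120);
        // Keep the JVM heap comfortably below the container size so that
        // native buffers and the streaming child process fit in the rest.
        conf.set("mapreduce.reduce.java.opts", "-Xmx3277m");
    }
}
```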
@carlomedas I did try to increase it as much as possible (Hadoop allows up to ~5 GB on a c4.xlarge instance), but it still fails. Something interesting is that when I run it with
Maybe the only solution is really to start using larger instances that have more memory... but that also costs more.
What do you mean by
It buffers chunks of up to 4 MB and then compresses and writes them.
So it's really weird that it fails... Either there's a memory leak or something in my config is wrong. Why would it succeed with BZip2 and fail with 4mc?
@carlomedas we have a theory... does it keep 4 MB per open file that the reducer needs to compress?
Yes, exactly, and I agree with you: that's the reason.
@carlomedas No, I don't have a lot of reducers... just a lot of files are created by the 7 reducers that I have. I mean, the output consists of thousands of files.
@carlomedas I think we've figured out the problem, and it looks like it is related to the output consisting of A LOT of files. You said that you keep 4 MB of memory that you compress and write. The thing is that when we want to output ~4000 files that are all less than 4 MB, it keeps 4 MB * 4000 in memory, which is about 16 GB, and that crashes the job. About the ~4000 files: we use a custom output format that separates the data according to different params in the input. Sometimes this causes the output to consist of a lot of very small files, which is fine with us because we need it split into certain folders. Do you have an idea how this can be solved? Maybe we can keep 1 MB instead of 4 MB? Would that cause a performance issue?
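A quick back-of-the-envelope check of that estimate, using only the numbers quoted in the comment above:

```java
public class BufferEstimateSketch {
    public static void main(String[] args) {
        long bufferPerOpenStream = 4L * 1024 * 1024; // ~4 MB buffered per open 4mz output
        long outputFiles = 4000;                     // small output files, as described above
        double totalGiB = bufferPerOpenStream * outputFiles / (1024.0 * 1024 * 1024);
        // Prints roughly 15.6 GB, far beyond the 4 GB container limit seen earlier.
        System.out.printf("~%.1f GB of buffer memory%n", totalGiB);
    }
}
```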
I see now. It was not designed to work with such use cases, so I don't think it's easily fixable for this unusual big-data use case.
@carlomedas :( Do you have a suggested compression method that is splittable and can work for this use case?
A contributor, @advancedxy, added compression/decompression for standard zstd (i.e. without the 4mc container). As far as I remember, that uses just 128 KB, not 4 MB.
Thanks. What's the difference with/without 4mc? Is it not splittable then?
ZstCodec is not splittable and uses about 128 KB each for the raw input buffer and the compressed buffer. @refaelos when you are generating so many small files, there is no point in keeping them splittable; you need to combine them rather than split them.
As for the 4 MB compression buffer, it should be possible to make it configurable with some additional work. But I don't have the spare time now, and I don't think your case justifies it. However, the 4 MB buffer makes it impractical to use with mapreduce.map.output.compress, as the reduce task may have a lot of copier threads (80, for example) to fetch map output. That's 80 * (4+4) = 640 MB of additional memory overhead. But that's where ZstCodec comes in handy.
Yes, as a matter of fact I did remove the configurability of the buffer because of an issue we had on the native side that, unfortunately, I did not have time to follow up on and fix.
@advancedxy thanks! The reason why we keep so many files is that we separate files per day and category. We need this so we can have S3 storage that we can invoke certain processes on top of; the processes we invoke run on a combination of that day + category. The thing is that not all the files are small... some might reach 1 GB or more (compressed). I think we want those splittable... don't you?
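For illustration, a hypothetical sketch of the kind of custom output format being described, routing records into day/category folders; the class name and key layout are assumptions, not the poster's actual code:

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Hypothetical: assumes the reducer emits keys shaped like "2016-12-22/category".
public class DayCategoryOutputFormat extends MultipleTextOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // One output file (and one open compressed output stream) per day/category
        // bucket, which is exactly why the per-stream 4 MB buffer adds up so fast.
        return key.toString() + "/" + name;
    }
}
```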
After reviewing the source code of MultipleTextOutputFormat, one possible solution is to increase the number of reducers; then each reducer processes a smaller number of files and runs faster.
@advancedxy yeah, but still... there will be fewer mappers, so it'll be slow.
What do you mean by fewer mappers? The total number of mappers should be the same for the same input. The concurrent number of mappers is determined by your settings and bound by your available resources. The number of reducers shouldn't affect that. The only overhead with more reducers is container and JVM startup, and that should be small.
@advancedxy I mean that if there's a big file and it's not splittable, then the entire file will be handled by one mapper. I hope I'm not wrong on this one... That's the whole idea of splitting, isn't it?
Increase the number of reducers to avoid the OOM while still using FourMzMediumCodec.
@refaelos @advancedxy @carlomedas I may be a little late to the party, but this can be solved another way, by setting the YARN memory overhead higher. The container itself has 4 GB of memory, and the JVM has 3277 MB of heap. That only leaves 819 MB of overhead. In the CDH distro, the default overhead is 20% (0.8 * 4096 = 3277). So you're eating up most of the overhead with your direct buffers, and there isn't enough additional overhead left for the other off-heap usage. I suspect it would work if you leave the heap as-is and set
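To make that arithmetic concrete, here is a hedged sketch of the direction being suggested: keep the container at 4 GB but cap the heap lower than the 80% default so more of the container is left for off-heap use. The exact values are illustrative, and the specific property the comment was about to name is not preserved in this thread:

```java
import org.apache.hadoop.conf.Configuration;

public class OverheadSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Container stays at 4 GB physical memory.
        conf.setInt("mapreduce.reduce.memory.mb", 4096);
        // Heap capped around 60% instead of 80%, leaving ~1.6 GB (rather than ~0.8 GB)
        // for direct buffers, native code, and the streaming child processes.
        conf.set("mapreduce.reduce.java.opts", "-Xmx2458m");
    }
}
```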
@advancedxy getting back to this one. So you're saying to increase the number of reducers without increasing the number of nodes or the memory configuration? On MR2, increasing the number of reducers means decreasing the memory limits with
Any solution here? I am seeing the same issue on my end.
@DataWanderer I wish there was. The problem is that when your reducer creates too many output files, 4mc needs a lot of memory to handle them, so it'll probably fail.
This is an old issue...
Yes, increase the number of reducers: the more reducers, the lower the memory requirement for each reducer task. And there should be a capacity setting. Let's say your job has 100 reducers and your resources allow you to run them all concurrently; then increase the number of reducers to 200 but set your job's reducer capacity to 100, so only 100 reducers run concurrently.
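A minimal sketch of that suggestion; `setNumReduceTasks` is standard, while the concurrent-reducer limit property is an assumption about recent Hadoop 2.x releases and should be checked against your distribution:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Assumed property: cap how many reduce tasks run at the same time.
        conf.setInt("mapreduce.job.running.reduce.limit", 100);

        Job job = Job.getInstance(conf, "sketch");
        // More reduce tasks overall means fewer output files (and fewer 4 MB
        // buffers) per task, without adding machines to the cluster.
        job.setNumReduceTasks(200);
    }
}
```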
@advancedxy thanks. Does increasing the number of reducers mean adding more machines to the cluster? If so, that makes the entire process more expensive to run.
No. The number of reducers is a setting of your MR job; it's a logical concept.
@advancedxy ok got it. I'll take a look. |
@carlomedas I see you haven't made a release since the merge of @advancedxy's PR. Do you plan to do so?
Yes, it's in my plan, but I'm travelling and not able to recompile the native libraries on all platforms from here, which is needed for a release.
@carlomedas maybe you can help me here ...
When I'm running my EMR MapReduce job and want the output compressed with 4mz, I get multiple errors like:
... is running beyond physical memory limits. Current usage: 4.6 GB of 4 GB physical memory used; 6.3 GB of 20 GB virtual memory used. Killing container. ...
I tried increasing the memory limit and I still get these errors (only later in the reducer processing).
Do you have an idea why, the moment I started compressing with 4mz, I started getting these errors? (When I compressed with LZO or BZip2 I wasn't getting them.)
Thanks!