Support parquet columnar format in the aws_s3 sink #1374

Open
binarylogic opened this issue Dec 14, 2019 · 44 comments · May be fixed by #17395
Labels
domain: codecs Anything related to Vector's codecs (encoding/decoding) needs: approval Needs review & approval before work can begin. needs: requirements Needs a list of requirements before work can begin sink: aws_s3 Anything `aws_s3` sink related type: enhancement A value-adding code change that enhances its existing functionality.

Comments

@binarylogic
Contributor

Similar to #1373 we should support the Parquet format. The parquet format is a columnar format that enables faster and more efficient data access schemes such as column selection and indexing.

Implementation

Unfortunately, I do not have deep experience with this format as I do with ORC, but like everything else, we should start very simple. Fortunately, there appears to be a Rust library that supports basic writing of this data.

@binarylogic binarylogic added sink: aws_s3 Anything `aws_s3` sink related type: enhancement A value-adding code change that enhances its existing functionality. needs: approval Needs review & approval before work can begin. needs: requirements Needs a list of requirements before work can begin labels Dec 14, 2019
@LucioFranco
Contributor

One issue with that parquet library is that it requires nightly Rust (see https://github.com/sunchao/parquet-rs/blob/master/src/lib.rs#L123 and https://github.com/apache/arrow/blob/master/rust/parquet/src/lib.rs#L18). We can't use these features because we are on the stable compiler.

@binarylogic
Contributor Author

Any idea what the timeline is for those to be on stable?

@LucioFranco
Contributor

Well really specialization is the last one on the updated apache/arrow but that feature has been in progress since 2016, so we will likely not see it any time soon. Our best bet would be to attempt to remove that feature from that library but I am not sure how hard that would be or if that is even possible.

@camerondavison

I think that it still requires nightly, but just throwing into the mix https://github.com/apache/arrow/tree/master/rust/parquet

@binarylogic binarylogic added the domain: codecs Anything related to Vector's codecs (encoding/decoding) label Aug 7, 2020
@davemt

davemt commented Dec 3, 2020

@alexgavrisco
Contributor

IIRC Apache Arrow now works on stable Rust 🎉 (unless SIMD feature is used). Would it be possible to re-evaluate this one?
In case the team doesn't have bandwidth, but somebody can help with review and some pointers, I can look into it.

@jszwedko
Member

Hi @Alexx-G. This hasn't been scheduled yet, but if you wanted to pick it up, I'd be happy to advise/review!

@alexgavrisco
Contributor

I've checked the parquet crate; it seems to have some basic support for writing data.
@binarylogic @jszwedko I see that the ORC issue has some additional context, do you have any insights/pointers for this one? Especially about schema and the changes around the current encoding logic.
I'll try to run some tests with Athena+S3 and parquet to see whether I can get an MVP.

@CorbyAsCode

Is there an update on this? I'm very interested in JSON input to parquet output to S3.

@bhataprameya

bhataprameya commented Feb 15, 2022

Any update on this? I am also interested in JSON input and Parquet output to s3

@jgournet

Any chance this can be looked at?
Right now, we send logs from vector to Kinesis just to get them converted to Parquet... I'd be happy to skip that costly step

@spencergilbert
Contributor

This codec isn't currently on our roadmap, but we could help a community PR get merged 👍

@ktff
Contributor

ktff commented Mar 24, 2023

Proposal

Add parquet codec and add support for it in the aws_s3 sink.

This can be achieved with the official Rust crate, through the Serializer construct, tied together with a custom type implementing Encoder<Vec<Event>>.

Each batch would be transformed into a single Parquet file with a single row group. With that, the batch configuration can be used to define the desired size of the row group/Parquet file.

Schema

Since Parquet requires a schema, we need to derive one.

Each passing event has its own implicit schema, and by joining them we get a unified one. This unified schema can be:

  1. Specific to each batch.
  2. Built up from batch to batch during runtime.

While option 2 can still result in exported schemas that differ from batch to batch, they would tend to change less than with option 1. This is relevant for streams whose events have varying schemas; for consistent streams both options behave the same.

When joining schemas we can get a conflicting situation. When there are multiple types used for the same field/column some resolution is needed:

  1. Choose one type and drop conflicting events.
  2. Unify the types into a more general one. This needs to be done for both primitive and logical types. As the unifying type, String looks like the best option.

In my opinion, option 2 is better. It's more reliable and we can document this behavior.
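
A minimal sketch of the "unify into a more general type" resolution described above, assuming each event's implicit schema is a flat map from field name to type. The `ColumnType` enum, the `Schema` alias, and the `unify` and `schema` helpers are hypothetical names for illustration; they are not part of Vector or of the parquet crate.

```rust
use std::collections::BTreeMap;

/// Hypothetical column types; a real Parquet schema distinguishes many more.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnType {
    Boolean,
    Int64,
    Double,
    Utf8,
}

/// Implicit schema of one event (or of a whole batch): field name -> type.
type Schema = BTreeMap<String, ColumnType>;

/// Merge `incoming` into `unified`. A field seen with two different types
/// falls back to `Utf8`, which is the String-based unification from option 2.
fn unify(unified: &mut Schema, incoming: &Schema) {
    for (field, ty) in incoming {
        unified
            .entry(field.clone())
            .and_modify(|existing| {
                if *existing != *ty {
                    *existing = ColumnType::Utf8;
                }
            })
            .or_insert(*ty);
    }
}

/// Helper for building a schema from literal pairs.
fn schema(fields: &[(&str, ColumnType)]) -> Schema {
    fields.iter().map(|(name, ty)| (name.to_string(), *ty)).collect()
}
```

Calling `unify` once per event inside a batch gives the per-batch schema; keeping the same `Schema` value alive across batches gives the variant that is built up during runtime.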

Options

Add options:

  • encoding.parquet.encoding, which accepts a map between fields and encodings. If an encoding isn't supported for the field type, a warning would be logged and plain encoding used.
  • encoding.parquet.compression, which applies compression to all of the columns.

By default plain encoding and no compression are used.

Alternatives

We can expose an option for users to define their own static schema for passing events which would try to cast or filter out conflicting values.

@fuchsnj
Member

fuchsnj commented Mar 30, 2023

Hey @ktff. @spencergilbert and I discussed this a bit. I don't think anyone on the team is really an expert in Parquet, so we have a couple questions.

  1. Today the encoders in Vector can only operate on individual events. We have plans to eventually have a 2nd layer of encoders that can run on batches of events, but that doesn't exist today. So the main question here is: can a batch be built from already encoded events?

  2. What's the reasoning for deriving a schema over having the user provide one (and Vector could ensure it matches)? Having the schema potentially change for each batch emitted seems undesirable, but again I don't really know enough about parquet to fully understand the impact here.

@ktff
Contributor

ktff commented Mar 31, 2023

Hey @fuchsnj

Regarding question 1: no, a Parquet batch can't be built from already encoded events. It's necessary to intercept them before that, or process them in a way suitable for Parquet. Fortunately there is a way to do that, by implementing pub trait Encoder<T> for Vec<Event>.

There are similar cases:

The current aws_s3 sink is using impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>), which would be replaced by ParquetEncoder when it's configured.
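
To illustrate why the encoder has to see the whole batch, here is a rough sketch of an encoder implemented for `Vec<Event>`. Everything in it is a simplified stand-in: the `Event` alias, the `Encoder<T>` trait shape, and `ColumnarEncoder` are assumptions made for this example and do not reproduce Vector's actual definitions, and the output is a toy column-wise text layout rather than real Parquet.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::io::{self, Write};

/// Simplified stand-in for an event: a flat map of string fields.
type Event = BTreeMap<String, String>;

/// Simplified stand-in for a sink-level encoder trait that consumes a whole
/// input of type `T` and reports how many bytes it wrote.
trait Encoder<T> {
    fn encode_input(&self, input: T, writer: &mut dyn Write) -> io::Result<usize>;
}

/// Hypothetical batch encoder that pivots rows (events) into columns.
struct ColumnarEncoder;

impl Encoder<Vec<Event>> for ColumnarEncoder {
    fn encode_input(&self, batch: Vec<Event>, writer: &mut dyn Write) -> io::Result<usize> {
        // Collect every field name that occurs anywhere in the batch.
        let fields: BTreeSet<&String> = batch.iter().flat_map(|e| e.keys()).collect();

        let mut written = 0;
        for field in fields {
            // One line per column; events lacking the field contribute "null".
            let values: Vec<&str> = batch
                .iter()
                .map(|e| e.get(field).map(String::as_str).unwrap_or("null"))
                .collect();
            let line = format!("{}: {}\n", field, values.join(","));
            writer.write_all(line.as_bytes())?;
            written += line.len();
        }
        Ok(written)
    }
}

/// Helper for building an event from literal pairs.
fn event(fields: &[(&str, &str)]) -> Event {
    fields.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}
```

A per-event encoder could not produce this layout, because each output line depends on every event in the batch.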


Regarding question 2: Parquet is a file format that is usually the final/long-term destination for storing data that can later be read for queries by, for example, Amazon Athena. Such systems will/do encounter differences in schema over time, especially if they are performing federated queries, so they usually have the means to reconcile different schemas. That said, I came to this conclusion via research, so comments from those with experience would be much appreciated.

So my main argument is that it's better to have the event reach its destination and leave it to the user to transform the event into the desired schema before or after the sink, if they so wish or need to, than to require configuration of a fixed schema that drops events. A fixed schema forces those who do have events with varying schemas to define the schema yet again in some transform before this sink, or to change the fixed schema.

@fuchsnj
Member

fuchsnj commented Mar 31, 2023

  1. The impl Encoder<Vec<ProcessedEvent>> is the sink specific encoder. While you could have a setting that replaces this on the sink, this doesn't follow our current conventions, and would only work for that specific sink. What we would prefer (which isn't quite supported yet) is expanding our codecs to add parquet, which would allow this to be used in any sink. The current (single event only) codecs are selected here.

  2. > So my main argument is that it's better to have the event reach its destination and leave it to the user to transform the event into the desired schema before or after the sink, if they so wish or need to, than to require configuration of a fixed schema that drops events. A fixed schema forces those who do have events with varying schemas to define the schema yet again in some transform before this sink, or to change the fixed schema.

    Our current codecs define a schema_requirement which can type check events at Vector startup to ensure events will match the expected parquet schema. This would prevent events from being dropped at runtime for not matching the schema. (Note that full schema support in Vector is relatively new, and not enabled by default yet (It's the global setting schema.enabled)).

@jszwedko
Member

jszwedko commented Mar 31, 2023

For point 1, I'm in agreement with @fuchsnj . It'd be really nice if we could update the codec model to do the batch encoding in a generic way to work across all sinks using codecs. The lack of this came up recently with the avro codec not outputting the schema at the head of each batch (#16994) and would come up with other codecs that require headers like csv.

@fuchsnj
Member

fuchsnj commented Mar 31, 2023

Adding support for "batched" codecs is also discussed here: #16637

@ktff
Contributor

ktff commented Mar 31, 2023

For point 1: while the implementation of parquet would be a sink-specific encoder at the moment, it can be written to be generic in the sense that it can be reused in a different sink and/or in the batch encoder abstraction/feature once that comes.

Going by #16637, an envelope is not enough for the parquet codec: it's a column-wise format, so it needs to deconstruct all events to form those columns. That is in line with #16637 (comment) for batch sinks.

If batch encoding were already implemented, that would simplify things. Going by the diversity of RequestBuilder impls, that's not trivially doable. From what I see there are two parts to it: abstraction over encoders and abstraction over events. For the parquet codec in the aws_s3 sink, abstraction over encoders is necessary. Just using Box<dyn Encoder<E>> seems fine for that. The second abstraction is the one that would allow this codec to be used in other sinks. I would say that it is an orthogonal concern/feature, since it would need to unify all types of events in batch sinks into a single thing and then modify, if necessary, all of the codecs to use this unified type. That's a completely separate issue.

For point 2: @fuchsnj, can you provide some docs for schema_requirement? I can't find it in the docs/website. But that feature would be useful.

@davemt

davemt commented Mar 31, 2023

We currently collect logs and store as parquet-on-s3, using fluentd and an internal plugin we manage for the parquet conversion on our aggregators. We are very interested in migrating to Vector and this issue is currently the remaining blocker.

FWIW, in our setup we configure a schema to use for each type of log and we would hope that the Vector implementation would at least support an option for specifying schemas.

@ktff
Contributor

ktff commented Apr 10, 2023

@fuchsnj I found the schema_requirement. That seems like exactly what's needed. So we can go with that.

Instead of determining schema during runtime, add option to specify schema for passing events.

@scMarkus
Contributor

Happy to see this discussion being continued. We are currently using Vector for Kubernetes cluster log collection but would have other use cases with much higher throughput where Vector is an interesting option. I would like to add some thoughts which would be mandatory to us:

  1. Since we divide concerns across multiple microservices, a producer (source) should be able to change its schema in a predefined manner (e.g. adding fields in JSON). Vector (transformer / aggregator) is another service with its own schema. It should simply ignore the additional field from the source without dropping any events. Later, Vector may update its schema independently so that it picks up previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be null for that event when written to a sink. Could this relaxed behavior be configurable to be more restrictive? I am mentioning this specifically because I tested the fluentd plugin with columnify and it made it difficult to migrate schemas without data loss. Still, the configuration options of the fluentd plugin may be a nice hint of what people are expecting.
  2. On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields. Combined with Parquet's run-length encoding (which should be done by the respective Parquet writing library?), this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other. Considering we are talking about an S3 sink, which is probably long-term storage, this would reduce costs on the one hand and also improve performance when reading and filtering data with Apache Spark, Trino / Athena, etc. In my case I could reduce file sizes by up to 20%, but I have not seen this as a feature anywhere so far except in Hive syntax. Still, the benefits were measured without Hive.
  3. What I would be genuinely curious about is how acknowledgements would behave in a batched manner. To stay consistent with the current semantics, the acknowledgement would obviously only be propagated once an event gets committed to a sink. But would it be reasonable to consider an event acknowledged towards upstream sources when it is written to Vector's disk buffer, which may be used for batching anyway? Surely this is a difficult problem and out of the scope of the parquet encoder. Still, I'd like to spark the thought.
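
The effect of sorting on run-length encoding from point 2 can be sketched as follows. This is only an illustration under assumptions: `Row`, `sort_for_rle`, `run_length_encode`, and `level_runs` are hypothetical names, and the run counting is a toy model of the idea, not what a Parquet writer does internally.

```rust
/// Hypothetical flattened log row with two low-cardinality fields.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Row {
    level: String,
    service: String,
}

/// Sort a batch by an ordered list of fields (here: level, then service)
/// so that equal values end up next to each other.
fn sort_for_rle(batch: &mut [Row]) {
    batch.sort_by(|a, b| a.level.cmp(&b.level).then_with(|| a.service.cmp(&b.service)));
}

/// Run-length encode one column: consecutive equal values collapse into
/// a single (value, count) pair.
fn run_length_encode(values: &[&str]) -> Vec<(String, usize)> {
    let mut runs: Vec<(String, usize)> = Vec::new();
    for value in values {
        if let Some((last, count)) = runs.last_mut() {
            if last.as_str() == *value {
                *count += 1;
                continue;
            }
        }
        runs.push((value.to_string(), 1));
    }
    runs
}

/// Number of runs needed to store the `level` column of a batch.
fn level_runs(batch: &[Row]) -> usize {
    let column: Vec<&str> = batch.iter().map(|r| r.level.as_str()).collect();
    run_length_encode(&column).len()
}

/// Helper for building a row from literals.
fn row(level: &str, service: &str) -> Row {
    Row { level: level.to_string(), service: service.to_string() }
}
```

Fewer runs means fewer (value, count) pairs to store, which is where the size reduction for sorted, low-cardinality columns would come from.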

@spencergilbert
Contributor

> 1. Since we divide concerns across multiple microservices, a producer (source) should be able to change its schema in a predefined manner (e.g. adding fields in JSON). Vector (transformer / aggregator) is another service with its own schema. It should simply ignore the additional field from the source without dropping any events. Later, Vector may update its schema independently so that it picks up previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be null for that event when written to a sink. Could this relaxed behavior be configurable to be more restrictive? I am mentioning this specifically because I tested the fluentd plugin with columnify and it made it difficult to migrate schemas without data loss. Still, the configuration options of the fluentd plugin may be a nice hint of what people are expecting.

Yeah, that's interesting - I certainly appreciate wanting to make migrations easier. I imagine that could be "manually" done today with remap, checking if your field exists and, if not, inserting a null value. On the flip side, I don't feel great about dropping portions of events that aren't part of the schema, but that matches some existing behavior in csv. An alternative could be erroring those when encoding and being able to route them to a different sink, DLQ style.
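
The two behaviours discussed here (dropping unknown fields versus erroring so the event can be routed elsewhere) might look roughly like this. It is a sketch under assumptions: `Event`, `UnknownFields`, and `project` are hypothetical names for this example, not existing Vector options or APIs.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for an event: a flat map of string fields.
type Event = BTreeMap<String, String>;

/// How to treat fields that are not part of the configured schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UnknownFields {
    /// Silently drop them (the relaxed behaviour).
    Ignore,
    /// Reject the event so the caller can route it elsewhere, DLQ style.
    Reject,
}

/// Project an event onto `schema`, yielding one cell per schema column.
/// Fields missing from the event become `None` (a null cell). In `Reject`
/// mode, the names of the unknown fields are returned as the error.
fn project(
    event: &Event,
    schema: &[&str],
    unknown: UnknownFields,
) -> Result<Vec<Option<String>>, Vec<String>> {
    let extras: Vec<String> = event
        .keys()
        .filter(|k| !schema.contains(&k.as_str()))
        .cloned()
        .collect();
    if unknown == UnknownFields::Reject && !extras.is_empty() {
        return Err(extras);
    }
    Ok(schema.iter().map(|col| event.get(*col).cloned()).collect())
}
```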

> 2. On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields. Combined with Parquet's run-length encoding (which should be done by the respective Parquet writing library?), this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other. Considering we are talking about an S3 sink, which is probably long-term storage, this would reduce costs on the one hand and also improve performance when reading and filtering data with Apache Spark, Trino / Athena, etc. In my case I could reduce file sizes by up to 20%, but I have not seen this as a feature anywhere so far except in Hive syntax. Still, the benefits were measured without Hive.

Definitely an interesting feature to keep in mind, we'd definitely want to get the basic implementation in before adding additional tooling to it though 👍

> 3. What I would be genuinely curious about is how acknowledgements would behave in a batched manner. To stay consistent with the current semantics, the acknowledgement would obviously only be propagated once an event gets committed to a sink. But would it be reasonable to consider an event acknowledged towards upstream sources when it is written to Vector's disk buffer, which may be used for batching anyway? Surely this is a difficult problem and out of the scope of the parquet encoder. Still, I'd like to spark the thought.

It's been a while but I feel as though I remember seeing that discussion in the past, if it's not currently the behavior I expect there are arguments that it could be.

@ktff is this still something you're planning to work on?

@ktff
Contributor

ktff commented May 8, 2023

@spencergilbert I do. I was on vacation, hence the silence. The first point raised by @fuchsnj remains unresolved. Simplified, there are two ways forward:

  1. Implement parquet codec only for aws_s3 sink. Once the support for batched codecs lands, that limitation can be lifted.
  2. Wait for batched codecs support and then merge parquet codec.

In both cases, my plan is to implement codec first and in the case of 2. submit it once batched codecs have landed.

@spencergilbert
Contributor

Hope you had a nice vacation! Sounds good to me.

@ktff ktff linked a pull request May 15, 2023 that will close this issue
4 tasks
@Kikkon

Kikkon commented Aug 9, 2023

@ktff I happened to find a similar solution. If you need help anywhere, I'd be happy to contribute together! If I can figure it out, that is. 🚀

@ktff
Contributor

ktff commented Aug 10, 2023

@Kikkon the draft contains functioning parquet codec for aws_s3 sink. It's missing two important things though:

  • Proper dealing with non conforming events. This can be resolved with accepting a performance hit.
  • Support for batched codecs. This is the only blocking issue.

I'm currently not in the position to add support for batched codecs hence the limbo state of #17395. If this is something you feel confident to add to Vector then reach out to @jszwedko. Once that lands I'll be able to finish the PR and get it merged.

@Kikkon

Kikkon commented Aug 11, 2023

> @Kikkon the draft contains functioning parquet codec for aws_s3 sink. It's missing two important things though:
>
>   • Proper dealing with non conforming events. This can be resolved with accepting a performance hit.
>   • Support for batched codecs. This is the only blocking issue.
>
> I'm currently not in the position to add support for batched codecs hence the limbo state of #17395. If this is something you feel confident to add to Vector then reach out to @jszwedko. Once that lands I'll be able to finish the PR and get it merged.

@ktff I have some experience using Parquet, but am not familiar with Vector. The issue with this PR is: Parquet does not support append writes. If appending, it may require merging new and old files, which has performance costs. However, a batch codecs implementation does not yet exist in Vector. So for now the PR is pending. Is that right?

@jszwedko The Vector community has plans for a proposal to support batch codecs. Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add batch codecs.

@jszwedko
Member

> @jszwedko The Vector community has plans for a proposal to support batch codecs. Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add batch codecs.

Hey! Yes, we would like to add the concept of batches to codecs but haven't been able to prioritize it on our side just yet. We'd be happy to help guide a contribution for it. I believe @lukesteensen would have some thoughts about what it could look like and also be able to answer questions.

@Kikkon

Kikkon commented Aug 13, 2023

Can you give me some advice? @lukesteensen 🫡

@ktff
Contributor

ktff commented Aug 14, 2023

> The issue with this PR is: Parquet does not support append writes. If appending, it may require merging new and old files, which has performance costs. However, a batch codecs implementation does not yet exist in Vector. So for now the PR is pending. Is that right?

@Kikkon it's not doable with regular codecs, but if it were, then yes, it would have a performance cost. One more thing to note: the PR currently does add limited support for batch codecs in order to support Parquet, but it's hacky and so not something that can be merged. The goal is to replace this with proper support.

@Kikkon

Kikkon commented Aug 15, 2023

> > The issue with this PR is: Parquet does not support append writes. If appending, it may require merging new and old files, which has performance costs. However, a batch codecs implementation does not yet exist in Vector. So for now the PR is pending. Is that right?
>
> @Kikkon it's not doable with regular codecs, but if it were, then yes, it would have a performance cost. One more thing to note: the PR currently does add limited support for batch codecs in order to support Parquet, but it's hacky and so not something that can be merged. The goal is to replace this with proper support.

Got it, thank you!

@lukesteensen
Member

@Kikkon that's correct, you can't really do this right now with the current design of our codecs crate. It's something I would love to enable, but I haven't had time to figure out a good path forward. It will likely take quite a bit of refactoring and design work. I'm hoping we'll be able to tackle that before long, as I agree this is a feature we should have!

@Kikkon

Kikkon commented Aug 22, 2023

@lukesteensen Is there a corresponding roadmap currently? Are there any parts that I can participate in?

@rstml

rstml commented Nov 17, 2023

This is a really big blocker for us. Any plans to make this happen?

@Ralkion

Ralkion commented Nov 17, 2023

@rstml : We've used the patch provided by @ktff and applied it ourselves to a release branch (can't remember which one off the top of my head). It's been working fine for us for a while now. It only works for S3, but it does work.

@rstml

rstml commented Nov 17, 2023

@Ralkion thanks! that's helpful. Will give it a try.

I got an impression that #16637 was a blocker, but it seems that we should be able to define metadata during consumption.

Why not merge #17395 then? If there's support for Avro that works without headers, Parquet should be acceptable too?

@Ralkion

Ralkion commented Nov 17, 2023

My understanding from the discussion above (and others may correct me if I'm over-simplifying) is that @ktff 's patch only works for S3, and only works in a certain way that isn't necessarily applicable to other output destinations. Because it can't be generalized to other destinations it's not really 'production ready'.

@bruceg
Member

bruceg commented Nov 17, 2023

@rstml We have not been able to prioritize codec batches at this point, nor is it likely in the near future, unfortunately.

@ktff
Contributor

ktff commented Nov 18, 2023

@rstml it's as @Ralkion said: this codec is currently accessible only through the aws_s3 sink. For it to be a general codec for all sinks, codec batches need to be implemented, but they aren't on the roadmap for the near future, as @bruceg mentioned.

@fpytloun
Contributor

fpytloun commented May 7, 2024

I am also very interested in this feature. Going to be on roadmap anytime soon?

@yjagdale

@jszwedko - any possibility to get this in roadmap?

@bharathiram

Any update on this feature?

@pront
Member

pront commented Jan 3, 2025

We are aware there is a lot of demand for this feature but it's not something we currently have in our roadmap. We would be happy to review a community contribution.
