"Live" data import with Fluentd #4271

Closed
philrz opened this issue Dec 15, 2022 · 6 comments · Fixed by #5190 or #5195

philrz commented Dec 15, 2022

In an exercise similar to #3151 with Logstash, I've run a test with external agent Fluentd to push "live" data continuously to a Zed lake.

tl;dr - As Fluentd is a more modern tool than Logstash, in the end it did indeed seem to play more nicely with Zed in a minimal out-of-the-box config.

The test was performed with a Zed lake running behind Zui insiders 0.30.1-142 (which is Zed commit 101d358), Fluentd 1.15.3, and Zeek 5.1.1 (with the json-streaming-logs package) sniffing the coincidental live local traffic on my wireless interface during a typical work-from-home day.

The fluentd.conf I ultimately settled on:

<source>
  @type tail
  path /usr/local/zeek-5.1.1/logs/current/json_streaming_*
  follow_inodes true
  pos_file /Users/phil/work/fluentd/fluentd.pos
  tag zeek
  <parse>
    @type json
  </parse>
</source>

<match zeek>
  @type http
  endpoint http://127.0.0.1:9988/pool/zeek/branch/main
  content_type application/json

  <format>
    @type json
  </format>
</match>

Then to start Fluentd:

$ ulimit -n 65535
$ fluentd -c fluentd.conf 

The ulimit guidance came from the "Before Installation" docs and ended up being significant: the default setting on my MacBook was 256, and in an early run Fluentd did indeed complain of running out of file descriptors.
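A quick way to see whether a given shell is still at a low descriptor limit before launching Fluentd is to read the limit from Python's standard `resource` module. This is only a minimal sketch: the 65535 threshold is the value passed to `ulimit -n` above, and the helper names are made up for illustration.

```python
import resource

# Value passed to `ulimit -n` in the commands above.
RECOMMENDED_NOFILE = 65535


def nofile_limits():
    """Return the (soft, hard) open-file limits for the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def is_below_recommended(soft, recommended=RECOMMENDED_NOFILE):
    """True if a soft limit is finite and lower than the recommended value."""
    return soft != resource.RLIM_INFINITY and soft < recommended


if __name__ == "__main__":
    soft, hard = nofile_limits()
    print(f"soft={soft} hard={hard}")
    if is_below_recommended(soft):
        print(f"Consider running `ulimit -n {RECOMMENDED_NOFILE}` first.")
```

A child process inherits the limits of the shell that starts it, so the check is only meaningful when run from the same shell session that will start Fluentd.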

The only minor speedbump I hit along the way was needing to specify content_type application/json. After first reading the relevant docs, I thought the defaults would work: they explain that json_array is false by default, so the plugin posts NDJSON (which zed serve accepts) and sets the Content-Type to application/x-ndjson. However, this caused an HTTP 400 error, because the Zed backend handles both regular JSON and NDJSON through the same reader and that reader is only wired up to Content-Type application/json. Hence I had to set content_type explicitly.
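The same kind of request can be reproduced outside Fluentd when debugging the HTTP 400. The Python sketch below builds (but does not send) a POST whose body is NDJSON while the header says application/json. The endpoint is copied from the fluentd.conf above; the sample records and the helper name are hypothetical.

```python
import urllib.request

# Endpoint copied from the fluentd.conf above.
ENDPOINT = "http://127.0.0.1:9988/pool/zeek/branch/main"


def build_load_request(ndjson_body: str) -> urllib.request.Request:
    """Build a POST carrying NDJSON text labeled as application/json."""
    return urllib.request.Request(
        ENDPOINT,
        data=ndjson_body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical records, one JSON object per line.
    body = '{"_path":"conn","uid":"C1"}\n{"_path":"dns","uid":"C2"}\n'
    req = build_load_request(body)
    # urllib stores header names capitalized as "Content-type".
    print(req.get_method(), req.full_url, req.get_header("Content-type"))
    # Sending requires a running lake and an existing "zeek" pool:
    # urllib.request.urlopen(req)
```

Changing the header value to application/x-ndjson in this sketch would be one way to check whether a given Zed version still rejects that Content-Type.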

Other than that, the defaults are far friendlier than what we saw with Logstash. As #3151 gets into, Logstash's default behavior was to send a single JSON object per post, with the alternative of configuring it to send a JSON array of objects. The former is known to perform poorly (#4266), and with the latter the posts end up in the pool as whole arrays and hence currently need to be post-processed in some way if they're going to be treated as individual Zed records. By comparison, Fluentd's default behavior not only sends NDJSON but also does so at a flush_interval of 60s, which means more records per commit and hence better performance. That page also lists many tuning parameters, so users have a lot of flexibility if these defaults are undesirable.
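The difference between the three payload shapes is easy to see by counting how many top-level JSON values a reader would find in each body. The following Python sketch uses made-up records and a hypothetical helper; it models only the JSON framing, not how the Zed reader is implemented.

```python
import json

# Hypothetical events standing in for Zeek records.
records = [{"_path": "conn", "uid": "C1"}, {"_path": "dns", "uid": "C2"}]

single_object_posts = [json.dumps(r) for r in records]  # one POST per record
array_post = json.dumps(records)                        # one POST, one array
ndjson_post = "".join(json.dumps(r) + "\n" for r in records)  # one POST, N lines


def top_level_values(body: str) -> int:
    """Count the top-level JSON values found back to back in a body."""
    decoder = json.JSONDecoder()
    idx, count = 0, 0
    while idx < len(body):
        # raw_decode does not skip leading whitespace, so do it here.
        while idx < len(body) and body[idx].isspace():
            idx += 1
        if idx >= len(body):
            break
        _, idx = decoder.raw_decode(body, idx)
        count += 1
    return count


if __name__ == "__main__":
    print("posts needed, one object each:", len(single_object_posts))
    print("values in array post:", top_level_values(array_post))
    print("values in NDJSON post:", top_level_values(ndjson_post))
```

The array body yields a single value (the array itself), which mirrors why those posts land in the pool as whole arrays, while the NDJSON body yields one value per record in a single post.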

Having let it run for a couple of hours, 7523 Zeek events have accumulated in my pool, and here's a look at the metadata that shows object size/count:

$ zed query -z 'from zeek@main:objects'
{id:0x10281d7b0f05a343fe5296fe38a4604340a312da(=ksuid.KSUID),meta:{first:"2022-12-15T00:33:54.369077Z",last:"2022-12-15T00:31:38.906444Z",count:55(uint64),size:4195}(=data.Meta)}(=data.Object)
{id:0x102815f489f922f3d2c91aa854ca9a1daa22a9ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:01:36.401594Z",last:"2022-12-14T23:36:13.903690Z",count:66(uint64),size:4248}(=data.Meta)}(=data.Object)
{id:0x1028167ce874f348a0b2b3bd50a80d13365db4ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:04:01.581654Z",last:"2022-12-14T23:58:46.339061Z",count:80(uint64),size:5928}(=data.Meta)}(=data.Object)
{id:0x10281aaa1cb9e06d8ee6cdc53391cd6feac65216(=ksuid.KSUID),meta:{first:"2022-12-15T00:21:55.553981Z",last:"2022-12-15T00:16:02.116655Z",count:57(uint64),size:4240}(=data.Meta)}(=data.Object)
{id:0x10281318731c08fb28a40deaaf75c62386cb2d30(=ksuid.KSUID),meta:{first:"2022-12-14T23:49:36.320922Z",last:"2022-12-14T23:47:25.507561Z",count:67(uint64),size:4598}(=data.Meta)}(=data.Object)
{id:0x10280eafda094eeb2a72b43a25f20acc5ce290c7(=ksuid.KSUID),meta:{first:"2022-12-14T23:30:41.635227Z",last:"2022-12-14T23:24:14.233836Z",count:51(uint64),size:3814}(=data.Meta)}(=data.Object)
{id:0x10281bb0893b3e78776ab521e967eadf3af98680(=ksuid.KSUID),meta:{first:"2022-12-15T00:26:00.496052Z",last:"2022-12-15T00:13:58.991809Z",count:107(uint64),size:6939}(=data.Meta)}(=data.Object)
{id:0x10281d3899c455779dea527350e3425204d56647(=ksuid.KSUID),meta:{first:"2022-12-15T00:32:39.517490Z",last:"2022-12-15T00:15:30.676887Z",count:61(uint64),size:4406}(=data.Meta)}(=data.Object)
{id:0x1028195d6d2753600cfb04852c0ab53297d7d210(=ksuid.KSUID),meta:{first:"2022-12-15T00:16:23.722289Z",last:"2022-12-15T00:13:47.880190Z",count:142(uint64),size:8860}(=data.Meta)}(=data.Object)
{id:0x102813e2e6d1a9be970674aea46ba794ab38505d(=ksuid.KSUID),meta:{first:"2022-12-14T23:53:05.837437Z",last:"2022-12-14T23:50:24.830662Z",count:42(uint64),size:2835}(=data.Meta)}(=data.Object)
{id:0x1028190caf5bb74b52c1a5f0a22d9dddbcffbb35(=ksuid.KSUID),meta:{first:"2022-12-15T00:14:38.921771Z",last:"2022-12-15T00:06:56.790906Z",count:67(uint64),size:4451}(=data.Meta)}(=data.Object)
{id:0x10281aebb786aa1c2d12018f8aabfdc36eb58fc9(=ksuid.KSUID),meta:{first:"2022-12-15T00:23:02.237135Z",last:"2022-12-15T00:21:07.734245Z",count:149(uint64),size:10789}(=data.Meta)}(=data.Object)
{id:0x102810f799f3aef602b047e881573b2386f93577(=ksuid.KSUID),meta:{first:"2022-12-14T23:40:36.288578Z",last:"2022-12-14T23:26:59.060221Z",count:82(uint64),size:5420}(=data.Meta)}(=data.Object)
{id:0x102818ca249e2221d4c37a8b7edb65d0b7e751b9(=ksuid.KSUID),meta:{first:"2022-12-15T00:13:59.395104Z",last:"2022-12-15T00:11:47.185874Z",count:64(uint64),size:4479}(=data.Meta)}(=data.Object)
{id:0x10280faf96b5ea336207d34a980d3d1c50b9c079(=ksuid.KSUID),meta:{first:"2022-12-14T23:34:49.089985Z",last:"2022-12-14T23:24:12.728754Z",count:96(uint64),size:7282}(=data.Meta)}(=data.Object)
{id:0x1028135f7642ceda3e3c7fdfac16a89920838080(=ksuid.KSUID),meta:{first:"2022-12-14T23:50:36.789345Z",last:"2022-12-14T23:23:55.939031Z",count:57(uint64),size:4065}(=data.Meta)}(=data.Object)
{id:0x10280f6e6c4b1478313b60adbe2e03059a61988d(=ksuid.KSUID),meta:{first:"2022-12-14T23:33:58.709723Z",last:"2022-12-14T23:24:16.419988Z",count:70(uint64),size:4209}(=data.Meta)}(=data.Object)
{id:0x10281a2738714dcd940d567fd7cfc9edba5c9cd6(=ksuid.KSUID),meta:{first:"2022-12-15T00:19:40.686016Z",last:"2022-12-15T00:14:04.289765Z",count:60(uint64),size:4365}(=data.Meta)}(=data.Object)
{id:0x10281e030145e71c84e9b343384a4ccd9379f9a9(=ksuid.KSUID),meta:{first:"2022-12-15T00:36:04.653440Z",last:"2022-12-15T00:32:37.305235Z",count:99(uint64),size:5285}(=data.Meta)}(=data.Object)
{id:0x10281a68855c9701915c102c854ad9821db06f8d(=ksuid.KSUID),meta:{first:"2022-12-15T00:20:50.017824Z",last:"2022-12-15T00:16:01.573861Z",count:104(uint64),size:7887}(=data.Meta)}(=data.Object)
{id:0x10281c33bcd9eb58dd17bcc8052e6cca62b493b1(=ksuid.KSUID),meta:{first:"2022-12-15T00:28:22.641059Z",last:"2022-12-15T00:22:24.852142Z",count:70(uint64),size:5887}(=data.Meta)}(=data.Object)
{id:0x10280ff1b203645bd50cf056858c157227166f32(=ksuid.KSUID),meta:{first:"2022-12-14T23:36:15.207764Z",last:"2022-12-14T23:24:13.058901Z",count:89(uint64),size:7917}(=data.Meta)}(=data.Object)
{id:0x1028188878a3cf9bd2a6265637ecaaea2946cc2d(=ksuid.KSUID),meta:{first:"2022-12-15T00:12:55.221145Z",last:"2022-12-15T00:09:03.450834Z",count:50(uint64),size:4568}(=data.Meta)}(=data.Object)
{id:0x1028200e566e5b0274981f56b59ac002a2607b9e(=ksuid.KSUID),meta:{first:"2022-12-15T00:44:36.617342Z",last:"2022-12-15T00:37:33.515044Z",count:43(uint64),size:3181}(=data.Meta)}(=data.Object)
{id:0x10281c74dd87d80501d6f1447c618277fc1d7de6(=ksuid.KSUID),meta:{first:"2022-12-15T00:29:36.543174Z",last:"2022-12-15T00:18:07.726186Z",count:93(uint64),size:5721}(=data.Meta)}(=data.Object)
{id:0x102817c9cedd477a6144a75c58cc0adcc3983d0d(=ksuid.KSUID),meta:{first:"2022-12-15T00:09:37.863995Z",last:"2022-12-15T00:06:17.892402Z",count:74(uint64),size:4722}(=data.Meta)}(=data.Object)
{id:0x102812956425670086d023734747f1240d362349(=ksuid.KSUID),meta:{first:"2022-12-14T23:47:25.497124Z",last:"2022-12-14T23:44:53.188852Z",count:59(uint64),size:3580}(=data.Meta)}(=data.Object)
{id:0x102814239c83873459591c0486bc9ef17d632259(=ksuid.KSUID),meta:{first:"2022-12-14T23:53:54.960568Z",last:"2022-12-14T23:48:36.878593Z",count:85(uint64),size:5183}(=data.Meta)}(=data.Object)
{id:0x102810744e6db746bab27b72910c430ac659ec6f(=ksuid.KSUID),meta:{first:"2022-12-14T23:38:07.649214Z",last:"2022-12-14T23:23:54.665245Z",count:69(uint64),size:4572}(=data.Meta)}(=data.Object)
{id:0x102810329aa3670ea85ee553104ec911524bd305(=ksuid.KSUID),meta:{first:"2022-12-14T23:37:06.508527Z",last:"2022-12-14T23:24:16.757055Z",count:55(uint64),size:4388}(=data.Meta)}(=data.Object)
{id:0x10281b2d27b326d4aa9d0596829a5403e9811119(=ksuid.KSUID),meta:{first:"2022-12-15T00:24:05.366952Z",last:"2022-12-15T00:01:09.722919Z",count:127(uint64),size:9169}(=data.Meta)}(=data.Object)
{id:0x102813a185f347c8eca873ef93ad2928bed22507(=ksuid.KSUID),meta:{first:"2022-12-14T23:51:41.267126Z",last:"2022-12-14T23:23:37.202062Z",count:59(uint64),size:5991}(=data.Meta)}(=data.Object)
{id:0x102815713cf94e52da146bb4b01bea7068092da4(=ksuid.KSUID),meta:{first:"2022-12-14T23:59:36.393177Z",last:"2022-12-14T23:57:32.147785Z",count:89(uint64),size:6226}(=data.Meta)}(=data.Object)
{id:0x1028199f3b4a3d3fe174d4d5c388e39fcee942fc(=ksuid.KSUID),meta:{first:"2022-12-15T00:17:25.834405Z",last:"2022-12-15T00:15:31.970388Z",count:99(uint64),size:5415}(=data.Meta)}(=data.Object)
{id:0x10280ef07498a13017c3f72b4814716bd5a8b8d7(=ksuid.KSUID),meta:{first:"2022-12-14T23:31:57.672281Z",last:"2022-12-14T23:23:47.391406Z",count:115(uint64),size:10644}(=data.Meta)}(=data.Object)
{id:0x10281dc16398b7dfd2fe1601dd4fc5d06ac32431(=ksuid.KSUID),meta:{first:"2022-12-15T00:35:06.089253Z",last:"2022-12-15T00:27:50.310906Z",count:67(uint64),size:4630}(=data.Meta)}(=data.Object)
{id:0x10281bf2f364aefb042f757d3402681d177446d6(=ksuid.KSUID),meta:{first:"2022-12-15T00:27:01.122826Z",last:"2022-12-15T00:22:23.086189Z",count:64(uint64),size:2973}(=data.Meta)}(=data.Object)
{id:0x102810b57b2a75f0c4f843a8a2a950f91505c254(=ksuid.KSUID),meta:{first:"2022-12-14T23:39:20.038697Z",last:"2022-12-14T23:24:19.610793Z",count:98(uint64),size:5979}(=data.Meta)}(=data.Object)
{id:0x1028120294a6bb54036d4c82e7968adbbba8414d(=ksuid.KSUID),meta:{first:"2022-12-14T23:44:57.519133Z",last:"2022-12-14T23:23:40.779648Z",count:65(uint64),size:5148}(=data.Meta)}(=data.Object)
{id:0x102812d72523ae00b60987cbb575a72469ecc77e(=ksuid.KSUID),meta:{first:"2022-12-14T23:48:36.320776Z",last:"2022-12-14T23:44:38.466381Z",count:91(uint64),size:4993}(=data.Meta)}(=data.Object)
{id:0x10280da9410592ff805786f1fa76d26eb22845ff(=ksuid.KSUID),meta:{first:"2022-12-14T23:26:32.092939Z",last:"2022-12-14T23:24:12.664567Z",count:420(uint64),size:33356}(=data.Meta)}(=data.Object)
{id:0x1028163baa13feae4272420cbae287e852b522c6(=ksuid.KSUID),meta:{first:"2022-12-15T00:02:51.618986Z",last:"2022-12-15T00:00:18.357330Z",count:43(uint64),size:3068}(=data.Meta)}(=data.Object)
{id:0x102816ffc6f45ad917a48863e720c811d2e4858f(=ksuid.KSUID),meta:{first:"2022-12-15T00:06:17.884326Z",last:"2022-12-15T00:02:51.593496Z",count:76(uint64),size:4223}(=data.Meta)}(=data.Object)
{id:0x10281cb63d125bce69a5d1c3ff23c5a23d5a9935(=ksuid.KSUID),meta:{first:"2022-12-15T00:30:39.326320Z",last:"2022-12-15T00:24:05.339656Z",count:54(uint64),size:4006}(=data.Meta)}(=data.Object)
{id:0x10281805f4da43015e3c2cd1f6529cd9433068fb(=ksuid.KSUID),meta:{first:"2022-12-15T00:10:39.101937Z",last:"2022-12-15T00:08:20.966425Z",count:94(uint64),size:6081}(=data.Meta)}(=data.Object)
{id:0x1028117ae81f92957be828fb380ea87c1e31915c(=ksuid.KSUID),meta:{first:"2022-12-14T23:42:46.337119Z",last:"2022-12-14T23:38:14.970666Z",count:66(uint64),size:4657}(=data.Meta)}(=data.Object)
{id:0x10280deab296e587e50926bd9073ccfdfdad05f7(=ksuid.KSUID),meta:{first:"2022-12-14T23:27:36.864611Z",last:"2022-12-14T23:23:36.832279Z",count:189(uint64),size:15116}(=data.Meta)}(=data.Object)
{id:0x10281f8ce3043f8a24ea6448c05b0ba2ba9336a5(=ksuid.KSUID),meta:{first:"2022-12-15T00:42:36.611466Z",last:"2022-12-14T23:36:13.896447Z",count:55(uint64),size:4847}(=data.Meta)}(=data.Object)
{id:0x10281b6e32dd1fa2e27e893db79982c7adaf4980(=ksuid.KSUID),meta:{first:"2022-12-15T00:24:59.057296Z",last:"2022-12-15T00:09:42.396771Z",count:75(uint64),size:5777}(=data.Meta)}(=data.Object)
{id:0x102816b8f40f958c79c614ffe350488567ed0ed5(=ksuid.KSUID),meta:{first:"2022-12-15T00:05:07.322537Z",last:"2022-12-15T00:03:14.277248Z",count:45(uint64),size:3267}(=data.Meta)}(=data.Object)
{id:0x10281f4ac1a0148ffb16d69ccad117174e5943ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:41:43.802271Z",last:"2022-12-15T00:39:07.101302Z",count:63(uint64),size:4517}(=data.Meta)}(=data.Object)
{id:0x1028146ad8046970038fff09cb8bb6098f6e6430(=ksuid.KSUID),meta:{first:"2022-12-14T23:55:18.123145Z",last:"2022-12-14T23:49:05.166232Z",count:52(uint64),size:4708}(=data.Meta)}(=data.Object)
{id:0x102817465357b8452efff0523797909cfe3a12b4(=ksuid.KSUID),meta:{first:"2022-12-15T00:07:14.611369Z",last:"2022-12-15T00:05:16.962595Z",count:105(uint64),size:5311}(=data.Meta)}(=data.Object)
{id:0x10281f09903cc45af868de8c4c55268a554eec0b(=ksuid.KSUID),meta:{first:"2022-12-15T00:40:36.597157Z",last:"2022-12-15T00:30:16.617628Z",count:71(uint64),size:4556}(=data.Meta)}(=data.Object)
{id:0x10281ec728a7971a45505307b653937ffe72c834(=ksuid.KSUID),meta:{first:"2022-12-15T00:39:33.545691Z",last:"2022-12-15T00:23:25.942687Z",count:91(uint64),size:6931}(=data.Meta)}(=data.Object)
{id:0x10281cf78b4811f22c6aac60bd7f15be3e7e2d61(=ksuid.KSUID),meta:{first:"2022-12-15T00:31:36.559041Z",last:"2022-12-15T00:20:36.725557Z",count:55(uint64),size:3240}(=data.Meta)}(=data.Object)
{id:0x10280e6d0c60dad134c5adbe34a3684b9c12cb24(=ksuid.KSUID),meta:{first:"2022-12-14T23:29:36.231974Z",last:"2022-12-14T23:23:54.241076Z",count:68(uint64),size:4860}(=data.Meta)}(=data.Object)
{id:0x102819e0037cb4488e80c3e2e2fa9c7cad5dd750(=ksuid.KSUID),meta:{first:"2022-12-15T00:18:36.472696Z",last:"2022-12-15T00:15:30.643509Z",count:106(uint64),size:6686}(=data.Meta)}(=data.Object)
{id:0x102814eebbf0a8ed36becb7b3cbaa61d7a814260(=ksuid.KSUID),meta:{first:"2022-12-14T23:57:32.329728Z",last:"2022-12-14T23:38:37.652742Z",count:99(uint64),size:5472}(=data.Meta)}(=data.Object)
{id:0x10280d6738f083c8d4a537d85d350532e3603d4b(=ksuid.KSUID),meta:{first:"2022-12-14T23:25:26.379656Z",last:null,count:1406(uint64),size:81180}(=data.Meta)}(=data.Object)
{id:0x102817882a024ff97342d1399df5613189628ef0(=ksuid.KSUID),meta:{first:"2022-12-15T00:08:36.442121Z",last:"2022-12-14T23:59:54.624202Z",count:98(uint64),size:5805}(=data.Meta)}(=data.Object)
{id:0x10281847966536a427cc07b001634ea41e55f313(=ksuid.KSUID),meta:{first:"2022-12-15T00:11:36.448901Z",last:"2022-12-15T00:08:56.725171Z",count:53(uint64),size:3877}(=data.Meta)}(=data.Object)
{id:0x102815b2beae1ffb64fb340ba1ef47f6bc129348(=ksuid.KSUID),meta:{first:"2022-12-15T00:00:32.684574Z",last:"2022-12-14T23:50:06.399812Z",count:94(uint64),size:6027}(=data.Meta)}(=data.Object)
{id:0x10281530ab54ed8b1240209839a583e2272276ed(=ksuid.KSUID),meta:{first:"2022-12-14T23:58:36.391613Z",last:"2022-12-14T23:50:08.179374Z",count:82(uint64),size:4683}(=data.Meta)}(=data.Object)
{id:0x102811bbbe7b2416576b55dc8a1f617b1c19dbfc(=ksuid.KSUID),meta:{first:"2022-12-14T23:43:36.296832Z",last:"2022-12-14T23:31:59.853610Z",count:93(uint64),size:6482}(=data.Meta)}(=data.Object)
{id:0x10280f2c5f055e896e0c79e307fd10b0076f562a(=ksuid.KSUID),meta:{first:"2022-12-14T23:32:44.208336Z",last:"2022-12-14T23:24:17.843368Z",count:87(uint64),size:5548}(=data.Meta)}(=data.Object)
{id:0x10281e85d9d6a5ecbc7de64fe65b74618aea8343(=ksuid.KSUID),meta:{first:"2022-12-15T00:38:28.405089Z",last:"2022-12-15T00:23:25.878890Z",count:89(uint64),size:7257}(=data.Meta)}(=data.Object)
{id:0x10281fcdec55f5346dd0081a5e5f0ecf93ed9eb8(=ksuid.KSUID),meta:{first:"2022-12-15T00:43:40.947908Z",last:"2022-12-15T00:36:59.909246Z",count:66(uint64),size:4999}(=data.Meta)}(=data.Object)
{id:0x10281e44e5f881970da05290410a1719386477a2(=ksuid.KSUID),meta:{first:"2022-12-15T00:37:04.864535Z",last:"2022-12-15T00:35:06.103223Z",count:84(uint64),size:4437}(=data.Meta)}(=data.Object)
{id:0x10281138462519fc5b7233bf6181838bda026b0a(=ksuid.KSUID),meta:{first:"2022-12-14T23:41:37.048916Z",last:"2022-12-14T23:39:37.850641Z",count:73(uint64),size:5265}(=data.Meta)}(=data.Object)
{id:0x10280e2c58e2668f51c47e590a924846a9b5fa66(=ksuid.KSUID),meta:{first:"2022-12-14T23:28:42.249016Z",last:"2022-12-14T23:23:37.146459Z",count:91(uint64),size:7556}(=data.Meta)}(=data.Object)
{id:0x102814ad319ba0609dc0f08b3977a530f8b57568(=ksuid.KSUID),meta:{first:"2022-12-14T23:56:08.768224Z",last:"2022-12-14T23:54:31.534224Z",count:93(uint64),size:5490}(=data.Meta)}(=data.Object)
{id:0x10281254e6df56cc20f5c7b502c1533521a19003(=ksuid.KSUID),meta:{first:"2022-12-14T23:46:11.789390Z",last:"2022-12-14T23:23:47.079369Z",count:150(uint64),size:8723}(=data.Meta)}(=data.Object)

Applying some crude math, the average data object size/count:

$ zed query -Z 'from zeek@main:objects | AvgSize:=avg(meta.size),AvgCount:=avg(meta.count)'
{
    AvgSize: 6904.77027027027,
    AvgCount: 102.70270270270271
}

Obviously that size is still way below the default 500 MB object size and hence we'd still be waiting a long time to see benefit from compaction. However, compared to the single-object commits discussed in #3151 and #4266, the data as stored here is much more forgiving when accessed in queries, e.g., counting is still very quick:

$ time zed query 'from zeek | count()'
{count:7819(uint64)}

real	0m0.054s
user	0m0.013s
sys	0m0.010s
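
To put the averages from the metadata query in perspective against the 500 MB default object size, the arithmetic can be sketched in Python. The two averages are copied from the query output above; treating "500 MB" as 500,000,000 bytes and assuming a constant bytes-per-record ratio are simplifying assumptions.

```python
# Averages copied from the `from zeek@main:objects` aggregation above.
AVG_OBJECT_BYTES = 6904.77027027027
AVG_RECORDS_PER_OBJECT = 102.70270270270271

# Assumption: "500 MB" taken as decimal megabytes.
TARGET_OBJECT_BYTES = 500 * 1000 * 1000


def objects_per_target(avg_bytes, target_bytes=TARGET_OBJECT_BYTES):
    """How many average-sized objects would add up to one target-sized object."""
    return target_bytes / avg_bytes


def records_per_target(avg_bytes, avg_records, target_bytes=TARGET_OBJECT_BYTES):
    """Records needed to fill a target-sized object at the same bytes per record."""
    return target_bytes / (avg_bytes / avg_records)


if __name__ == "__main__":
    ratio = objects_per_target(AVG_OBJECT_BYTES)
    needed = records_per_target(AVG_OBJECT_BYTES, AVG_RECORDS_PER_OBJECT)
    print(f"average-sized objects per 500 MB: {ratio:,.0f}")
    print(f"records per 500 MB at this density: {needed:,.0f}")
```

With one object committed per flush, the first number is also a rough count of flushes needed to accumulate a single full-sized object's worth of data.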

philrz commented Jun 21, 2023

I revived this config while testing the compaction currently performed by lake manager (#3923). This made me wish the JSON Zeek data could actually be properly shaped so I'd have real ts timestamps etc. I came up with this revised config that currently seems to be working ok with the reference shaper for Zeek.

$ cat fluentd-shaped.conf 
<source>
  @type tail
  path /usr/local/var/logs/current/json_streaming_*
  follow_inodes true
  pos_file /Users/phil/work/fluentd/fluentd.pos
  tag zeek
  <parse>
    @type json
  </parse>
</source>

<match zeek>
  @type exec_filter
  command zq -z -I shaper.zed '| quiet(this)' -
  tag shaped
  <format>
    @type json
  </format>
  <parse>
    @type none
  </parse>
  <buffer>
    flush_interval 1s
  </buffer>
</match>

<match shaped>
  @type http
  endpoint http://127.0.0.1:9988/pool/zeek/branch/main
  content_type application/x-zson
  <format>
    @type single_value
  </format>
</match>

Things to note:

  1. The quiet(this) deals with what appear to be some new log types added in more recent Zeek releases that aren't yet covered by the reference shaper. The way the shaper is currently written, attempts to shape these just become error(missing). I'll plan to add these new logs to the reference shaper and also improve the error messages to help alert the user about what precisely went wrong.

  2. I'd have preferred transporting the shaped data as ZNG rather than ZSON, but Fluentd seems fussy about things being text lines. I may tinker more with binary ZNG the next time I revisit this.

  3. The buffering config may need more work. When playing around with just sending shaped data to stdout rather than posting to the Zed lake, it seemed like it was waiting excessively (forever?) for events to accumulate before sending output. I'm not sure if that flush_interval 1s is even having much effect right now, but it does seem like my live data is getting posted periodically, so I'm ok for now.
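
As a rough model of why the shaped config pairs `@type none` on the exec_filter output with `@type single_value` on the HTTP output: each ZSON line from zq travels through Fluentd as an opaque string and is posted unchanged. The Python below is only an analogy, not Fluentd code. The `message` key name and the appended newline are assumptions about the plugins' defaults, and the sample ZSON lines are made up.

```python
def parse_none(line: str, message_key: str = "message") -> dict:
    """Model of a 'none' parser: wrap the raw line without interpreting it."""
    return {message_key: line.rstrip("\n")}


def format_single_value(record: dict, message_key: str = "message") -> str:
    """Model of a 'single_value' formatter: emit one field's value plus a newline."""
    return record[message_key] + "\n"


def build_post_body(zq_output_lines) -> str:
    """Chain the two steps over every line the shaper process printed."""
    return "".join(format_single_value(parse_none(line)) for line in zq_output_lines)


if __name__ == "__main__":
    # Hypothetical ZSON lines as `zq -z` might print them.
    shaped = [
        '{_path:"conn",ts:2022-12-15T00:33:54.369077Z}\n',
        '{_path:"dns",ts:2022-12-15T00:33:55.000000Z}\n',
    ]
    print(build_post_body(shaped), end="")
```

Because the text is never re-encoded as JSON along the way, typed values such as the unquoted ts timestamps reach the lake as ZSON, which is why the last `<match>` block sets content_type application/x-zson.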

@mrbluecoat

This looks awesome! +1 for ZNG support <3

https://zed.brimdata.io/docs/commands/zq#performance-comparisons


philrz commented Jul 11, 2024

@mrbluecoat: Glad to hear you think it sounds encouraging! This material actually hasn't been updated in a while and the Zed lake has evolved a bit in that time (and I imagine Fluentd might have changed a bit too). I've been starting to more formally document these kinds of topics in the "Integrations" area of the Zed docs and would be keen to do that with this Fluentd stuff if it's something you could see putting to use. However, as I'm doing that, I'd like to make sure that whatever it covers is relevant to your goals. I imagine GitHub Issue comments might not be the best forum for laying out those details, so could you ping me through other channels so we could discuss? If you like Slack you could send yourself an invite to our community Slack workspace and message me @Phil or send an email to [email protected]. Hope to hear from you!

@mrbluecoat

Will do, thanks


philrz commented Aug 25, 2024

As anticipated in my previous comment here, I did end up writing an article expanding and formalizing the config originally sketched out in this issue. The article can be found at https://zed.brimdata.io/docs/next/integrations/fluentd and will be tagged with the docs version for the next GA Zed release, which is expected soon. The user who chimed in here in previous comments followed up on Slack and reported successfully observing live data import after following the article. Therefore I'm going to go ahead and close this issue, with the expectation that the article may continue to be improved over time as users find and follow it and share their experiences.

philrz closed this as completed Aug 25, 2024