[issue 63] Flink alert sample #64

Closed
wants to merge 51 commits into from
Conversation

hldnova
Contributor

@hldnova hldnova commented Mar 28, 2018

fixes #63

@hldnova hldnova changed the title Flinkalert 63 Flink alert sample Mar 28, 2018
@hldnova hldnova requested review from fpj and EronWright March 28, 2018 15:52
@hldnova hldnova assigned kvandra and unassigned kvandra Mar 28, 2018
Contributor

@fpj fpj left a comment


Looks good, @hldnova! I have only a few change requests; please address them.

compile "org.slf4j:slf4j-log4j12:1.7.14"
compile "com.google.code.gson:gson:2.3.+"
compile "joda-time:joda-time:2.9.+"
Contributor


What's this dependency, @hldnova ? What kind of license is this distributed under?

Contributor Author


Gson is the Google JSON parser. Joda-Time, among other things, provides timestamp parsing. They are both under the Apache 2.0 license.
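For reference, the access-log timestamp that joda-time handles here can also be parsed with the JDK's built-in java.time, with no extra dependency. A minimal sketch (illustrative only, not the code in this PR; the class and method names are made up):

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class LogTimestamp {
    // Apache access-log timestamp format, e.g. "19/Mar/2018:02:24:01 -0400"
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    /** Parses an access-log timestamp into epoch milliseconds. */
    public static long toEpochMillis(String timestamp) {
        return OffsetDateTime.parse(timestamp, FMT).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // 2018-03-19T02:24:01-04:00, i.e. 06:24:01 UTC
        System.out.println(toEpochMillis("19/Mar/2018:02:24:01 -0400"));
    }
}
```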


In the logstash window, paste apache access logs like the following:
```
10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] "GET /health/ HTTP/1.1" 200 182 "http://example.com/myapp" "Mozilla/5.0"
```
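The sample line above is in the Apache combined log format. A dependency-free sketch of pulling the status code out of such a line with a regular expression (the class and field layout here are illustrative and do not match the PR's AccessLog bean):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogParser {
    // Combined log format: host ident user [time] "method uri proto" status bytes "referer" "agent"
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+).*");

    /** Returns the HTTP status code of the line, or -1 if the line does not match. */
    public static int status(String line) {
        Matcher m = LINE.matcher(line);
        return m.matches() ? Integer.parseInt(m.group(8)) : -1;
    }

    public static void main(String[] args) {
        String line = "10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] "
                + "\"GET /health/ HTTP/1.1\" 200 182 \"http://example.com/myapp\" \"Mozilla/5.0\"";
        System.out.println(status(line));  // 200
    }
}
```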
Contributor


If I understand this right, this is expecting some manual input from the user. Could we also have a way to tail access logs coming through files? For example, if I start a web server locally and point logstash to the directory with the access logs, it should be able to tail it, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The common practice is logs -> filebeat -> logstash -> ... Given the number of components in the demo, I will make a docker image to set up the end-to-end environment.

Contributor Author


@fpj, I've updated the sample to read from a file. I also built a docker image to make it easier to set up the environment. I believe I have addressed all of your requests.

While committing changes, I forgot to set Signed-off-by and had to use amend/rebase to fix them. But somehow the latest commit still complains, even though its Signed-off-by is identical to the others. I am puzzled and may need some help to fix it.

@@ -0,0 +1,2 @@
input { stdin { } }
Contributor


We add the license header to config files as well; see under config/ in pravega.

Contributor Author


Done.

@hldnova hldnova force-pushed the flinkalert-63 branch 6 times, most recently from 4b34e85 to a76b37a May 14, 2018 02:16
… output plugin

Signed-off-by: Lida He <[email protected]>

add example to process apache access log and generate high 500 response alert

logstash config and first cut of readme for alert sample

instruction to run high count alerter sample.

instruction to run the high count alerter sample

print output to stdout

Add flink references

update flink references

Steps to use wordCountWriter  instead of logstash
Signed-off-by: Lida He <[email protected]>

Add link to readme file for high error count alerter
Signed-off-by: Lida He <[email protected]>

Update README.md
Signed-off-by: Lida He <[email protected]>

add license to conf files
Signed-off-by: Lida He <[email protected]>

read access log from file instead of stdin
Signed-off-by: Lida He <[email protected]>

Update instruction to read access log from file.
Signed-off-by: Lida He <[email protected]>

Update instruction to read from file.
Signed-off-by: Lida He <[email protected]>

Update instruction to read access log from file
Signed-off-by: Lida He <[email protected]>

Update README.md
Signed-off-by: Lida He <[email protected]>

Update README.md

Signed-off-by: Lida He <[email protected]>
Signed-off-by: Lida He <[email protected]>
dataStream.flatMap(new FlatMapFunction<AccessLog, ResponseCount>() {
@Override
public void flatMap(AccessLog value, Collector<ResponseCount> out) throws Exception {
out.collect(new ResponseCount(value.getStatus(), 1));
Contributor


Shouldn't we first filter the log for response code 500?

Contributor Author


Done. I was thinking about doing some processing together with the response code, but decided not to make it complicated.

Contributor


We are not making use of the log event time to detect the 500 response code within a specific window; it looks like it will be based on processing time. If we plan to change the code to make use of the log event time, we could modify it to something like this:

```
        // initialize the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // create the Pravega source to read a stream of text
        FlinkPravegaReader<String> reader = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
                .build();

        // add the Pravega reader as the data source
        DataStream<String> inputStream = env.addSource(reader);

        // transform logs
        DataStream<AccessLog> dataStream =
                inputStream.map(new ParseLogData())
                        .filter(a -> a.getStatus().equals("500")).name("filter");
        DataStream<AccessLog> withTimestampsAndWatermarks =
                dataStream.assignTimestampsAndWatermarks(new EventTimeExtractor(Time.seconds(1000)))
                        .name("watermark");

        DataStream<ResponseCount> countStream =
        withTimestampsAndWatermarks
                .keyBy("verb")
                //.timeWindow(Time.seconds(Constants.ALERT_WINDOW))
                .window(TumblingEventTimeWindows.of(Time.seconds(Constants.ALERT_WINDOW)))
                .fold(new ResponseCount(), new FoldFunction<AccessLog, ResponseCount>() {
                    @Override
                    public ResponseCount fold(ResponseCount accumulator, AccessLog value) throws Exception {
                        accumulator.response = value.getVerb();
                        accumulator.count = accumulator.count + 1;
                        return accumulator;
                    }
                }).name("aggregated-count");

        // create an output sink to stdout for verification
        countStream.print().name("print-count");

        // create alert pattern
        Pattern<ResponseCount,?> pattern500 = Pattern.<ResponseCount>begin("500pattern")
            .where(new SimpleCondition<ResponseCount>() {
                @Override
                public boolean filter(ResponseCount value) throws Exception {
                    return value.count >= Constants.ALERT_THRESHOLD;
                }
            });

        PatternStream<ResponseCount> patternStream = CEP.pattern(countStream, pattern500);

        DataStream<Alert> alertStream = patternStream.select(
            new PatternSelectFunction<ResponseCount, Alert>() {
                @Override
                public Alert select(Map<String, List<ResponseCount>> pattern) throws Exception {
                    ResponseCount count = pattern.get("500pattern").get(0);
                    return new Alert(count.response, count.count, "High 500 responses");
                }
            }).name("alert");

        // create an output sink to stdout for verification
        alertStream.print().name("print-alert");

        // execute within the Flink environment
        env.execute("HighCountAlerter");

    public static class EventTimeExtractor extends BoundedOutOfOrdernessTimestampExtractor<AccessLog> {

        public EventTimeExtractor(Time time) { super(time); }

        @Override
        public long extractTimestamp(AccessLog accessLog) {
            return accessLog.getTimestampMillis();
        }
    }
```

Contributor Author


Thanks for the tip. I plan to come up with a few more examples based on the access log and will use event time in them later. For this example, I'd like to keep it simple and straightforward to follow.
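Stripped of the Flink machinery, the tumbling event-time window in the suggestion above amounts to bucketing each event by floor(timestamp / windowSize) and counting per bucket. A dependency-free sketch of that aggregation (illustrative only; no watermarks, out-of-order handling, or keying, and the class name is made up):

```java
import java.util.HashMap;
import java.util.Map;

public class TumblingCount {
    /** Counts events per tumbling window; map keys are window-start times in millis. */
    public static Map<Long, Integer> countPerWindow(long[] timestamps, long windowMillis) {
        Map<Long, Integer> counts = new HashMap<>();
        for (long ts : timestamps) {
            long windowStart = (ts / windowMillis) * windowMillis;  // floor to window boundary
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // three events fall in the first 10s window, one in the next
        long[] ts = {1000, 4000, 9999, 12000};
        System.out.println(countPerWindow(ts, 10_000));  // window 0 -> 3 events, window 10000 -> 1
    }
}
```

An alerter would then compare each window's count against a threshold, which is what the CEP pattern does over the per-window counts.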

public void flatMap(AccessLog value, Collector<ResponseCount> out) throws Exception {
out.collect(new ResponseCount(value.getStatus(), 1));
}
}).filter((FilterFunction<ResponseCount>) count -> {
Contributor


Do we really need this filter?

Contributor Author


It was meant to deal with malformed data. It is no longer relevant; replaced with a filter for 500 responses.

private static class ParseLogData implements MapFunction<String, AccessLog>{
public AccessLog map(String record) throws Exception {
// TODO: handle exceptions
Gson gson = new Gson();
Contributor


Looks like we are not using the gson instance?

Gson gson = new Gson();
AccessLog accessLog = new AccessLog();
JsonParser parser = new JsonParser();
JsonObject obj = parser.parse(record).getAsJsonObject();
Contributor


I think the Jackson module (ObjectMapper) can help convert a JSON string to a bean if we want to avoid the boilerplate code.

Contributor Author


Done. Using Jackson now.

@hldnova hldnova changed the base branch from master to develop May 18, 2018 02:11
vijikarthi and others added 26 commits June 18, 2018 16:02
* Moves configurations from hadoop-examples gradle.properties
   to root gradle.properties
* Updates hadoop connector snapshot version

Signed-off-by: Vijay Srinivasaraghavan <[email protected]>
* Fix in TurbineHeatSensor.java to avoid the application
  to throw NullPointerException. Note that this PR only
  targets this specific problem (i.e., other exceptions
  or problems will be treated separately).

Signed-off-by: Raúl Gracia <[email protected]>
… well as the build associated tasks/properties. (#117)

Signed-off-by: Raúl Gracia <[email protected]>
…110)

* Fixes in TurbineHeatSensor.java the use of client factories
  while creating writers. Note that this PR specifically targets
  to avoid java.lang.RuntimeException: ManagedChannel
  allocation site exception to be  thrown (e.g., the app may
  throw other exceptions that will be addressed separately).

Signed-off-by: Raúl Gracia <[email protected]>
* updated connector snapshot version
* fixed connector writer transaction API change

Signed-off-by: Vijay Srinivasaraghavan <[email protected]>
Contributor Author

hldnova commented Jun 22, 2018

Messed up the merge; submitted a replacement: #126.

@hldnova hldnova closed this Jun 22, 2018
@EronWright EronWright deleted the flinkalert-63 branch October 22, 2019 22:57