[issue 63] Flink alert sample #64
Conversation
Looks good, @hldnova! I have only a few requests for changes; please address them.
flink-examples/build.gradle
Outdated
compile "org.slf4j:slf4j-log4j12:1.7.14" | ||
compile "com.google.code.gson:gson:2.3.+" | ||
compile "joda-time:joda-time:2.9.+" |
What's this dependency, @hldnova? What kind of license is it distributed under?
gson is the Google JSON parser. joda-time, among other things, provides timestamp parsing. They are both under the Apache 2.0 license.
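For context, here is a minimal sketch (not part of this PR) of how joda-time might parse the timestamp field of an Apache access log entry like the sample shown later in this thread. The class name, pattern, and locale are illustrative assumptions based on the common/combined log format:
```java
import java.util.Locale;

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class AccessLogTimestampSketch {
    public static void main(String[] args) {
        // Apache common/combined log format timestamp, e.g. "19/Mar/2018:02:24:01 -0400"
        DateTimeFormatter fmt = DateTimeFormat
                .forPattern("dd/MMM/yyyy:HH:mm:ss Z")
                .withLocale(Locale.ENGLISH);
        DateTime ts = fmt.parseDateTime("19/Mar/2018:02:24:01 -0400");
        System.out.println(ts.getMillis()); // epoch millis, usable as an event timestamp
    }
}
```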
In the logstash window, paste Apache access logs like the following:
```
10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] "GET /health/ HTTP/1.1" 200 182 "http://example.com/myapp" "Mozilla/5.0"
```
If I understand this right, this is expecting some manual input from the user. Could we also have a way to tail access logs coming through files? For example, if I start a web server locally and point logstash to the directory with the access logs, it should be able to tail it, no?
The common practice is logs -> filebeat -> logstash -> ... Given the number of components in the demo, I will make a Docker image to set up the end-to-end environment.
@fpj, I've updated the sample to read from a file. I also built a Docker image to make it easier to set up the environment. I believe I have addressed all of your requests.
While committing the changes, I forgot to set Signed-off-by and had to use amend/rebase to fix them. Somehow the latest commit still complains even though it carries a Signed-off-by identical to the others. I am puzzled and may need some help to fix it.
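For reference, a file-based Logstash input along those lines might look like the sketch below. The path is hypothetical, and the filter/output sections of the sample's actual config are omitted:
```
input {
  file {
    # hypothetical path; point this at the web server's access log
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
  }
}
```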
@@ -0,0 +1,2 @@
input { stdin { } } |
We add the license header to config files as well; see under config/ in pravega.
Done.
Force-pushed from 4b34e85 to a76b37a. Rebased commits (each Signed-off-by: Lida He <[email protected]>):
… output plugin
add example to process apache access log and generate high 500 response alert
logstash config and first cut of readme for alert sample
instruction to run high count alerter sample.
instruction to run the high count alerter sample
print output to stdout
Add flink references
update flink references
Steps to use wordCountWriter instead of logstash
Add link to readme file for high error count alerter
Update README.md
add license to conf files
read access log from file instead of stdin
Update instruction to read access log from file.
Update instruction to read from file.
Update instruction to read access log from file
Update README.md
Update README.md
dataStream.flatMap(new FlatMapFunction<AccessLog, ResponseCount>() {
    @Override
    public void flatMap(AccessLog value, Collector<ResponseCount> out) throws Exception {
        out.collect(new ResponseCount(value.getStatus(), 1));
Shouldn't we first filter the error log for the response code 500?
Done. I was thinking about doing some additional processing together with the response code, but decided not to make it complicated.
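A minimal sketch of the filter-then-count approach, reusing the AccessLog and ResponseCount types from this PR; the surrounding pipeline, imports, and the variable names here are assumed from the existing example rather than taken verbatim from it:
```java
// Fragment: keep only 500 responses, then emit a count of 1 per event
// for the windowed aggregation that follows in the sample.
DataStream<ResponseCount> counts = dataStream
        .filter(log -> "500".equals(log.getStatus()))       // drop everything but server errors
        .map(log -> new ResponseCount(log.getStatus(), 1)); // one count per 500 response
```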
We are not making use of the log event time to detect the 500 response codes for a specific window, so it looks like it will be based on processing time. If we plan to change the code to make use of the log event time, we could modify it to something like this:
// initialize the Flink execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // switch from processing time to event time
// create the Pravega source to read a stream of text
FlinkPravegaReader<String> reader = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(PravegaSerialization.deserializationFor(String.class))
.build();
// add the Pravega reader as the data source
DataStream<String> inputStream = env.addSource(reader);
// transform logs
DataStream<AccessLog> dataStream =
inputStream.map(new ParseLogData())
.filter(a -> a.getStatus().equals("500")).name("filter");
DataStream<AccessLog> withTimestampsAndWatermarks =
dataStream.assignTimestampsAndWatermarks(new EventTimeExtractor(Time.seconds(1000)))
.name("watermark");
DataStream<ResponseCount> countStream =
withTimestampsAndWatermarks
.keyBy("verb")
//.timeWindow(Time.seconds(Constants.ALERT_WINDOW))
.window(TumblingEventTimeWindows.of(Time.seconds(Constants.ALERT_WINDOW)))
.fold(new ResponseCount(), new FoldFunction<AccessLog, ResponseCount>() {
@Override
public ResponseCount fold(ResponseCount accumulator, AccessLog value) throws Exception {
accumulator.response = value.getVerb();
accumulator.count = accumulator.count + 1;
return accumulator;
}
}).name("aggregated-count");
// create an output sink to stdout for verification
countStream.print().name("print-count");
// create alert pattern
Pattern<ResponseCount,?> pattern500 = Pattern.<ResponseCount>begin("500pattern")
.where(new SimpleCondition<ResponseCount>() {
@Override
public boolean filter(ResponseCount value) throws Exception {
return value.count >= Constants.ALERT_THRESHOLD;
}
});
PatternStream<ResponseCount> patternStream = CEP.pattern(countStream, pattern500);
DataStream<Alert> alertStream = patternStream.select(
new PatternSelectFunction<ResponseCount, Alert>() {
@Override
public Alert select(Map<String, List<ResponseCount>> pattern) throws Exception {
ResponseCount count = pattern.get("500pattern").get(0);
return new Alert(count.response, count.count, "High 500 responses");
}
}).name("alert");
// create an output sink to stdout for verification
alertStream.print().name("print-alert");
// execute within the Flink environment
env.execute("HighCountAlerter");
public static class EventTimeExtractor extends BoundedOutOfOrdernessTimestampExtractor<AccessLog> {
public EventTimeExtractor(Time time) { super(time); }
@Override
public long extractTimestamp(AccessLog accessLog) {
return accessLog.getTimestampMillis();
}
}
Thanks for the tip. I plan to come up with a few more examples based on the access log and will use event time in them later. For this example, I'd like to keep it simple and straightforward to follow.
public void flatMap(AccessLog value, Collector<ResponseCount> out) throws Exception {
    out.collect(new ResponseCount(value.getStatus(), 1));
}
}).filter((FilterFunction<ResponseCount>) count -> {
Do we really need this filter?
It was meant to deal with malformed data, but it's no longer relevant; it has been replaced with a filter for 500 responses.
private static class ParseLogData implements MapFunction<String, AccessLog> {
    public AccessLog map(String record) throws Exception {
        // TODO: handle exceptions
        Gson gson = new Gson();
Looks like we are not using the gson instance?
Gson gson = new Gson();
AccessLog accessLog = new AccessLog();
JsonParser parser = new JsonParser();
JsonObject obj = parser.parse(record).getAsJsonObject();
I think the Jersey Jackson module (ObjectMapper) can help convert the JSON string to a bean if we want to avoid the boilerplate code?
Done. Using Jackson now.
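A minimal sketch of what a Jackson-based ParseLogData might look like; the exact bean mapping in the PR may differ, and ignoring unknown properties is an assumption added here for robustness:
```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

private static class ParseLogData implements MapFunction<String, AccessLog> {
    // ObjectMapper is thread-safe for readValue and can be reused across records
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override
    public AccessLog map(String record) throws Exception {
        // Map the JSON record emitted by logstash straight onto the AccessLog bean
        return MAPPER.readValue(record, AccessLog.class);
    }
}
```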
* Moves configurations from hadoop-examples gradle.properties to root gradle.properties * Updates hadoop connector snapshot version Signed-off-by: Vijay Srinivasaraghavan <[email protected]>
* Fix in TurbineHeatSensor.java to avoid the application to throw NullPointerException. Note that this PR only targets this specific problem (i.e., other exceptions or problems will be treated separately). Signed-off-by: Raúl Gracia <[email protected]>
Signed-off-by: Raúl Gracia <[email protected]>
… well as the build associated tasks/properties. (#117) Signed-off-by: Raúl Gracia <[email protected]>
…110) * Fixes in TurbineHeatSensor.java the use of client factories while creating writers. Note that this PR specifically targets to avoid java.lang.RuntimeException: ManagedChannel allocation site exception to be thrown (e.g., the app may throw other exceptions that will be addressed separately). Signed-off-by: Raúl Gracia <[email protected]>
* updated connector snapshot version * fixed connector writer transaction API change Signed-off-by: Vijay Srinivasaraghavan <[email protected]>
…mples into flinkalert-63 Signed-off-by: Lida He <[email protected]>
Messed up with the merge. Submitted a replacement: #126
fixes #63