This module is a component of the datAcron EU project. It provides a Flink component that processes a stream of raw messages (i.e., AIS dynamic messages) and enriches it with derived attributes such as the min/max, average, and variance of the original fields (see Output Format and Samples and Output Schema Description).
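The enrichment described above can be illustrated with a small running-statistics accumulator. This is a minimal sketch, not the module's actual code: the class name `RunningStats` and its methods are hypothetical, and the use of Welford's incremental update with a divisor of n for the variance is an assumption. The consecutive sample rows shown further down (NumberOfPoints 686 and 687) are numerically consistent with a divisor of n, but the formula used inside the module is not confirmed here.

```java
/** Incrementally tracked min, max, mean and variance of one numeric field (hypothetical helper). */
final class RunningStats {
    private long count;
    private double mean;
    private double m2; // sum of squared deviations from the current mean
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

    /** Folds one new observation into the statistics (Welford's update). */
    void add(double value) {
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    long count() { return count; }
    double mean() { return mean; }
    double min() { return min; }
    double max() { return max; }

    /** Variance with divisor n (an assumption); 0 when nothing has been added. */
    double variance() { return count == 0 ? 0.0 : m2 / count; }

    public static void main(String[] args) {
        RunningStats speed = new RunningStats();
        for (double v : new double[] {9.0, 9.1, 9.2}) {
            speed.add(v);
        }
        System.out.println("n=" + speed.count() + " min=" + speed.min() + " max=" + speed.max()
                + " mean=" + speed.mean() + " variance=" + speed.variance());
    }
}
```

One accumulator per trajectory id and per field (speed, longitude, latitude, and so on) would be enough to produce the min/max, average, and variance attributes after every message without storing the whole trajectory.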
In addition, a stream simulator for the raw messages was developed as part of this module. It replays the original stream of raw messages by generating a new simulated Kafka stream that takes into account the time delay between two consecutive messages of a trajectory. This delay can be scaled up or down with the configuration parameter `streamDelayScale`.
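The delay scaling can be sketched as follows. This is an illustration under stated assumptions, not the simulator's implementation: the class `ReplayDelay` and the method `delayFor` are hypothetical names, the delay is returned in the same unit as the input timestamps, and clamping negative gaps to zero for out-of-order messages is a choice made only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Computes the replay delay before each message, tracked per trajectory id (hypothetical helper). */
final class ReplayDelay {
    private final Map<String, Long> lastTimestampById = new HashMap<>();
    private final double streamDelayScale;

    ReplayDelay(double streamDelayScale) {
        this.streamDelayScale = streamDelayScale;
    }

    /**
     * Returns the scaled delay for this message:
     * (timestamp - previous timestamp of the same id) * streamDelayScale.
     * The first message of a trajectory gets a delay of 0.
     */
    double delayFor(String id, long timestamp) {
        Long previous = lastTimestampById.put(id, timestamp);
        if (previous == null) {
            return 0.0;
        }
        // Assumption: out-of-order messages are emitted without waiting.
        long actualDelay = Math.max(0L, timestamp - previous);
        return actualDelay * streamDelayScale;
    }

    public static void main(String[] args) {
        ReplayDelay replay = new ReplayDelay(0.5);
        long[] timestamps = {1443660600L, 1443660610L, 1443660629L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> wait " + replay.delayFor("228037600", ts));
        }
    }
}
```

With a scale of 1.0 the stream is replayed at its original pace; values below 1.0 speed the replay up and values above 1.0 slow it down.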
- Clone the in-situ code: `https://github.com/ehabqadah/in-situ-processing-datAcron.git`
- Go to the project directory: `cd in-situ-processing-datAcron`.
- Update `config.properties` as described in the Configurations section below.
- Run `mvn clean package -Pbuild-jar` to compile and build the project jar.
- Run the raw input stream simulator: `java -cp ..../in-situ-processing-datAcron/target/in-situ-processing-2.0.0.jar eu.datacron.insitu.InputStreamSimulatorApp`.
- Go to the Flink directory: `cd .../flink-1.4.1`
- Start a Flink YARN session using `cd ../flink-1.4.0/` followed by `./bin/yarn-session.sh -n 8 -tm 15360 -s 8`. Here `-n` sets the number of Task Managers to allocate, `-tm 15360` gives each Task Manager 15 GB of memory, and `-s 8` gives each Task Manager 8 processing slots.
- To run the In-Situ module, use `cd ../flink-1.4.0/` followed by `./bin/flink run -m yarn-cluster -yn 10 -ynm insitu_p..../in-situ-processing-datAcron/target/in-situ-processing-*.jar`
- Check the output stream on the Kafka topic whose name is given by `outputStreamTopicName` in the [config.properties](http://datacron2.ds.unipi.gr:9081/eqadah/in-situ-processing-datAcron/blob/master/src/main/resources/config.properties) file (e.g., ais_messages_in_situ_processing_out_v2).
# Output Format and Samples:
* The data schema of the In-Situ Processing component's output is as follows:
`timestamp,id,longitude,latitude,speed,heading,msgErrorFlag,turn,course,status,NumberOfPoints,AverageDiffTime,LastDiffTime,MinDiffTime,MaxDiffTime,MaxSpeed,MinSpeed,AverageSpeed,VarianceSpeed,MinLong,MaxLong,MinLat,MaxLat,MinTurn,MaxTurn,MinHeading,MaxHeading,isChangeInArea,detectedAreas`
* The following are consecutive output samples of one trajectory (id=228037600):
```javascript
1443660589,228037600,-4.342825,48.186092,9.1,511,,-127.0,197.7,15,686,14.83965014577259,11,7,311,10.2,5.6,9.12900874635568,0.3503100961334131,-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,false,area1112315100;area1173315100
1443660600,228037600,-4.34293,48.18563,9.1,511,,-127.0,190.4,15,687,14.834061135371174,11,7,311,10.2,5.6,9.128966521106255,0.34980140644830476,-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,true,
1443660610,228037600,-4.3429365,48.18521,9.1,511,,-127.0,180.5,15,688,14.827034883720925,10,7,311,10.2,5.6,9.128924418604647,0.3492941919618716,-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,false,
1443660620,228037600,-4.3428435,48.184795,9.1,511,,-127.0,171.3,15,689,14.820029027576192,10,7,311,10.2,5.6,9.128882438316397,0.34878844626633326,-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,true,area1112315100
1443660629,228037600,-4.3426933,48.184425,9.2,511,,-127.0,166.0,15,690,14.811594202898545,9,7,311,10.2,5.6,9.128985507246373,0.3482902751522813,-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,false,area1112315100
```
# Output Schema Description:
- The output uses the same attributes and delimiter (i.e., ",") as the AIS messages of the NARI source, and appends the additional attributes computed by this module, as described in the following table:
Index | Attribute | Data type | Description |
---|---|---|---|
[0] | timestamp | long | timestamp in UNIX epochs (i.e., milliseconds elapsed since 1970-01-01 00:00:00.000). |
[1] | id | integer | A globally unique identifier for the moving object (usually, the MMSI of vessels). |
[2] | longitude | double | Longitude (georeference: WGS 1984). |
[3] | latitude | double | Latitude (georeference: WGS 1984). |
[4] | speed | double | Speed over ground in knots (allowed values: 0-102.2 knots). |
[5] | heading | integer | True heading in degrees (0-359), relative to true north. |
[6] | msgErrorFlag | string | The MariWeb Security Appliance (MSA) error flags (if available). |
[7] | turn | double | Rate of turn, right or left, 0 to 720 degrees per minute. |
[8] | course | double | Course over ground (allowed values: 0-359.9 degrees). |
[9] | status | integer | AIS Navigational Status reported by the vessel. |
[10] | NumberOfPoints | long | The accumulated number of received points (i.e., AIS messages) of the trajectory. |
[11] | AverageDiffTime | double | The average time difference between consecutive AIS messages of the trajectory. |
[12] | LastDiffTime | long | The time difference between the current message and the previously received message. |
[13] | MinDiffTime | long | The minimum value of time difference until the current AIS message. |
[14] | MaxDiffTime | long | The maximum value of time difference until the current AIS message. |
[15] | MaxSpeed | double | The maximum value of speed within the trajectory until the current AIS message. |
[16] | MinSpeed | double | The minimum value of speed within the trajectory until the current AIS message. |
[17] | AverageSpeed | double | The average value of the speed |
[18] | VarianceSpeed | double | The variance of speed within the trajectory until the current AIS message. |
[19] | MinLong | double | The minimum value of longitude of the trajectory until the current AIS message. |
[20] | MaxLong | double | The maximum value of longitude of the trajectory until the current AIS message. |
[21] | MinLat | double | The minimum value of latitude of the trajectory until the current AIS message. |
[22] | MaxLat | double | The maximum value of latitude of the trajectory until the current AIS message. |
[23] | MinTurn | double | The minimum value of turn of the trajectory until the current AIS message. |
[24] | MaxTurn | double | The maximum value of turn of the trajectory until the current AIS message. |
[25] | MinHeading | integer | The minimum value of heading of the trajectory until the current AIS message. |
[26] | MaxHeading | integer | The maximum value of heading of the trajectory until the current AIS message. |
[27] | isChangeInArea | boolean | A flag to indicate if there is change in the area or not. |
[28] | detectedAreas | string | empty string or list of area codes/names separated by ; |
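
The index column of the table maps directly onto the positions of a split output line. The following is a minimal parsing sketch, assuming the 29-field layout above; the class `EnrichedRecord` and its field names are hypothetical and only a few of the attributes are extracted. The split uses a limit of -1 so that a trailing empty `detectedAreas` field is not dropped.

```java
import java.util.regex.Pattern;

/** A few fields of one enriched output line; the class and its names are illustrative only. */
final class EnrichedRecord {
    static final int FIELD_COUNT = 29; // indices [0]..[28] of the schema table

    final long timestamp;         // [0]
    final int id;                 // [1]
    final long numberOfPoints;    // [10]
    final double averageSpeed;    // [17]
    final boolean changeInArea;   // [27]
    final String[] detectedAreas; // [28], split on ';'

    private EnrichedRecord(String[] f) {
        timestamp = Long.parseLong(f[0].trim());
        id = Integer.parseInt(f[1].trim());
        numberOfPoints = Long.parseLong(f[10].trim());
        averageSpeed = Double.parseDouble(f[17].trim());
        changeInArea = Boolean.parseBoolean(f[27].trim());
        String areas = f[28].trim();
        detectedAreas = areas.isEmpty() ? new String[0] : areas.split(";");
    }

    /** Parses one output line; the delimiter is the value of outputLineDelimiter. */
    static EnrichedRecord parse(String line, String delimiter) {
        // Limit -1 keeps trailing empty fields (e.g. an empty detectedAreas).
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        if (fields.length != FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "expected " + FIELD_COUNT + " fields but got " + fields.length);
        }
        return new EnrichedRecord(fields);
    }

    public static void main(String[] args) {
        String line = "1443660600,228037600,-4.34293,48.18563,9.1,511,,-127.0,190.4,15,687,"
                + "14.834061135371174,11,7,311,10.2,5.6,9.128966521106255,0.34980140644830476,"
                + "-4.45445,-4.316455,48.113506,48.20134,-127.0,-127.0,511,511,true,";
        EnrichedRecord r = EnrichedRecord.parse(line, ",");
        System.out.println("id=" + r.id + " points=" + r.numberOfPoints
                + " changeInArea=" + r.changeInArea + " areas=" + r.detectedAreas.length);
    }
}
```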
# Configurations:
This section describes the parameters that customize the execution of the In-Situ Processing module. The full list of parameters is given below, together with their description and usage; all of them can be changed in the `config.properties` file.
Parameter Name | Example | Description | Used In |
---|---|---|---|
bootstrapServers | localhost:9092,localhost:9093 | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | InSituProcessingApp & RawStreamSimulator |
zookeeper | localhost:2181 | Kafka zookeeper host string | InSituProcessingApp & RawStreamSimulator |
inputStreamTopicName | aisInsituIn | The topic name of the output stream of the RawStreamSimulator and of the input stream of InSituProcessingApp, so both components are connected through a Kafka stream | InSituProcessingApp & RawStreamSimulator |
outputStreamTopicName | aisInsituOut | The topic name of the output stream of the In-Situ Processing (i.e., the enriched stream) | InSituProcessingApp |
kafkaGroupId | in_situ | The Kafka consumer group name for the InSituProcessingApp if streamSourceType is set to KAFKA | InSituProcessingApp |
aisDataSetFilePath | ./data/nari_test_dataset.csv | The path of the input dataset file | InSituProcessingApp & RawStreamSimulator |
streamSourceType | FILE or KAFKA | Selects the input source of the In-Situ Processing module: either reading an input file directly (aisDataSetFilePath) or ingesting a Kafka stream (inputStreamTopicName) | InSituProcessingApp |
streamDelayScale | 1.0 | Scale factor of the time delay between the raw messages in the stream simulator (i.e., simulated delay = actual delay * streamDelayScale) | RawStreamSimulator |
inputDataSchema | nariRawStreamSchema.json, imisRawStreamSchema.json and imisFullRawStreamSchema.json | Specifies the schema of the input raw messages, to support multiple sources such as IMIS Global and NARI files in the maritime use case | InSituProcessingApp & RawStreamSimulator |
flinkCheckPointsPath | checkpoints | The directory of the Flink checkpoints | InSituProcessingApp & RawStreamSimulator |
parallelism | 8 | The parallelism level of the Flink job | InSituProcessingApp |
outputLineDelimiter | , | The field separator within a line of the output stream | InSituProcessingApp |
outputFilePath | | The output path of the enriched stream file if writeOutputStreamToFile is true | InSituProcessingApp |
polygonsFilePath | polygons.csv | The file path of the area polygons | InSituProcessingApp |
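
Reading these parameters from a properties file can be sketched with `java.util.Properties`. This is an illustrative sketch only: the class `InSituConfigSketch`, its helper methods, and the fallback defaults are hypothetical and do not describe how the module itself loads its configuration. The inline text stands in for the contents of `config.properties`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical reader for a few of the parameters listed in the table above. */
final class InSituConfigSketch {

    /** Parses properties text (the same syntax as a config.properties file). */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Scale factor for the simulator delay; the default of 1.0 is an assumption. */
    static double streamDelayScale(Properties props) {
        return Double.parseDouble(props.getProperty("streamDelayScale", "1.0").trim());
    }

    /** Returns FILE or KAFKA, rejecting any other value. */
    static String streamSourceType(Properties props) {
        String type = props.getProperty("streamSourceType", "").trim();
        if (!type.equals("FILE") && !type.equals("KAFKA")) {
            throw new IllegalArgumentException("streamSourceType must be FILE or KAFKA: " + type);
        }
        return type;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "bootstrapServers=localhost:9092,localhost:9093",
                "inputStreamTopicName=aisInsituIn",
                "outputStreamTopicName=aisInsituOut",
                "streamSourceType=KAFKA",
                "streamDelayScale=1.0");
        Properties props = parse(text);
        System.out.println("source=" + streamSourceType(props)
                + " scale=" + streamDelayScale(props)
                + " brokers=" + props.getProperty("bootstrapServers"));
    }
}
```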
- To deploy the In-Situ Processing module on a Flink cluster (locally):
  - Go to the root directory of the project.
  - Edit `/in-situ-processing-datAcron/scripts/deployOnFlinkCluster.sh` and set the `FLINK_DIR` variable in the script to the directory of the Flink installation.
  - Make sure that the script file is executable: `chmod +x /in-situ-processing-datAcron/scripts/deployOnFlinkCluster.sh`.
  - Run the script `/in-situ-processing-datAcron/scripts/deployOnFlinkCluster.sh`, which submits the In-Situ Processing job to the local Flink cluster.
- To run the Raw AIS Messages Stream Simulator component on a Flink cluster (locally):
  - Go to the root directory of the project.
  - Edit `/in-situ-processing-datAcron/scripts/deploySimulatorOnFlink.sh` and set the `FLINK_DIR` variable in the script to the local directory of the Flink installation.
  - Make sure that the script file is executable: `chmod +x /in-situ-processing-datAcron/scripts/deploySimulatorOnFlink.sh`.
  - Run the script `./scripts/runSimulatorOnFlinkLocally.sh`, which submits the stream simulator job to the Flink cluster.
- To set up the Kafka cluster locally, run the predefined shell script `scripts/setup-kafka.sh`.
- To execute the simulator for the stream of raw messages, run `eu.datacron.insitu.InputStreamSimulatorApp`.
- To execute the In-Situ Processing module, run `eu.datacron.insitu.InSituProcessingApp`.
- To check the module output, run `scripts/runOutputStreamKafkaConsumer.sh`, which launches a console consumer for the output stream of the module.