Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flight data ingester demo #39

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions flight-data-ingester/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# GreptimeDB Flight

This project demonstrates GreptimeDB's ingesting geo-spatial data using the the `greptimedb-ingester-go` client. It selects the last 10 flights that departed in the last 30 minutes from the configured icao airport code. The ingester script utilizes the [OpenSky Network API](https://opensky-network.org/apidoc/) to fetch flight state data and inserts the flight metrics into GreptimeDB

## How to run this demo

Ensure you have `git`, `docker`, `docker-compose`
installed. To run this demo:

```shell
git clone https://github.com/GreptimeTeam/demo-scene.git
cd demo-scene/flight-data-ingester
docker compose up
```

It can take a while for the first run to pull down images and also build necessary components.

## How it works

The topology is illustrated in this diagram.

```mermaid
flowchart LR
greptimedb[(GreptimeDB)]

api{Open Sky Data} --> go-ingester
go-ingester --> greptimedb
```

after GreptimeDB starts, we use the `ingester` script which uses the go client's [high level api](https://docs.greptime.com/user-guide/ingest-data/for-iot/grpc-sdks/go/#installation) to create the table and insert data. It's dead-simple to perform transformations and data munging on your struct and insert into target GreptimeDB columns by tagging your metric struct accordingly as seen in the `./ingester/dto.go` file .

## Note

please update the ICAO flight code for your local airport of the city you are
running the demo in to ensure there is data in the set. This can be done by
setting the ``ICAO_AIRPORT_CODE` environment variable when running `docker
compose up`.

If you are going to restart this demo, press `Ctrl-C` and remember to call
`docker compose down` to clean up the data before you run `docker compose up`
again.
31 changes: 31 additions & 0 deletions flight-data-ingester/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
services:
greptimedb:
image: docker.io/greptime/greptimedb:v0.9.2
command: standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001 --mysql-addr=0.0.0.0:4002 --postgres-addr 0.0.0.0:4003
ports:
- 4000:4000
- 4001:4001
- 4002:4002
- 4003:4003
networks:
- demo-network
healthcheck:
test: ["CMD", "curl", "-f", "http://127.0.0.1:4000/health"]
interval: 3s
timeout: 3s
retries: 5

ingester:
build:
context: ./ingester
dockerfile: Dockerfile
networks:
- demo-network
environment:
- ICAO_AIRPORT_CODE=${ICAO_AIRPORT_CODE:-KSFO}
depends_on:
greptimedb:
condition: service_started

networks:
demo-network:
18 changes: 18 additions & 0 deletions flight-data-ingester/ingester/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM docker.io/golang:1.21-alpine

WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./

# Download all dependencies
RUN go mod download

# Copy the source code
COPY *.go ./

# Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -o /app/producer

# Run the binary
CMD ["./producer"]
95 changes: 95 additions & 0 deletions flight-data-ingester/ingester/dto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import "time"

type FlightMetric struct {
Icao24 string `greptime:"tag;column:icao24;type:string"`
Longitude *float64 `greptime:"tag;column:longitude;type:float64"`
Latitude *float64 `greptime:"tag;column:latitude;type:float64"`
Velocity *float64 `greptime:"tag;column:velocity;type:float64"`
BaroAltitude *float64 `greptime:"tag;column:baro_altitude;type:float64"`
GeoAltitude *float64 `greptime:"tag;column:geo_altitude;type:float64"`
TimePosition *int64 `greptime:"tag;column:time_position;type:int64"`
Timestamp time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
}

func (FlightMetric) TableName() string {
return "icao24_state"
}

type Flight struct {
Icao24 string `json:"icao24"`
FirstSeen int64 `json:"firstSeen"`
EstDepartureAirport string `json:"estDepartureAirport"`
LastSeen int64 `json:"lastSeen"`
EstArrivalAirport string `json:"estArrivalAirport"`
CallSign string `json:"callsign"`
EstDepartureTime int64 `json:"estDepartureTime"`
EstArrivalTime int64 `json:"estArrivalTime"`
DepartureAirportCandidatesCount int `json:"departureAirportCandidatesCount"`
ArrivalAirportCandidatesCount int `json:"arrivalAirportCandidatesCount"`
}

type StateResponse struct {
Time int64 `json:"time"`
States [][]interface{} `json:"states"`
}

func (sr *StateResponse) ToFlightMetrics() []FlightMetric {
metrics := make([]FlightMetric, 0, len(sr.States))
for _, state := range sr.States {
if len(state) < 17 {
continue // Skip if the state doesn't have enough elements
}
// check the state vector positions

metric := FlightMetric{
Icao24: state[0].(string),
Longitude: floatToFloat64Ptr(state[5]),
Latitude: floatToFloat64Ptr(state[6]),
BaroAltitude: floatToFloat64Ptr(state[7]),
Velocity: floatToFloat64Ptr(state[9]),
GeoAltitude: floatToFloat64Ptr(state[13]),
Timestamp: time.Unix(sr.Time, 0),
TimePosition: floatToInt64Ptr(state[3]),
}
metrics = append(metrics, metric)
}
return metrics
}

// https://openskynetwork.github.io/opensky-api/rest.html#all-state-vectors
// StateData{
// Icao24: state[0].(string),
// Callsign: state[1].(string),
// OriginCountry: state[2].(string),
// TimePosition: floatToInt64Ptr(state[3]),
// LastContact: floatToInt64(state[4]),
// Longitude: floatToFloat64Ptr(state[5]),
// Latitude: floatToFloat64Ptr(state[6]),
// BaroAltitude: floatToFloat64Ptr(state[7]),
// OnGround: state[8].(bool),
// Velocity: floatToFloat64Ptr(state[9]),
// TrueTrack: floatToFloat64Ptr(state[10]),
// VerticalRate: floatToFloat64Ptr(state[11]),
// Sensors: nil, // This field is null in the example
// GeoAltitude: floatToFloat64Ptr(state[13]),
// Squawk: nil, // This field is null in the example
// Spi: state[15].(bool),
// PositionSource: floatToInt(state[16]),

func floatToInt64Ptr(v interface{}) *int64 {
if v == nil {
return nil
}
i := int64(v.(float64))
return &i
}

func floatToFloat64Ptr(v interface{}) *float64 {
if v == nil {
return nil
}
f := v.(float64)
return &f
}
16 changes: 16 additions & 0 deletions flight-data-ingester/ingester/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module github.com/GreptimeTeam/demo-scene/flight-data-ingester

go 1.21

require (
github.com/GreptimeTeam/greptime-proto v0.7.0 // indirect
github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
33 changes: 33 additions & 0 deletions flight-data-ingester/ingester/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
github.com/GreptimeTeam/greptime-proto v0.7.0 h1:WHBjAu+NWDFcbZgW9kPtksxEKEAeqYemP1HY63QuO48=
github.com/GreptimeTeam/greptime-proto v0.7.0/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE=
github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 h1:pSlGkdNCyxBjmh9WIHYlOn9UjLuymrUtpoOJOXMk+Uo=
github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0/go.mod h1:/LuHS3Bimqcuja1yEKkhpeP8ZeO7MpxwxBvVqHF52n8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading