diff --git a/flight-data-ingester/README.md b/flight-data-ingester/README.md new file mode 100644 index 0000000..331da37 --- /dev/null +++ b/flight-data-ingester/README.md @@ -0,0 +1,41 @@ +# GreptimeDB Flight + +This project demonstrates GreptimeDB's ingesting geo-spatial data using the the `greptimedb-ingester-go` client. It selects the last 10 flights that departed in the last 30 minutes from the configured icao airport code. The ingester script utilizes the [OpenSky Network API](https://opensky-network.org/apidoc/) to fetch flight state data and inserts the flight metrics into GreptimeDB + +## How to run this demo + +Ensure you have `git`, `docker`, `docker-compose` +installed. To run this demo: + +```shell +git clone https://github.com/GreptimeTeam/demo-scene.git +cd demo-scene/flight-data-ingester +docker compose up +``` + +It can take a while for the first run to pull down images and also build necessary components. + +## How it works + +The topology is illustrated in this diagram. + +```mermaid +flowchart LR + greptimedb[(GreptimeDB)] + + api{Open Sky Data} --> go-ingester + go-ingester --> greptimedb +``` + +after GreptimeDB starts, we use the `ingester` script which uses the go client's [high level api](https://docs.greptime.com/user-guide/ingest-data/for-iot/grpc-sdks/go/#installation) to create the table and insert data. It's dead-simple to perform transformations and data munging on your struct and insert into target GreptimeDB columns by tagging your metric struct accordingly as seen in the `./ingester/dto.go` file . + +## Note + +please update the ICAO flight code for your local airport of the city you are +running the demo in to ensure there is data in the set. This can be done by +setting the ``ICAO_AIRPORT_CODE` environment variable when running `docker +compose up`. + +If you are going to restart this demo, press `Ctrl-C` and remember to call +`docker compose down` to clean up the data before you run `docker compose up` +again. diff --git a/flight-data-ingester/docker-compose.yml b/flight-data-ingester/docker-compose.yml new file mode 100644 index 0000000..4dc5e0c --- /dev/null +++ b/flight-data-ingester/docker-compose.yml @@ -0,0 +1,31 @@ +services: + greptimedb: + image: docker.io/greptime/greptimedb:v0.9.2 + command: standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001 --mysql-addr=0.0.0.0:4002 --postgres-addr 0.0.0.0:4003 + ports: + - 4000:4000 + - 4001:4001 + - 4002:4002 + - 4003:4003 + networks: + - demo-network + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1:4000/health"] + interval: 3s + timeout: 3s + retries: 5 + + ingester: + build: + context: ./ingester + dockerfile: Dockerfile + networks: + - demo-network + environment: + - ICAO_AIRPORT_CODE=${ICAO_AIRPORT_CODE:-KSFO} + depends_on: + greptimedb: + condition: service_started + +networks: + demo-network: diff --git a/flight-data-ingester/ingester/Dockerfile b/flight-data-ingester/ingester/Dockerfile new file mode 100644 index 0000000..3417fa6 --- /dev/null +++ b/flight-data-ingester/ingester/Dockerfile @@ -0,0 +1,18 @@ +FROM docker.io/golang:1.21-alpine + +WORKDIR /app + +# Copy go mod and sum files +COPY go.mod go.sum ./ + +# Download all dependencies +RUN go mod download + +# Copy the source code +COPY *.go ./ + +# Build the application +RUN CGO_ENABLED=0 GOOS=linux go build -o /app/producer + +# Run the binary +CMD ["./producer"] diff --git a/flight-data-ingester/ingester/dto.go b/flight-data-ingester/ingester/dto.go new file mode 100644 index 0000000..6133bfd --- /dev/null +++ b/flight-data-ingester/ingester/dto.go @@ -0,0 +1,95 @@ +package main + +import "time" + +type FlightMetric struct { + Icao24 string `greptime:"tag;column:icao24;type:string"` + Longitude *float64 `greptime:"tag;column:longitude;type:float64"` + Latitude *float64 `greptime:"tag;column:latitude;type:float64"` + Velocity *float64 `greptime:"tag;column:velocity;type:float64"` + BaroAltitude *float64 `greptime:"tag;column:baro_altitude;type:float64"` + GeoAltitude *float64 `greptime:"tag;column:geo_altitude;type:float64"` + TimePosition *int64 `greptime:"tag;column:time_position;type:int64"` + Timestamp time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"` +} + +func (FlightMetric) TableName() string { + return "icao24_state" +} + +type Flight struct { + Icao24 string `json:"icao24"` + FirstSeen int64 `json:"firstSeen"` + EstDepartureAirport string `json:"estDepartureAirport"` + LastSeen int64 `json:"lastSeen"` + EstArrivalAirport string `json:"estArrivalAirport"` + CallSign string `json:"callsign"` + EstDepartureTime int64 `json:"estDepartureTime"` + EstArrivalTime int64 `json:"estArrivalTime"` + DepartureAirportCandidatesCount int `json:"departureAirportCandidatesCount"` + ArrivalAirportCandidatesCount int `json:"arrivalAirportCandidatesCount"` +} + +type StateResponse struct { + Time int64 `json:"time"` + States [][]interface{} `json:"states"` +} + +func (sr *StateResponse) ToFlightMetrics() []FlightMetric { + metrics := make([]FlightMetric, 0, len(sr.States)) + for _, state := range sr.States { + if len(state) < 17 { + continue // Skip if the state doesn't have enough elements + } + // check the state vector positions + + metric := FlightMetric{ + Icao24: state[0].(string), + Longitude: floatToFloat64Ptr(state[5]), + Latitude: floatToFloat64Ptr(state[6]), + BaroAltitude: floatToFloat64Ptr(state[7]), + Velocity: floatToFloat64Ptr(state[9]), + GeoAltitude: floatToFloat64Ptr(state[13]), + Timestamp: time.Unix(sr.Time, 0), + TimePosition: floatToInt64Ptr(state[3]), + } + metrics = append(metrics, metric) + } + return metrics +} + +// https://openskynetwork.github.io/opensky-api/rest.html#all-state-vectors +// StateData{ +// Icao24: state[0].(string), +// Callsign: state[1].(string), +// OriginCountry: state[2].(string), +// TimePosition: floatToInt64Ptr(state[3]), +// LastContact: floatToInt64(state[4]), +// Longitude: floatToFloat64Ptr(state[5]), +// Latitude: floatToFloat64Ptr(state[6]), +// BaroAltitude: floatToFloat64Ptr(state[7]), +// OnGround: state[8].(bool), +// Velocity: floatToFloat64Ptr(state[9]), +// TrueTrack: floatToFloat64Ptr(state[10]), +// VerticalRate: floatToFloat64Ptr(state[11]), +// Sensors: nil, // This field is null in the example +// GeoAltitude: floatToFloat64Ptr(state[13]), +// Squawk: nil, // This field is null in the example +// Spi: state[15].(bool), +// PositionSource: floatToInt(state[16]), + +func floatToInt64Ptr(v interface{}) *int64 { + if v == nil { + return nil + } + i := int64(v.(float64)) + return &i +} + +func floatToFloat64Ptr(v interface{}) *float64 { + if v == nil { + return nil + } + f := v.(float64) + return &f +} diff --git a/flight-data-ingester/ingester/go.mod b/flight-data-ingester/ingester/go.mod new file mode 100644 index 0000000..080ebd2 --- /dev/null +++ b/flight-data-ingester/ingester/go.mod @@ -0,0 +1,16 @@ +module github.com/GreptimeTeam/demo-scene/flight-data-ingester + +go 1.21 + +require ( + github.com/GreptimeTeam/greptime-proto v0.7.0 // indirect + github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/stoewer/go-strcase v1.3.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect + google.golang.org/grpc v1.66.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/flight-data-ingester/ingester/go.sum b/flight-data-ingester/ingester/go.sum new file mode 100644 index 0000000..018abbf --- /dev/null +++ b/flight-data-ingester/ingester/go.sum @@ -0,0 +1,33 @@ +github.com/GreptimeTeam/greptime-proto v0.7.0 h1:WHBjAu+NWDFcbZgW9kPtksxEKEAeqYemP1HY63QuO48= +github.com/GreptimeTeam/greptime-proto v0.7.0/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE= +github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 h1:pSlGkdNCyxBjmh9WIHYlOn9UjLuymrUtpoOJOXMk+Uo= +github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0/go.mod h1:/LuHS3Bimqcuja1yEKkhpeP8ZeO7MpxwxBvVqHF52n8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= +github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/flight-data-ingester/ingester/main.go b/flight-data-ingester/ingester/main.go new file mode 100644 index 0000000..aa0770e --- /dev/null +++ b/flight-data-ingester/ingester/main.go @@ -0,0 +1,168 @@ +package main + +import ( + "context" + "os" + "os/signal" + "strings" + + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" + + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "syscall" + "time" +) + +var ( + ICAO_AIRPORT_CODE string + numFlights = 10 +) + +var greptimeClient *greptime.Client + +func init() { + ICAO_AIRPORT_CODE = os.Getenv("ICAO_AIRPORT_CODE") + if ICAO_AIRPORT_CODE == "" { + log.Fatalf("❌ ICAO_AIRPORT_CODE environment variable is not set") + } + + cfg := greptime.NewConfig("greptimedb"). + WithDatabase("public"). + WithAuth(os.Getenv("GREPTIME_USERNAME"), os.Getenv("GREPTIME_PASSWORD")) + + var err error + greptimeClient, err = greptime.NewClient(cfg) + if err != nil { + log.Fatalf("❌ Failed to create GreptimeDB client: %v", err) + } + + fmt.Println("📌 GreptimeDB client initialized successfully") +} + +func main() { + + fmt.Println("📌 starting flight-data-ingester demo") + + trackFlights, err := SelectLiveFlights(ICAO_AIRPORT_CODE, numFlights) + if err != nil { + log.Fatalf("❌ could not select live flights %+v\n", err) + } + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGTERM) + + for { + select { + case <-ticker.C: + statesResponse, err := GetFlightState(trackFlights) + if err != nil { + fmt.Printf("❌ could not get flight state: %+v\n", err) + continue + } + + flightMetrics := statesResponse.ToFlightMetrics() + + resp, err := greptimeClient.WriteObject(context.Background(), flightMetrics) + if err != nil { + fmt.Printf("❌ could not write to greptime: %+v\n", err) + continue + } + log.Printf("inserted rows: %d\n", resp.GetAffectedRows().GetValue()) + + case <-done: + fmt.Println("Received termination signal. Shutting down...") + return + } + } +} + +func GetFlightState(flight []Flight) (*StateResponse, error) { + icaoCsv := "" + icaos := make([]string, len(flight)) + for _, f := range flight { + icaos = append(icaos, f.Icao24) + } + icaoCsv = strings.Join(icaos, ",") + url := fmt.Sprintf("https://opensky-network.org/api/states/all?icao24=%s", icaoCsv) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected status code: %d, and error reading response body: %v", resp.StatusCode, err) + } + bodyString := string(bodyBytes) + return nil, fmt.Errorf("unexpected status code: %d : %s", resp.StatusCode, bodyString) + } + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + var statesResponse StateResponse + err = json.Unmarshal(bodyBytes, &statesResponse) + if err != nil { + return nil, fmt.Errorf("error decoding response: %v - response body: %s", err, string(bodyBytes)) + } + + if len(statesResponse.States) == 0 { + return nil, fmt.Errorf("no aircraft data found for the given flights") + } + + fmt.Printf("selected: state data for %+v flights\n", len(statesResponse.States)) + return &statesResponse, nil +} + +func SelectLiveFlights(airportCode string, num int) ([]Flight, error) { + endTime := time.Now().Unix() + startTime := endTime - 60*30 // all flights leaving in last 30 minutes + + url := fmt.Sprintf("https://opensky-network.org/api/flights/departure?begin=%d&end=%d&airport=%s", startTime, endTime, airportCode) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected status code: %d, and error reading response body: %v", resp.StatusCode, err) + } + bodyString := string(bodyBytes) + return nil, fmt.Errorf("unexpected status code: %d : %s", resp.StatusCode, bodyString) + } + + var flights []Flight + + if err := json.NewDecoder(resp.Body).Decode(&flights); err != nil { + return nil, fmt.Errorf("error decoding response: %+v", err) + } + + if len(flights) == 0 { + return nil, fmt.Errorf("no flights found in the last 10 minutes") + } + + if num > len(flights) { + num = len(flights) + } + + return flights[:num], nil + +}