From 6e02d38f26f0462ddda8f2a27d39b3bf728830cf Mon Sep 17 00:00:00 2001 From: truedrju <76677784+truedrju@users.noreply.github.com> Date: Fri, 30 Aug 2024 09:09:14 -0700 Subject: [PATCH 1/2] adds demo scripts and docker compose files --- flight-data-ingester/README.md | 38 +++++ flight-data-ingester/docker-compose.yml | 31 +++++ flight-data-ingester/ingester/Dockerfile | 18 +++ flight-data-ingester/ingester/dto.go | 95 +++++++++++++ flight-data-ingester/ingester/go.mod | 16 +++ flight-data-ingester/ingester/go.sum | 33 +++++ flight-data-ingester/ingester/main.go | 168 +++++++++++++++++++++++ 7 files changed, 399 insertions(+) create mode 100644 flight-data-ingester/README.md create mode 100644 flight-data-ingester/docker-compose.yml create mode 100644 flight-data-ingester/ingester/Dockerfile create mode 100644 flight-data-ingester/ingester/dto.go create mode 100644 flight-data-ingester/ingester/go.mod create mode 100644 flight-data-ingester/ingester/go.sum create mode 100644 flight-data-ingester/ingester/main.go diff --git a/flight-data-ingester/README.md b/flight-data-ingester/README.md new file mode 100644 index 0000000..32f1a5d --- /dev/null +++ b/flight-data-ingester/README.md @@ -0,0 +1,38 @@ +# GreptimeDB Flight + +This project demonstrates GreptimeDB's ingesting geo-spatial data using the the `greptimedb-ingester-go` client. It selects the last 10 flights that departed in the last 30 minutes from the configured icao airport code. The ingester script utilizes the [OpenSky Network API](https://opensky-network.org/apidoc/) to fetch flight state data and inserts the flight metrics into GreptimeDB + +## How to run this demo + +Ensure you have `git`, `docker`, `docker-compose` +installed. To run this demo: + +```shell +git clone https://github.com/GreptimeTeam/demo-scene.git +cd demo-scene/flight-data-ingester +docker compose up +``` + +It can take a while for the first run to pull down images and also build necessary components. + +## How it works + +The topology is illustrated in this diagram. + +```mermaid +flowchart LR + greptimedb[(GreptimeDB)] + + api{Open Sky Data} --> go-ingester + go-ingester --> greptimedb +``` + +after GreptimeDB starts, we use the `ingester` script which uses the go client's [high level api](https://docs.greptime.com/user-guide/ingest-data/for-iot/grpc-sdks/go/#installation) to create the table and insert data. It's dead-simple to perform transformations and data munging on your struct and insert into target GreptimeDB columns by tagging your metric struct accordingly as seen in the `./ingester/dto.go` file . + +## Note + +please update the ICAO flight code for your local airport of the city you are running the demo in to ensure there is data in the set. This can be done by updating the ICAO_AIRPORT_CODE environment variable in your docker compose file + +If you are going to restart this demo, press `Ctrl-C` and remember to call +`docker compose down` to clean up the data before you run `docker compose up` +again. diff --git a/flight-data-ingester/docker-compose.yml b/flight-data-ingester/docker-compose.yml new file mode 100644 index 0000000..bfc85f6 --- /dev/null +++ b/flight-data-ingester/docker-compose.yml @@ -0,0 +1,31 @@ +services: + greptimedb: + image: docker.io/greptime/greptimedb:v0.9.2 + command: standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001 --mysql-addr=0.0.0.0:4002 --postgres-addr 0.0.0.0:4003 + ports: + - 4000:4000 + - 4001:4001 + - 4002:4002 + - 4003:4003 + networks: + - demo-network + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1:4000/health"] + interval: 3s + timeout: 3s + retries: 5 + + ingester: + build: + context: ./ingester + dockerfile: Dockerfile + networks: + - demo-network + environment: + - ICAO_AIRPORT_CODE=KSFO + depends_on: + greptimedb: + condition: service_started + +networks: + demo-network: diff --git a/flight-data-ingester/ingester/Dockerfile b/flight-data-ingester/ingester/Dockerfile new file mode 100644 index 0000000..759326c --- /dev/null +++ b/flight-data-ingester/ingester/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:1.21-alpine + +WORKDIR /app + +# Copy go mod and sum files +COPY go.mod go.sum ./ + +# Download all dependencies +RUN go mod download + +# Copy the source code +COPY *.go ./ + +# Build the application +RUN CGO_ENABLED=0 GOOS=linux go build -o /app/producer + +# Run the binary +CMD ["./producer"] \ No newline at end of file diff --git a/flight-data-ingester/ingester/dto.go b/flight-data-ingester/ingester/dto.go new file mode 100644 index 0000000..6133bfd --- /dev/null +++ b/flight-data-ingester/ingester/dto.go @@ -0,0 +1,95 @@ +package main + +import "time" + +type FlightMetric struct { + Icao24 string `greptime:"tag;column:icao24;type:string"` + Longitude *float64 `greptime:"tag;column:longitude;type:float64"` + Latitude *float64 `greptime:"tag;column:latitude;type:float64"` + Velocity *float64 `greptime:"tag;column:velocity;type:float64"` + BaroAltitude *float64 `greptime:"tag;column:baro_altitude;type:float64"` + GeoAltitude *float64 `greptime:"tag;column:geo_altitude;type:float64"` + TimePosition *int64 `greptime:"tag;column:time_position;type:int64"` + Timestamp time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"` +} + +func (FlightMetric) TableName() string { + return "icao24_state" +} + +type Flight struct { + Icao24 string `json:"icao24"` + FirstSeen int64 `json:"firstSeen"` + EstDepartureAirport string `json:"estDepartureAirport"` + LastSeen int64 `json:"lastSeen"` + EstArrivalAirport string `json:"estArrivalAirport"` + CallSign string `json:"callsign"` + EstDepartureTime int64 `json:"estDepartureTime"` + EstArrivalTime int64 `json:"estArrivalTime"` + DepartureAirportCandidatesCount int `json:"departureAirportCandidatesCount"` + ArrivalAirportCandidatesCount int `json:"arrivalAirportCandidatesCount"` +} + +type StateResponse struct { + Time int64 `json:"time"` + States [][]interface{} `json:"states"` +} + +func (sr *StateResponse) ToFlightMetrics() []FlightMetric { + metrics := make([]FlightMetric, 0, len(sr.States)) + for _, state := range sr.States { + if len(state) < 17 { + continue // Skip if the state doesn't have enough elements + } + // check the state vector positions + + metric := FlightMetric{ + Icao24: state[0].(string), + Longitude: floatToFloat64Ptr(state[5]), + Latitude: floatToFloat64Ptr(state[6]), + BaroAltitude: floatToFloat64Ptr(state[7]), + Velocity: floatToFloat64Ptr(state[9]), + GeoAltitude: floatToFloat64Ptr(state[13]), + Timestamp: time.Unix(sr.Time, 0), + TimePosition: floatToInt64Ptr(state[3]), + } + metrics = append(metrics, metric) + } + return metrics +} + +// https://openskynetwork.github.io/opensky-api/rest.html#all-state-vectors +// StateData{ +// Icao24: state[0].(string), +// Callsign: state[1].(string), +// OriginCountry: state[2].(string), +// TimePosition: floatToInt64Ptr(state[3]), +// LastContact: floatToInt64(state[4]), +// Longitude: floatToFloat64Ptr(state[5]), +// Latitude: floatToFloat64Ptr(state[6]), +// BaroAltitude: floatToFloat64Ptr(state[7]), +// OnGround: state[8].(bool), +// Velocity: floatToFloat64Ptr(state[9]), +// TrueTrack: floatToFloat64Ptr(state[10]), +// VerticalRate: floatToFloat64Ptr(state[11]), +// Sensors: nil, // This field is null in the example +// GeoAltitude: floatToFloat64Ptr(state[13]), +// Squawk: nil, // This field is null in the example +// Spi: state[15].(bool), +// PositionSource: floatToInt(state[16]), + +func floatToInt64Ptr(v interface{}) *int64 { + if v == nil { + return nil + } + i := int64(v.(float64)) + return &i +} + +func floatToFloat64Ptr(v interface{}) *float64 { + if v == nil { + return nil + } + f := v.(float64) + return &f +} diff --git a/flight-data-ingester/ingester/go.mod b/flight-data-ingester/ingester/go.mod new file mode 100644 index 0000000..080ebd2 --- /dev/null +++ b/flight-data-ingester/ingester/go.mod @@ -0,0 +1,16 @@ +module github.com/GreptimeTeam/demo-scene/flight-data-ingester + +go 1.21 + +require ( + github.com/GreptimeTeam/greptime-proto v0.7.0 // indirect + github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/stoewer/go-strcase v1.3.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect + google.golang.org/grpc v1.66.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/flight-data-ingester/ingester/go.sum b/flight-data-ingester/ingester/go.sum new file mode 100644 index 0000000..018abbf --- /dev/null +++ b/flight-data-ingester/ingester/go.sum @@ -0,0 +1,33 @@ +github.com/GreptimeTeam/greptime-proto v0.7.0 h1:WHBjAu+NWDFcbZgW9kPtksxEKEAeqYemP1HY63QuO48= +github.com/GreptimeTeam/greptime-proto v0.7.0/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE= +github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0 h1:pSlGkdNCyxBjmh9WIHYlOn9UjLuymrUtpoOJOXMk+Uo= +github.com/GreptimeTeam/greptimedb-ingester-go v0.5.0/go.mod h1:/LuHS3Bimqcuja1yEKkhpeP8ZeO7MpxwxBvVqHF52n8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= +github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/flight-data-ingester/ingester/main.go b/flight-data-ingester/ingester/main.go new file mode 100644 index 0000000..aa0770e --- /dev/null +++ b/flight-data-ingester/ingester/main.go @@ -0,0 +1,168 @@ +package main + +import ( + "context" + "os" + "os/signal" + "strings" + + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" + + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "syscall" + "time" +) + +var ( + ICAO_AIRPORT_CODE string + numFlights = 10 +) + +var greptimeClient *greptime.Client + +func init() { + ICAO_AIRPORT_CODE = os.Getenv("ICAO_AIRPORT_CODE") + if ICAO_AIRPORT_CODE == "" { + log.Fatalf("❌ ICAO_AIRPORT_CODE environment variable is not set") + } + + cfg := greptime.NewConfig("greptimedb"). + WithDatabase("public"). + WithAuth(os.Getenv("GREPTIME_USERNAME"), os.Getenv("GREPTIME_PASSWORD")) + + var err error + greptimeClient, err = greptime.NewClient(cfg) + if err != nil { + log.Fatalf("❌ Failed to create GreptimeDB client: %v", err) + } + + fmt.Println("📌 GreptimeDB client initialized successfully") +} + +func main() { + + fmt.Println("📌 starting flight-data-ingester demo") + + trackFlights, err := SelectLiveFlights(ICAO_AIRPORT_CODE, numFlights) + if err != nil { + log.Fatalf("❌ could not select live flights %+v\n", err) + } + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGTERM) + + for { + select { + case <-ticker.C: + statesResponse, err := GetFlightState(trackFlights) + if err != nil { + fmt.Printf("❌ could not get flight state: %+v\n", err) + continue + } + + flightMetrics := statesResponse.ToFlightMetrics() + + resp, err := greptimeClient.WriteObject(context.Background(), flightMetrics) + if err != nil { + fmt.Printf("❌ could not write to greptime: %+v\n", err) + continue + } + log.Printf("inserted rows: %d\n", resp.GetAffectedRows().GetValue()) + + case <-done: + fmt.Println("Received termination signal. Shutting down...") + return + } + } +} + +func GetFlightState(flight []Flight) (*StateResponse, error) { + icaoCsv := "" + icaos := make([]string, len(flight)) + for _, f := range flight { + icaos = append(icaos, f.Icao24) + } + icaoCsv = strings.Join(icaos, ",") + url := fmt.Sprintf("https://opensky-network.org/api/states/all?icao24=%s", icaoCsv) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected status code: %d, and error reading response body: %v", resp.StatusCode, err) + } + bodyString := string(bodyBytes) + return nil, fmt.Errorf("unexpected status code: %d : %s", resp.StatusCode, bodyString) + } + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + var statesResponse StateResponse + err = json.Unmarshal(bodyBytes, &statesResponse) + if err != nil { + return nil, fmt.Errorf("error decoding response: %v - response body: %s", err, string(bodyBytes)) + } + + if len(statesResponse.States) == 0 { + return nil, fmt.Errorf("no aircraft data found for the given flights") + } + + fmt.Printf("selected: state data for %+v flights\n", len(statesResponse.States)) + return &statesResponse, nil +} + +func SelectLiveFlights(airportCode string, num int) ([]Flight, error) { + endTime := time.Now().Unix() + startTime := endTime - 60*30 // all flights leaving in last 30 minutes + + url := fmt.Sprintf("https://opensky-network.org/api/flights/departure?begin=%d&end=%d&airport=%s", startTime, endTime, airportCode) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected status code: %d, and error reading response body: %v", resp.StatusCode, err) + } + bodyString := string(bodyBytes) + return nil, fmt.Errorf("unexpected status code: %d : %s", resp.StatusCode, bodyString) + } + + var flights []Flight + + if err := json.NewDecoder(resp.Body).Decode(&flights); err != nil { + return nil, fmt.Errorf("error decoding response: %+v", err) + } + + if len(flights) == 0 { + return nil, fmt.Errorf("no flights found in the last 10 minutes") + } + + if num > len(flights) { + num = len(flights) + } + + return flights[:num], nil + +} From bd4093fc336640c1e9d237d356f5c197cd4dbfbf Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 30 Aug 2024 09:47:42 -0700 Subject: [PATCH 2/2] chore: minor update for using environment variable for icao_airport_code --- flight-data-ingester/README.md | 5 ++++- flight-data-ingester/docker-compose.yml | 2 +- flight-data-ingester/ingester/Dockerfile | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flight-data-ingester/README.md b/flight-data-ingester/README.md index 32f1a5d..331da37 100644 --- a/flight-data-ingester/README.md +++ b/flight-data-ingester/README.md @@ -31,7 +31,10 @@ after GreptimeDB starts, we use the `ingester` script which uses the go client's ## Note -please update the ICAO flight code for your local airport of the city you are running the demo in to ensure there is data in the set. This can be done by updating the ICAO_AIRPORT_CODE environment variable in your docker compose file +please update the ICAO flight code for your local airport of the city you are +running the demo in to ensure there is data in the set. This can be done by +setting the ``ICAO_AIRPORT_CODE` environment variable when running `docker +compose up`. If you are going to restart this demo, press `Ctrl-C` and remember to call `docker compose down` to clean up the data before you run `docker compose up` diff --git a/flight-data-ingester/docker-compose.yml b/flight-data-ingester/docker-compose.yml index bfc85f6..4dc5e0c 100644 --- a/flight-data-ingester/docker-compose.yml +++ b/flight-data-ingester/docker-compose.yml @@ -22,7 +22,7 @@ services: networks: - demo-network environment: - - ICAO_AIRPORT_CODE=KSFO + - ICAO_AIRPORT_CODE=${ICAO_AIRPORT_CODE:-KSFO} depends_on: greptimedb: condition: service_started diff --git a/flight-data-ingester/ingester/Dockerfile b/flight-data-ingester/ingester/Dockerfile index 759326c..3417fa6 100644 --- a/flight-data-ingester/ingester/Dockerfile +++ b/flight-data-ingester/ingester/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21-alpine +FROM docker.io/golang:1.21-alpine WORKDIR /app @@ -15,4 +15,4 @@ COPY *.go ./ RUN CGO_ENABLED=0 GOOS=linux go build -o /app/producer # Run the binary -CMD ["./producer"] \ No newline at end of file +CMD ["./producer"]