Skip to content

Commit

Permalink
fix(server,worker): change citygml pack API responses
Browse files Browse the repository at this point in the history
  • Loading branch information
rot1024 committed Nov 13, 2024
1 parent 887159d commit 677e3d3
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 27 deletions.
2 changes: 1 addition & 1 deletion server/citygml/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Echo(conf PackerConfig, g *echo.Group) error {
idZip := c.Param("id.zip")
const suffix = ".zip"
if !strings.HasSuffix(idZip, suffix) {
return c.NoContent(http.StatusBadRequest)
return c.JSON(http.StatusNotFound, map[string]string{"error": "not found"})
}
return p.handleGetZip(c, strings.TrimSuffix(idZip, suffix))
})
Expand Down
52 changes: 31 additions & 21 deletions server/citygml/packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"cloud.google.com/go/storage"
"github.com/labstack/echo/v4"
"github.com/reearth/reearthx/log"
"google.golang.org/api/cloudbuild/v1"
"google.golang.org/api/googleapi"
)
Expand All @@ -32,6 +33,8 @@ const (
PackStatusProcessing = "processing"
PackStatusSucceeded = "succeeded"
PackStatusFailed = "failed"

timeooutSignedURL = 10 * time.Minute
)

type packer struct {
Expand All @@ -57,35 +60,39 @@ func (p *packer) handleGetZip(c echo.Context, hash string) error {
obj := p.bucket.Object(hash + ".zip")
attrs, err := obj.Attrs(ctx)
if errors.Is(err, storage.ErrObjectNotExist) {
return c.NoContent(http.StatusNotFound)
return c.JSON(http.StatusNotFound, map[string]any{"error": "not found"})
}
if status := GetStatus(attrs.Metadata); status != PackStatusSucceeded {

if status := getStatus(attrs.Metadata); status != PackStatusSucceeded {
return c.JSON(http.StatusBadRequest, map[string]any{
"error": "invalid status",
"status": status,
"reason": "invalid status",
})
}

signedURL, err := p.bucket.SignedURL(obj.ObjectName(), &storage.SignedURLOptions{
Method: http.MethodGet,
Expires: time.Now().Add(5 * time.Minute),
Expires: time.Now().Add(timeooutSignedURL),
})

if err != nil {
log.Errorfc(ctx, "citygml: packer: failed to issue signed url: %v", err)
return c.JSON(http.StatusInternalServerError, map[string]any{
"error": err.Error(),
"reason": "failed to issue signed url",
"error": "failed to issue url",
})
}

return c.Redirect(http.StatusFound, signedURL)
}

func (p *packer) handleGetStatus(c echo.Context, hash string) error {
ctx := c.Request().Context()
attrs, err := p.bucket.Object(hash + ".zip").Attrs(ctx)
if errors.Is(err, storage.ErrObjectNotExist) {
return c.NoContent(http.StatusNotFound)
return c.JSON(http.StatusNotFound, map[string]any{"error": "not found"})
}
return c.JSON(http.StatusOK, map[string]any{
"status": GetStatus(attrs.Metadata),
"status": getStatus(attrs.Metadata),
})
}

Expand All @@ -96,27 +103,27 @@ func (p *packer) handlePackRequest(c echo.Context) error {
}
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, map[string]any{
"reason": "invalid request body",
"error": err.Error(),
"error": "invalid request body",
"reason": err.Error(),
})
}
if len(req.URLs) == 0 {
return c.JSON(http.StatusBadRequest, map[string]any{
"reason": "no urls provided",
"error": "no urls provided",
})
}
for _, citygmlURL := range req.URLs {
u, err := url.Parse(citygmlURL)
if err != nil {
return c.JSON(http.StatusBadRequest, map[string]any{
"url": citygmlURL,
"reason": "invalid url",
"url": citygmlURL,
"error": "invalid url",
})
}
if p.conf.Domain != "" && u.Host != p.conf.Domain {
return c.JSON(http.StatusBadRequest, map[string]any{
"url": citygmlURL,
"reason": "invalid domain",
"url": citygmlURL,
"error": "invalid domain",
})
}
}
Expand All @@ -135,9 +142,9 @@ func (p *packer) handlePackRequest(c echo.Context) error {
if err := w.Close(); err != nil {
var gErr *googleapi.Error
if !(errors.As(err, &gErr) && gErr.Code == http.StatusPreconditionFailed) {
log.Errorfc(ctx, "citygml: packer: failed to write metadata: %v", err)
return c.JSON(http.StatusInternalServerError, map[string]any{
"reason": "failed to write metadata",
"error": err.Error(),
"error": "failed to write metadata",
})
}
return c.JSON(http.StatusOK, resp)
Expand All @@ -148,9 +155,9 @@ func (p *packer) handlePackRequest(c echo.Context) error {
URLs: req.URLs,
}
if err := p.packAsync(ctx, packReq); err != nil {
log.Errorfc(ctx, "citygml: packer: failed to write metadata: %v", err)
return c.JSON(http.StatusInternalServerError, map[string]any{
"reason": "failed to enqueue pack job",
"error": err.Error(),
"error": "failed to enqueue pack job",
})
}
return c.JSON(http.StatusOK, resp)
Expand All @@ -161,7 +168,10 @@ func (p *packer) packAsync(ctx context.Context, req PackAsyncRequest) error {
Timeout: "86400s", // 1 day
QueueTtl: "86400s", // 1 day
Steps: []*cloudbuild.BuildStep{
{Name: p.conf.CityGMLPackerImage, Args: append([]string{"citygml-packer", "-dest", req.Dest, "-domain", req.Domain}, req.URLs...)},
{
Name: p.conf.CityGMLPackerImage,
Args: append([]string{"citygml-packer", "-dest", req.Dest, "-domain", req.Domain}, req.URLs...),
},
},
}
var err error
Expand All @@ -184,7 +194,7 @@ func Status(s string) map[string]string {
}
}

func GetStatus(metadata map[string]string) string {
func getStatus(metadata map[string]string) string {
return metadata["status"]
}

Expand Down
29 changes: 24 additions & 5 deletions worker/citygmlpacker/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,38 @@ func Run(conf Config) (err error) {
if err != nil {
return fmt.Errorf("invalid destination bucket(%s): %w", conf.Dest, err)
}

gcs, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("storage.NewClient: %v", err)
}

obj := gcs.Bucket(destURL.Host).Object(path.Join(strings.TrimPrefix(destURL.Path, "/")))

defer func() {
if err == nil {
return
}
_, uErr := obj.Update(ctx, storage.ObjectAttrsToUpdate{
Metadata: citygml.Status(citygml.PackStatusFailed),
Metadata: citygml.Status(PackStatusFailed),
})
if uErr != nil {
log.Printf("failed to update status: (to=%s): %v", citygml.PackStatusFailed, uErr)
log.Printf("failed to update status: (to=%s): %v", PackStatusFailed, uErr)
}
}()

attrs, err := obj.Attrs(ctx)
if err != nil {
return fmt.Errorf("get metadata: %v", err)
}
if status := citygml.GetStatus(attrs.Metadata); status != citygml.PackStatusAccepted {

if status := getStatus(attrs.Metadata); status != PackStatusAccepted {
log.Printf("SKIPPED: already exists (status=%s)", status)
return nil
}

_, err = obj.If(storage.Conditions{GenerationMatch: attrs.Generation, MetagenerationMatch: attrs.Metageneration}).
Update(ctx, storage.ObjectAttrsToUpdate{Metadata: citygml.Status(citygml.PackStatusProcessing)})
Update(ctx, storage.ObjectAttrsToUpdate{Metadata: status(PackStatusProcessing)})
if err != nil {
var gErr *googleapi.Error
if !(errors.As(err, &gErr) && gErr.Code == http.StatusPreconditionFailed) {
Expand All @@ -61,7 +67,7 @@ func Run(conf Config) (err error) {
}

w := obj.NewWriter(ctx)
w.ObjectAttrs.Metadata = citygml.Status(citygml.PackStatusSucceeded)
w.ObjectAttrs.Metadata = citygml.Status(PackStatusSucceeded)
defer w.Close()
if err := pack(ctx, w, conf.Domain, conf.URLs); err != nil {
return fmt.Errorf("pack: %w", err)
Expand All @@ -88,9 +94,11 @@ func pack(ctx context.Context, w io.Writer, host string, gmlURLs []string) error
if err != nil {
return fmt.Errorf("invalid gml URL: %w", err)
}

if host != "" && u.Host != host {
return fmt.Errorf("invalid host: %s", u.Host)
}

nq, err = writeZip(ctx, zw, &httpClient, u, nq)
if err != nil {
return fmt.Errorf("writeZip: %w", err)
Expand All @@ -113,10 +121,12 @@ func writeZip(ctx context.Context, zw *zip.Writer, httpClient *http.Client, u *u
if err != nil {
return nil, fmt.Errorf("create: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, fmt.Errorf("invalid url: %w", err)
}

resp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("request gml: %w", err)
Expand All @@ -126,6 +136,7 @@ func writeZip(ctx context.Context, zw *zip.Writer, httpClient *http.Client, u *u
sax := gosax.NewReader(io.TeeReader(resp.Body, ze))
sax.EmitSelfClosingTag = true
inAppImageURI := false

for {
e, err := sax.Event()
if err != nil {
Expand All @@ -140,11 +151,14 @@ func writeZip(ctx context.Context, zw *zip.Writer, httpClient *http.Client, u *u
if err != nil {
return nil, fmt.Errorf("start element error: %w", err)
}

inAppImageURI = t.Name.Space == "app" && t.Name.Local == "imageURI"

for _, a := range t.Attr {
if a.Name.Space == "" && a.Name.Local == "codeSpace" {
depsMap[a.Value] = struct{}{}
}

if a.Name.Space == "xsi" && a.Name.Local == "schemaLocation" {
s := a.Value
for s != "" {
Expand All @@ -168,6 +182,7 @@ func writeZip(ctx context.Context, zw *zip.Writer, httpClient *http.Client, u *u
default:
}
}

deps := make([]string, 0, len(depsMap))
for dep := range depsMap {
deps = append(deps, dep)
Expand All @@ -183,19 +198,23 @@ func writeZip(ctx context.Context, zw *zip.Writer, httpClient *http.Client, u *u
if err != nil {
return nil, fmt.Errorf("create: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, fmt.Errorf("invalid url: %w", err)
}

resp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("request: %w", err)
}

_, err = io.Copy(w, resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("copy response: %w", err)
}
}

return q, nil
}
18 changes: 18 additions & 0 deletions worker/citygmlpacker/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package citygmlpacker

const (
PackStatusAccepted = "accepted"
PackStatusProcessing = "processing"
PackStatusSucceeded = "succeeded"
PackStatusFailed = "failed"
)

func getStatus(metadata map[string]string) string {
return metadata["status"]
}

func status(s string) map[string]string {
return map[string]string{
"status": s,
}
}

0 comments on commit 677e3d3

Please sign in to comment.