Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom header validation #19

Merged
merged 18 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Changelog

## 1.0.0 (2024-10-10)

**Contributors:**
[@andrijadukic-syntio](https://github.com/andrijadukic-syntio),
[@nikolatomazin](https://github.com/nikolatomazin),
[@Simun17](https://github.com/Simun17),
[@jpecaric2](https://github.com/jpecaric2),
[@jkomericki](https://github.com/jkomericki),
[@robe-rt](https://github.com/robe-rt),
[@PetarBersic](https://github.com/PetarBersic),
[@MarijaSokcevic](https://github.com/MarijaSokcevic),
[@StipeZ](https://github.com/StipeZ),
[@NikaPozar](https://github.com/NikaPozar),
[@mia-ravlic](https://github.com/mia-ravlic),
[@AndreaBarbaric](https://github.com/AndreaBarbaric),
[@Lcero98](https://github.com/Lcero98),
[@mkuju95](https://github.com/mkuju95)

### ⚠ BREAKING CHANGES

* initial schema-registry implementation ([#1](https://github.com/dataphos/schema-registry/issues/1)) (#5)

### Features

* initial schema-registry implementation ([#1](https://github.com/dataphos/schema-registry/issues/1)) ([#5](https://github.com/dataphos/schema-registry/issues/5)) ([beedaef](https://github.com/dataphos/schema-registry/commit/beedaef818b5490a68318cceea077c361a5f8818))
4 changes: 2 additions & 2 deletions registry/cmd/initdb/initdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ package main
import (
"runtime/debug"

"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/schema-registry/internal/config"
"github.com/dataphos/schema-registry/internal/errcodes"
"github.com/dataphos/schema-registry/registry/repository/postgres"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
)

func main() {
Expand Down
5 changes: 1 addition & 4 deletions registry/compatibility/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
Expand All @@ -34,9 +33,7 @@ func TestNew(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

v, err := New(ctx, "http://localhost:8088", 2*time.Second)

fmt.Println(v.url)
_, err := New(ctx, "http://localhost:8088", 2*time.Second)

if err != nil {
t.Fatal(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import net.syntio.compatibility.checker.ProtobufChecker;

public class CheckerFactory {

public static CompatibilityChecker createChecker(String format) throws Exception {
return switch (format) {
case FileTypes.JSON -> new JsonChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package net.syntio.compatibility;

public class Message {

private final String id;
private final String format;
private final String schema;
Expand All @@ -27,12 +28,11 @@ public Message(String id, String format, String schema) {
this.schema = schema;
}


public String getSchema() {
return schema;
}

public String getID() {
public String getId() {
return id;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import java.util.List;

public class AvroChecker implements net.syntio.compatibility.checker.CompatibilityChecker {

@Override
public boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
public List<String> testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
io.confluent.kafka.schemaregistry.CompatibilityLevel avroCompatibilityLevel = switch (level) {
case NONE -> io.confluent.kafka.schemaregistry.CompatibilityLevel.NONE;
case BACKWARD -> io.confluent.kafka.schemaregistry.CompatibilityLevel.BACKWARD;
Expand All @@ -42,7 +43,7 @@ public boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> h
}
AvroSchema newSchema = new AvroSchema(currentSchema.content());

List<String> issues = switch (avroCompatibilityLevel) {
return switch (avroCompatibilityLevel) {
case BACKWARD -> CompatibilityChecker.BACKWARD_CHECKER.isCompatible(newSchema, newHistory);
case BACKWARD_TRANSITIVE -> CompatibilityChecker.BACKWARD_TRANSITIVE_CHECKER.isCompatible(newSchema, newHistory);
case FORWARD -> CompatibilityChecker.FORWARD_CHECKER.isCompatible(newSchema, newHistory);
Expand All @@ -51,6 +52,5 @@ public boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> h
case FULL_TRANSITIVE -> CompatibilityChecker.FULL_TRANSITIVE_CHECKER.isCompatible(newSchema, newHistory);
case NONE -> CompatibilityChecker.NO_OP_CHECKER.isCompatible(newSchema, newHistory);
};
return issues.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@
import java.util.List;

public class Checker {
public static boolean checkCompatibility(Message msg, List<String> history, CompatibilityLevel mode) throws Exception {
public static List<String> checkCompatibility(Message msg, List<String> history, CompatibilityLevel mode) throws Exception {
ContentHandle schema = ContentHandle.create(msg.getSchema());
List<ContentHandle> schemaHistory = new ArrayList<>();

for (String s : history) {
ContentHandle ps = ContentHandle.create(s);
schemaHistory.add(ps);
}
CompatibilityChecker cc = CheckerFactory.createChecker(msg.getFormat());

return cc.testCompatibility(mode, schemaHistory, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
import java.util.List;

public interface CompatibilityChecker {
boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema);
List<String> testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@
package net.syntio.compatibility.checker;

import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.rules.compatibility.CompatibilityDifference;
import io.apicurio.registry.rules.compatibility.CompatibilityExecutionResult;
import io.apicurio.registry.rules.compatibility.CompatibilityLevel;
import io.apicurio.registry.rules.compatibility.JsonSchemaCompatibilityChecker;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class JsonChecker implements CompatibilityChecker {

@Override
public boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
public List<String> testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
JsonSchemaCompatibilityChecker cc = new JsonSchemaCompatibilityChecker();
return cc.testCompatibility(level, history, currentSchema).isCompatible();
CompatibilityExecutionResult res = cc.testCompatibility(level, history, currentSchema);
Set<CompatibilityDifference> resSet = res.getIncompatibleDifferences();
List<String> issuesList = new ArrayList<>();
for (CompatibilityDifference dif : resSet) {
issuesList.add(dif.asRuleViolation().getDescription());
}
return issuesList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@
package net.syntio.compatibility.checker;

import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.rules.compatibility.CompatibilityDifference;
import io.apicurio.registry.rules.compatibility.CompatibilityExecutionResult;
import io.apicurio.registry.rules.compatibility.CompatibilityLevel;
import io.apicurio.registry.rules.compatibility.ProtobufCompatibilityChecker;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ProtobufChecker implements CompatibilityChecker {

@Override
public boolean testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
ProtobufCompatibilityChecker cc = new ProtobufCompatibilityChecker();
return cc.testCompatibility(level, history, currentSchema).isCompatible();
public List<String> testCompatibility(CompatibilityLevel level, List<ContentHandle> history, ContentHandle currentSchema) {
ProtobufCompatibilityChecker cc = new ProtobufCompatibilityChecker();
CompatibilityExecutionResult res = cc.testCompatibility(level, history, currentSchema);
Set<CompatibilityDifference> diffs = res.getIncompatibleDifferences();
List<String> issues = new ArrayList<>();
for (CompatibilityDifference diff : diffs) {
issues.add(diff.asRuleViolation().getDescription());
}
return issues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import net.syntio.compatibility.checker.Checker;
import net.syntio.compatibility.dto.CheckRequestDto;
import net.syntio.compatibility.dto.CheckResponseDto;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
Expand All @@ -35,32 +37,38 @@ public class CheckerController {
public ResponseEntity<CheckResponseDto> check(@RequestBody CheckRequestDto req) {
Message latestSchema = req.getMessage();
List<String> schemaHistory = req.getHistory();
CheckResponseDto res;
try {
for (int i = 0; i < schemaHistory.size(); i++) {
schemaHistory.set(i, schemaHistory.get(i).replaceAll("\r\n", "\n"));
}
schemaHistory.replaceAll(s -> s.replaceAll("\r\n", "\n"));
String mode = req.getMode();

CompatibilityLevel cl = getCompatibilityLevel(mode);
boolean result;
List<String> issues;
if (cl.equals(CompatibilityLevel.NONE)) {
result = true;
issues = new ArrayList<>();
} else {
result = Checker.checkCompatibility(latestSchema, schemaHistory, cl);
issues = Checker.checkCompatibility(latestSchema, schemaHistory, cl);
}

CheckResponseDto res = new CheckResponseDto(result);
if (result) {
res.setInfo("Schema is compatible");
res = new CheckResponseDto(issues.isEmpty());
if (issues.isEmpty()) {
res.setInfo("schema is compatible");
return ResponseEntity.ok(res);
}
res.setInfo("Schema is incompatible");
res.setInfo("schema is incompatible: " + String.join("; ", issues));
return ResponseEntity.ok(res);
} catch (NullPointerException e) {
System.err.println("Schema history is null.");
return ResponseEntity.badRequest().build();
res = new CheckResponseDto(false);
res.setInfo("schema history is null");
return ResponseEntity.status(HttpStatus.PRECONDITION_FAILED).body(res);
} catch (org.everit.json.schema.SchemaException e) {
res = new CheckResponseDto(false);
res.setInfo("schema version unknown or unsupported");
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(res);
} catch (Exception e) {
return ResponseEntity.badRequest().build();
res = new CheckResponseDto(false);
res.setInfo("unknown error");
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(res);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public CheckRequestDto(String payload, List<String> history, String mode) {
this.message = transformStringToMessage(payload);
} catch (Exception e) {
this.message = new Message("", "", "");
System.err.println("Cannot read message");
}
this.history = history;
this.mode = mode;
Expand Down
31 changes: 24 additions & 7 deletions registry/compatibility/externalChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ package compatibility
import (
"context"
"encoding/base64"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"

"github.com/dataphos/schema-registry/compatibility/http"
"github.com/dataphos/schema-registry/internal/errtemplates"
"github.com/dataphos/lib-httputil/pkg/httputil"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/lib-retry/pkg/retry"
"github.com/dataphos/schema-registry/compatibility/http"
"github.com/dataphos/schema-registry/internal/config"
"github.com/dataphos/schema-registry/internal/errcodes"
"github.com/dataphos/schema-registry/internal/errtemplates"
)

const (
Expand All @@ -42,8 +45,9 @@ const (
)

type ExternalChecker struct {
url string
Url string
TimeoutBase time.Duration
Log logger.Log
}

// NewFromEnv loads the needed environment variables and calls New.
Expand Down Expand Up @@ -73,9 +77,20 @@ func New(ctx context.Context, url string, timeoutBase time.Duration) (*ExternalC
return nil, errors.Wrapf(err, "attempting to reach compatibility checker at %s failed", url)
}

labels := logger.Labels{
"product": "Schema Registry",
"component": "compatibility_checker",
}
logLevel, logConfigWarnings := config.GetLogLevel()
log := standardlogger.New(labels, standardlogger.WithLogLevel(logLevel))
for _, w := range logConfigWarnings {
log.Warn(w)
}

return &ExternalChecker{
url: url,
Url: url,
TimeoutBase: timeoutBase,
Log: log,
}, nil
}

Expand All @@ -90,17 +105,19 @@ func (c *ExternalChecker) Check(schemaInfo string, history []string, mode string

decodedHistory, err := c.DecodeHistory(history)
if err != nil {
c.Log.Error("could not decode", errcodes.SchemaUndecodable)
return false, err
}
return http.CheckOverHTTP(ctx, schemaInfo, decodedHistory, mode, c.url+"/")
compatible, info, err := http.CheckOverHTTP(ctx, schemaInfo, decodedHistory, mode, c.Url+"/")
c.Log.Info(info)
return compatible, err
}

func (c *ExternalChecker) DecodeHistory(history []string) ([]string, error) {
var decodedHistory []string
for i := 0; i < len(history); i++ {
decoded, err := base64.StdEncoding.DecodeString(history[i])
if err != nil {
fmt.Println(fmt.Errorf("could not decode").Error())
return nil, err
}
decodedHistory = append(decodedHistory, string(decoded))
Expand Down
Loading
Loading