Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: require schema format #45

Merged
merged 17 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ jobs:

# Linting multiple Dockerfiles to ensure adherence to best practices and coding standards.
hadolint_job:
name: Hadolint
if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }}
name: Hadolint
runs-on: ubuntu-latest
strategy:
fail-fast: false
Expand Down Expand Up @@ -84,8 +84,8 @@ jobs:

# Ensures that the code adheres to the lint checks defined in .golangci.yaml.
lint_job:
name: Go lint
if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }}
name: Go lint
runs-on: ubuntu-latest
strategy:
fail-fast: false
Expand Down Expand Up @@ -131,8 +131,8 @@ jobs:
working-directory: ${{ matrix.component }}

licenses_check_job:
if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }}
name: 3rd party licenses check
if: ${{ github.event.head_commit.committer.name != 'github-actions[bot]' || ! contains(github.head_ref, 'release-please--branches--main') }}
runs-on: ubuntu-latest
permissions:
contents: write
Expand Down Expand Up @@ -238,8 +238,8 @@ jobs:

# Runs unit tests for all components in this repo
test_job:
if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }}
name: Test job
if: ${{ github.base_ref == 'main' && ! contains(github.head_ref, 'release-please--branches--main') }}
runs-on: ubuntu-latest

strategy:
Expand Down Expand Up @@ -270,8 +270,8 @@ jobs:

# Builds docker images for all components of the repo to test if they can successfully be built
test_docker_image_builds_job:
name: Test docker image builds
if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }}
name: Test docker image builds
runs-on: ubuntu-latest

strategy:
Expand Down
18 changes: 11 additions & 7 deletions .github/workflows/push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
- name: Add missing license headers
run: nwa add -c "Syntio Ltd." -s **/*.xml ./registry ./validator

- name: Check and Commit changes
- name: Check and commit changes
id: check_commit
run: |
git config user.name "github-actions[bot]"
Expand All @@ -75,16 +75,16 @@ jobs:
git checkout -b "add-license-headers-${{ github.run_id }}"
git push origin HEAD

- name: Create Pull Request
- name: Create pull request
if: env.changes_committed == 'true'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
gh pr create --base ${{ github.ref_name }} --head "add-license-headers-${{ github.run_id }}" --title "style: add license headers" --body "This PR adds license headers to the affected files. Recommendation: Merge this PR using the rebase-merge method"

upload_docker_images_job:
if: github.event.head_commit.committer.name == 'github-actions[bot]' || startsWith(github.event.head_commit.message.name, 'feat') || startsWith(github.event.head_commit.message.name, 'fix')
name: Build, push and sign Docker images
if: github.event.head_commit.committer.name == 'github-actions[bot]'
runs-on: ubuntu-latest
permissions:
id-token: write # required to generate JWT token
Expand All @@ -111,9 +111,13 @@ jobs:
- name: Check out code
uses: actions/checkout@v4

- name: Set Tag
- name: Set image tag
run: |
TAG=$(cat version.txt)
if [ ${{ github.event.head_commit.committer.name }} == 'github-actions[bot]' ]; then
TAG=$(cat version.txt)
else
TAG=$(echo $GITHUB_SHA | cut -c 1-7)
fi
echo "TAG=$TAG" >> $GITHUB_ENV

- name: Docker Hub Login
Expand All @@ -139,11 +143,11 @@ jobs:
docker push syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }}

- name: Install cosign
if: ${{ env.TAG_EXISTS == 'false' }}
if: ${{ github.event.head_commit.committer.name == 'github-actions[bot]' && env.TAG_EXISTS == 'false' }}
uses: sigstore/[email protected]

- name: Sign the Docker image
if: ${{ env.TAG_EXISTS == 'false' }}
if: ${{ github.event.head_commit.committer.name == 'github-actions[bot]' && env.TAG_EXISTS == 'false' }}
run: |
digest=$(docker inspect --format='{{index .RepoDigests 0}}' syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }})
cosign sign --yes "$digest"
Expand Down
2 changes: 1 addition & 1 deletion registry/cmd/sr-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func registerSchema(registerCommand *flag.FlagSet) {
if !added {
log.Print("schema already exists")
} else {
log.Print("created schema under the id ", details.VersionID)
log.Printf("created schema under the id %s and version %s", details.SchemaID, details.Version)
}
}

Expand Down
5 changes: 5 additions & 0 deletions registry/docker/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SR_HOST=host.docker.internal
SR_DBNAME=postgres
SR_USER=postgres
SR_PASSWORD=
SR_TABLE_PREFIX=syntio_schema.
53 changes: 53 additions & 0 deletions registry/docker/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: schema-registry
services:
postgres:
image: postgres:17
ports:
- 5432:5432
environment:
POSTGRES_PASSWORD: # Please provide the password. Needs to match SR_PASSWORD in the .env file.
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
interval: 30s
timeout: 60s
retries: 5
start_period: 80s
initdb:
depends_on:
postgres:
condition: service_healthy
build:
context: ../../
dockerfile: ./registry/docker/initdb/Dockerfile
env_file:
- .env
compatibility-checker:
build:
context: ../../
dockerfile: ./registry/docker/compatibility-checker/Dockerfile
ports:
- 8088:8088
validity-checker:
build:
context: ../../
dockerfile: ./registry/docker/validity-checker/Dockerfile
ports:
- 8089:8089
schema-registry:
depends_on:
initdb:
condition: service_completed_successfully
compatibility-checker:
condition: service_started
validity-checker:
condition: service_started
build:
context: ../../
dockerfile: ./registry/docker/registry/Dockerfile
ports:
- 8080:8080
env_file:
- .env
environment:
COMPATIBILITY_CHECKER_URL: http://host.docker.internal:8088
VALIDITY_CHECKER_URL: http://host.docker.internal:8089
1 change: 1 addition & 0 deletions registry/registry/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
var ErrNotFound = errors.New("not found")
var ErrUnknownComp = errors.New("unknown value for compatibility_mode")
var ErrUnknownVal = errors.New("unknown value for validity mode")
var ErrUnknownFormat = errors.New("unknown value for schema format")
var ErrNotValid = errors.New("schema is not valid")
var ErrNotComp = errors.New("schemas are not compatible")
var ErrInvalidValueHeader = errors.New("invalid header value")
Expand Down
1 change: 0 additions & 1 deletion registry/registry/repository/postgres/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func intoRegistrySchema(schema Schema) registry.Schema {
// intoRegistryVersionDetails maps VersionDetails from repository to service layer.
func intoRegistryVersionDetails(VersionDetails VersionDetails) registry.VersionDetails {
return registry.VersionDetails{
VersionID: strconv.Itoa(int(VersionDetails.VersionID)),
Version: VersionDetails.Version,
SchemaID: strconv.Itoa(int(VersionDetails.SchemaID)),
Specification: VersionDetails.Specification,
Expand Down
15 changes: 13 additions & 2 deletions registry/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,19 @@ func (h Handler) SearchSchemas(w http.ResponseWriter, r *http.Request) {
// @Failure 500
// @Router /schemas [post]
func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) {

registerRequest, err := readSchemaRegisterRequest(r.Body)
if err != nil {
if errors.Is(err, registry.ErrUnknownFormat) {
body, _ := json.Marshal(report{
Message: "Bad request: unknown format value",
})
writeResponse(w, responseBodyAndCode{
Body: body,
Code: http.StatusBadRequest,
})
return
}
writeResponse(w, responseBodyAndCode{
Body: serializeErrorMessage(http.StatusText(http.StatusBadRequest)),
Code: http.StatusBadRequest,
Expand All @@ -548,7 +559,7 @@ func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) {
if err != nil {
if errors.Is(err, registry.ErrUnknownComp) {
body, _ := json.Marshal(report{
Message: "Bad request: unknown value for compatibility_mode",
Message: "Bad request: unknown compatibility_mode value",
})
writeResponse(w, responseBodyAndCode{
Body: body,
Expand All @@ -559,7 +570,7 @@ func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) {

if errors.Is(err, registry.ErrUnknownVal) {
body, _ := json.Marshal(report{
Message: "Bad request: unknown value for validity_mode",
Message: "Bad request: unknown validity_mode value",
})
writeResponse(w, responseBodyAndCode{
Body: body,
Expand Down
18 changes: 18 additions & 0 deletions registry/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"

"github.com/dataphos/schema-registry/registry"
)
Expand All @@ -27,6 +28,8 @@ type responseBodyAndCode struct {
Code int
}

var supportedFormats = []string{"json", "avro", "xml", "csv", "protobuf"}

func writeResponse(w http.ResponseWriter, response responseBodyAndCode) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(response.Code)
Expand All @@ -49,9 +52,24 @@ func readSchemaRegisterRequest(body io.ReadCloser) (registry.SchemaRegistrationR
return registry.SchemaRegistrationRequest{}, err
}

// check if format is unknown
format := strings.ToLower(schemaRegisterRequest.SchemaType)
if !containsFormat(format) {
return registry.SchemaRegistrationRequest{}, registry.ErrUnknownFormat
}

return schemaRegisterRequest, nil
}

func containsFormat(format string) bool {
for _, supportedFormat := range supportedFormats {
if format == supportedFormat {
return true
}
}
return false
}

func readSchemaUpdateRequest(body io.ReadCloser) (registry.SchemaUpdateRequest, error) {
encoded, err := io.ReadAll(body)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static boolean checkValidity(String schemaType, String schema, String mod
case "full" -> ValidityLevel.FULL;
default -> ValidityLevel.NONE;
};
ContentValidator validator = ValidatorFactory.createValidator(schemaType);
ContentValidator validator = ValidatorFactory.createValidator(schemaType.toLowerCase());
if (validator == null) { // in case ValidatorFactory returns null
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion registry/validity/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func CheckOverHTTP(ctx context.Context, schemaType, schema, mode, url string) (b

var parsedBody checkResponse
if err = json.Unmarshal(body, &parsedBody); err != nil {
return false, "error unmarshalling compatibility check body", err
return false, "error unmarshalling validity check body", err
}

valid := parsedBody.Result
Expand Down
3 changes: 1 addition & 2 deletions validator/internal/centralconsumer/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message)
setMessageRawAttributes(message, "Header error", err)
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil
}
}
if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() {
} else if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() {
if cc.defaultHeaderSchema.DefaultHeaderSchemaVersion != "" {
headerVersion = cc.defaultHeaderSchema.DefaultHeaderSchemaVersion
} else {
Expand Down
25 changes: 19 additions & 6 deletions validator/internal/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,41 @@ type Message struct {
}

const (
// OldAttributeSchemaID is one of the keys expected to be found in the attributes field of the message.
// It holds the schema id information concerning the data field of the message
// This is planned to be dropped in the future version, but is kept for now to be backwards compatible with
// systems that still use this convention
OldAttributeSchemaID = "schemaId"

// AttributeSchemaID is one of the keys expected to be found in the attributes field of the message.
// It holds the schema id information concerning the data field of the message
AttributeSchemaID = "schemaId"
AttributeSchemaID = "schema_id"

// OldAttributeSchemaVersion is one of the keys expected to be found in the attributes field of the message,
// It holds the schema version information concerning the data field of the message.
// This is planned to be dropped in the future version, but is kept for now to be backwards compatible with
// systems that still use this convention
OldAttributeSchemaVersion = "versionId"

// AttributeSchemaVersion is one of the keys expected to be found in the attributes field of the message,
// It holds the schema version information concerning the data field of the message.
AttributeSchemaVersion = "versionId"
AttributeSchemaVersion = "version"

// AttributeFormat is one of the keys expected to be found in the attributes field of the message.
// It holds the format of the data field of the message.
AttributeFormat = "format"

// HeaderValidation is one of the keys that can occur in raw attributes section of header.
// It determines if the header will be validated
HeaderValidation = "headerValidation"
HeaderValidation = "validate_header"

// AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema id that is used to check header validity
AttributeHeaderID = "headerSchemaId"
AttributeHeaderID = "header_schema_id"

// AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema version that is used to check header validity
AttributeHeaderVersion = "headerVersionId"
AttributeHeaderVersion = "header_version"
)

// MessageSchemaPair wraps a Message with the Schema relating to this Message.
Expand Down Expand Up @@ -157,7 +169,8 @@ func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) {
cleanAttributes := make(map[string]interface{})
for key, value := range rawAttributes {
if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion ||
key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat {
key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat ||
key == OldAttributeSchemaID || key == OldAttributeSchemaVersion {
continue
} else {
cleanAttributes[key] = value
Expand Down
10 changes: 8 additions & 2 deletions validator/internal/janitor/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func ExtractAttributes(raw map[string]interface{}) (Attributes, error) {

schemaID, ok := raw[AttributeSchemaID]
if !ok {
return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaID)
schemaID, ok = raw[OldAttributeSchemaID]
if !ok {
return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaID)
}
}
schemaIDStr, ok = schemaID.(string)
if !ok {
Expand All @@ -65,7 +68,10 @@ func ExtractAttributes(raw map[string]interface{}) (Attributes, error) {

version, ok := raw[AttributeSchemaVersion]
if !ok {
return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaVersion)
version, ok = raw[OldAttributeSchemaVersion]
if !ok {
return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaVersion)
}
}
versionStr, ok = version.(string)
if !ok {
Expand Down
Loading