Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
oklemenz2 committed Feb 20, 2025
1 parent 3f5acca commit cf905ef
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 5 deletions.
25 changes: 20 additions & 5 deletions srv/scheduling/processing-service.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"use strict";

const cds = require("@sap/cds");
const { Readable } = require("stream");
const { text } = require("node:stream/consumers");

const BaseApplicationService = require("../common/BaseApplicationService");

Expand Down Expand Up @@ -31,12 +33,16 @@ const STATUS_TRANSITIONS = {
};

module.exports = class SchedulingProcessingService extends BaseApplicationService {

constructor(...args) {
super(...args);
this.statusTransitions = STATUS_TRANSITIONS;
}

async init() {
const { Job } = this.entities("scheduling");
const { processJob, updateJob, cancelJob } = this.operations;

this.statusTransitions = STATUS_TRANSITIONS;

this.before([processJob, updateJob, cancelJob], async (req) => {
const ID = req.data.ID;
const job = await SELECT.one(Job).where({ ID });
Expand Down Expand Up @@ -74,18 +80,27 @@ module.exports = class SchedulingProcessingService extends BaseApplicationServic
if (!JobStatus[status]) {
return req.reject(JobSchedulingError.invalidJobStatus(status));
}
if (req.job.status_code === status) {
if (job.status_code === status) {
return;
}
if (!(await this.checkStatusTransition(req, req.job.status_code, status))) {
return req.reject(JobSchedulingError.statusTransitionNotAllowed(req.job.status_code, status));
if (!(await this.checkStatusTransition(req, job.status_code, status))) {
return req.reject(JobSchedulingError.statusTransitionNotAllowed(job.status_code, status));
}
await UPDATE.entity(Job)
.set({
status_code: status,
})
.where({ ID: job.ID });
if (results && results.length > 0) {
for (const result of results) {
if (result.data) {
if (Buffer.isBuffer(result.data)) {
result.data = btoa(result.data);
} else if (result.data instanceof Readable) {
result.data = btoa(await text(result.data));
}
}
}
const insertResults = await this.checkJobResults(req, results);
await INSERT.into(JobResult).entries(insertResults);
}
Expand Down
51 changes: 51 additions & 0 deletions test/scheduling/processing.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const { Readable } = require("stream");

const { cleanData, connectToWS, clearEventQueue, eventQueueEntry, processOutbox } = require("../helper");
const { JobStatus, ResultType, MessageSeverity } = require("../../srv/scheduling/common/codelist");
const SchedulingProcessingService = require("../../srv/scheduling/processing-service");

const { test } = cds.test(__dirname + "/../..");

Expand Down Expand Up @@ -308,6 +309,56 @@ describe("Processing Service", () => {
expect(log.output).toEqual(expect.stringMatching(/ASSERT_DATA_TYPE.*LargeBinary { type: 'cds.LargeBinary' }/s));
});

it("updateJob - processJobUpdate", async () => {
const schedulingProcessingService = new SchedulingProcessingService();
Object.defineProperty(schedulingProcessingService, "entities", {
writable: true,
});
const Job = cds.model.definitions["scheduling.Job"];
const JobResult = cds.model.definitions["scheduling.JobResult"];
schedulingProcessingService.entities = () => {
return {
Job,
JobResult,
};
};
const req = {
job: {
ID,
status_code: JobStatus.running,
},
};
const result = await schedulingProcessingService.processJobUpdate(req, JobStatus.completed, [
{
type: ResultType.data,
name: "Buffer",
filename: "test.txt",
mimeType: "text/plain",
data: Buffer.from("This is a test"),
},
{
type: ResultType.data,
name: "Stream",
filename: "test.txt",
mimeType: "text/plain",
data: Readable.from("This is a test"),
},
]);
expect(result).toBeUndefined();
const tx = cds.tx(req);
const results = await tx.run(SELECT.from(JobResult).where({ job_ID: ID }));
expect(results).toHaveLength(2);
const data = [];
for (const entry of results) {
const jobResult = await tx.run(SELECT.one.from(JobResult).columns("data").where({ ID: entry.ID }));
data.push(await text(jobResult.data));
}
await tx.rollback();
for (const record of data) {
expect(record).toEqual("This is a test");
}
});

it("cancelJob", async () => {
const ws = await connectToWS("job-scheduling");
let message = ws.message("jobStatusChanged");
Expand Down

0 comments on commit cf905ef

Please sign in to comment.