Skip to content

Commit

Permalink
Support deleting without providing full primary key
Browse files Browse the repository at this point in the history
  • Loading branch information
ramikg committed Dec 25, 2024
1 parent 804e0bb commit ae80fd0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
23 changes: 17 additions & 6 deletions lib/mapping/object-selector.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,22 +255,33 @@ class ObjectSelector {
throw new Error(`Table "${info.tables[i].name}" could not be retrieved`);
}

// All partition and clustering keys from the table should be included in the document
const keyNames = table.partitionKeys.concat(table.clusteringKeys).map(k => k.name);
const columns = propertiesInfo.map(p => p.columnName);
if (keysAreIncluded(table.partitionKeys, propertiesInfo) !== keyMatches.all) {
return false;
}

for (let i = 0; i < keyNames.length; i++) {
if (columns.indexOf(keyNames[i]) === -1) {
const clusteringKeyNames = table.clusteringKeys.map(k => k.name);
const queriedClusteringKeyNames = propertiesInfo.filter(k => clusteringKeyNames.includes(k.columnName)).map(k => k.columnName);
for (const queriedClusteringKeyName of queriedClusteringKeyNames) {
if (clusteringKeyNames.indexOf(queriedClusteringKeyName) >= queriedClusteringKeyNames.length) {
// One of the clustering columns preceding queriedClusteringKeyName is omitted
return false;
}
}

// The filter makes sure that the column doesn't appear in `doc` (but it may appear in `docInfo`)
const queriedColumnNames = propertiesInfo.filter(p => p.value).map(p => p.columnName);
const primaryKeyColumnNames = table.partitionKeys.concat(table.clusteringKeys).map(k => k.name);
if (queriedColumnNames.some(queriedColumnName => !primaryKeyColumnNames.includes(queriedColumnName))) {
// Only primary key columns are allowed
return false;
}

// "when" conditions should be contained in the table
return when.reduce((acc, p) => acc && table.columnsByName[p.columnName] !== undefined, true);
});

if (filteredTables.length === 0) {
let message = `No table matches (all PKs have to be specified) fields: [${
let message = `No table matches (must specify all partition key and top-level clustering columns) fields: [${
propertiesInfo.map(p => p.columnName)}]`;

if (when.length > 0) {
Expand Down
25 changes: 14 additions & 11 deletions test/unit/mapping/model-mapper-mutation-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,43 +273,46 @@ describe('ModelMapper', () => {
it('should throw an error when filter or conditions are not valid', () => testErrors('remove', [
{
doc: { id1: 'x', notAValidProp: 'y' },
message: 'No table matches (all PKs have to be specified) fields: [id1,notAValidProp]'
message: 'No table matches (must specify all partition key and top-level clustering columns) fields: [id1,notAValidProp]'
}, {
doc: { id1: 'x'},
docInfo: { fields: ['notAValidProp'] },
message: 'No table matches (all PKs have to be specified) fields: [notAValidProp]'
message: 'No table matches (must specify all partition key and top-level clustering columns) fields: [notAValidProp]'
}, {
doc: { id1: 'x', name: 'y' },
message: 'No table matches (all PKs have to be specified) fields: [id1,name]'
message: 'No table matches (must specify all partition key and top-level clustering columns) fields: [id1,name]'
}, {
doc: { id1: 'x', id2: 'y', name: 'z'},
doc: { id1: 'x', id2: 'y'},
docInfo: { when: { notAValidProp: 'm'} },
message: 'No table matches (all PKs have to be specified) fields: [id1,id2,name]; condition: [notAValidProp]'
message: 'No table matches (must specify all partition key and top-level clustering columns) fields: [id1,id2]; condition: [notAValidProp]'
}, {
doc: {},
message: 'Expected object with keys'
}, {
doc: { id1: 'x', id3: 'y' },
message: 'No table matches (must specify all partition key and top-level clustering columns) fields: [id1,id3]'
}
]));

it('should generate the query, params and set the idempotency', () => testQueries('remove', [
{
doc: { id1: 'x', 'id2': 'y' },
doc: { id1: 'x', id2: 'y' },
query: 'DELETE FROM ks1.table1 WHERE "id1" = ? AND "id2" = ?',
params: [ 'x', 'y' ]
}, {
doc: { id1: 'x', 'id2': 'y' },
doc: { id1: 'x', id2: 'y' },
docInfo: { when: { name: 'a' }},
query: 'DELETE FROM ks1.table1 WHERE "id1" = ? AND "id2" = ? IF "name" = ?',
params: [ 'x', 'y', 'a' ],
isIdempotent: false
}, {
doc: { id1: 'x', 'id2': 'y' },
doc: { id1: 'x', id2: 'y' },
docInfo: { ifExists: true },
query: 'DELETE FROM ks1.table1 WHERE "id1" = ? AND "id2" = ? IF EXISTS',
params: [ 'x', 'y' ],
isIdempotent: false
}, {
doc: { id1: 'x', 'id2': 'y' },
doc: { id1: 'x', id2: 'y' },
docInfo: { fields: [ 'id1', 'id2', 'name' ], deleteOnlyColumns: true },
query: 'DELETE "name" FROM ks1.table1 WHERE "id1" = ? AND "id2" = ?',
params: [ 'x', 'y' ]
Expand Down Expand Up @@ -347,8 +350,8 @@ describe('ModelMapper', () => {

function testErrors(methodName, items) {
return Promise.all(items.map(item => {
const columns = [ 'id1', 'id2', 'name'];
const clientInfo = mapperTestHelper.getClient(columns, [ 1, 1 ], 'ks1');
const columns = [ 'id1', 'id2', 'id3', 'name'];
const clientInfo = mapperTestHelper.getClient(columns, [ 1, 2 ], 'ks1');
const modelMapper = mapperTestHelper.getModelMapper(clientInfo);

let catchCalled = false;
Expand Down

0 comments on commit ae80fd0

Please sign in to comment.