Skip to content

Commit

Permalink
intermediate commit
Browse files Browse the repository at this point in the history
  • Loading branch information
GnsP committed Jan 16, 2025
1 parent c8ba4b3 commit 19a8a42
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 107 deletions.
1 change: 1 addition & 0 deletions app/cdap/api/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export const MyPipelineApi = {
'REQUEST',
`${pluginFetchBase}/plugins/:pluginName?scope=system`
),
fetchArtifactProperties: apiCreator(dataSrc, 'GET', 'REQUEST', artifactBasePath),

validateStage: apiCreator(
dataSrc,
Expand Down
252 changes: 148 additions & 104 deletions app/cdap/components/StudioV2/CreatePipelineView/usePropertiesPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,32 @@ import { useSelector, useDispatch } from 'react-redux';
import _cloneDeep from 'lodash/cloneDeep';
import _isEqual from 'lodash/isEqual';
import _merge from 'lodash/merge';
import _uniq from 'lodash/uniq';
import avsc from 'cdap-avsc';
import ee from 'event-emitter';
import {
getAppType,
getArtifact,
getNodes,
getSourceConnections,
getSourceNodes,
} from '../store/config/queries';
import PipelineMetricsStore from 'services/PipelineMetricsStore';
import { objectQuery } from 'services/helpers';
import { getNodesFromStages, getPluginInfo } from '../utils/nodeUtils';
import { getNodesFromStages, getOutputSchemaObj, getPluginInfo } from '../utils/nodeUtils';
import { GLOBALS } from 'services/global-constants';
import PipelineDetailStore from 'components/PipelineDetails/store';
import { getCurrentNamespace } from 'services/NamespaceStore';
import { getNodesMap } from 'components/hydrator/helpers/DAGhelpers';
import { PreviewActions } from '../store/preview/actions';
import { NodesActions } from '../store/nodes/reducer';
import { removePreviousState, resetFutureStates } from '../store/nodes/actions';
import { fetchWidgetJson } from '../utils/pluginUtils';
import { generateNodeConfig } from 'services/HydratorPluginConfigFactory';
import useWatchers from '../utils/useWatchers';
import { editPlugin } from '../store/config/actions';
import { isNodeNameUnique } from 'services/PipelineErrorFactory';
import { formatSchemaToAvro } from '../utils/schemaUtils';

export default function usePropertiesPanel(pluginNode: any) {
const tabs = [
Expand All @@ -54,6 +62,9 @@ export default function usePropertiesPanel(pluginNode: any) {
const [selectedNode, setSelectedNode] = useState<any>(null);
const [datasetAlreadyExists, setDatasetAlreadyExists] = useState<boolean>(false);
const [datasetId, setDatasetId] = useState<any>(null);
const [widgetJson, setWidgetJson] = useState<any>(null);

const $watch = useWatchers(state);

const previewState = useSelector((state) => state.preview);
const uiState = useSelector((state) => state.uiState);
Expand Down Expand Up @@ -328,7 +339,7 @@ export default function usePropertiesPanel(pluginNode: any) {
clearTimeout(stateChangeTimeout);
const tout = setTimeout(async () => {
await loadNewPlugin();
await validateNodeLabel();
await validateNodeLabel(state);
}, 0);
setStateChangeTimeout(tout);
}
Expand All @@ -353,11 +364,8 @@ export default function usePropertiesPanel(pluginNode: any) {
}
}

////////////////////////////////////
// TODO: correct from here

async function loadNewPlugin() {
const noJsonErrorHandler = (err) => {
const noJsonErrorHandler = (err?: string) => {
const propertiesFromBackend = Object.keys(state.node._backendProperties);
// Didn't receive a configuration from the backend. Fallback to all textboxes.
const changes: any = _cloneDeep(state);
Expand All @@ -371,6 +379,8 @@ export default function usePropertiesPanel(pluginNode: any) {
case 'CONFIG_SEMANTICS_JSON_ERROR':
changes.noConfigMessage = GLOBALS.en.hydrator.studio.error['SEMANTIC-CONFIG-JSON'];
break;
default:
break;
}
changes.noconfig = true;
changes.configfetched = true;
Expand All @@ -380,55 +390,37 @@ export default function usePropertiesPanel(pluginNode: any) {
});
changes.defaultState = _cloneDeep(changes);
changes.watchers.push(
////////////////////////////////////////
this.$scope.$watch(
'HydratorPlusPlusNodeConfigCtrl.state.node',
() => {
validateNodeLabel(this);
this.HydratorPlusPlusConfigActions.editPlugin(this.state.node.name, this.state.node);
},
true
)
$watch('node', (newState) => {
validateNodeLabel(newState);
editPlugin(newState.node.name, newState.node);
})
);

updateState(changes);
};

this.state.noproperty = Object.keys(this.state.node._backendProperties || {}).length;
if (this.state.noproperty) {
const artifactName = this.myHelpers.objectQuery(
this.state.node,
'plugin',
'artifact',
'name'
);
const artifactVersion = this.myHelpers.objectQuery(
this.state.node,
'plugin',
'artifact',
'version'
);
const artifactScope = this.myHelpers.objectQuery(
this.state.node,
'plugin',
'artifact',
'scope'
);
this.HydratorPlusPlusPluginConfigFactory.fetchWidgetJson(
const draftState: any = _cloneDeep(state);
draftState.noproperty = Object.keys(draftState.node._backendProperties || {}).length;
if (draftState.noproperty) {
const artifactName = objectQuery(draftState.node, 'plugin', 'artifact', 'name');
const artifactVersion = objectQuery(draftState.node, 'plugin', 'artifact', 'version');
const artifactScope = objectQuery(draftState.node, 'plugin', 'artifact', 'scope');

fetchWidgetJson(
artifactName,
artifactVersion,
artifactScope,
`widgets.${this.state.node.plugin.name}-${this.state.node.type ||
this.state.node.plugin.type}`
`widgets.${draftState.node.plugin.name}-${draftState.node.type ||
draftState.node.plugin.type}`
).then((res) => {
this.widgetJson = res;
setWidgetJson(res);

// Not going to eliminate the groupsConfig just yet, because there are still other things depending on it
// such as output schema.
try {
this.state.groupsConfig = this.HydratorPlusPlusPluginConfigFactory.generateNodeConfig(
this.state.node._backendProperties,
res
);
draftState.groupsConfig = generateNodeConfig(draftState.node._backendProperties, res);
} catch (e) {
updateState(draftState);
noJsonErrorHandler();
return;
}
Expand All @@ -438,118 +430,170 @@ export default function usePropertiesPanel(pluginNode: any) {
const jumpConfigDatasets = jumpConfig.datasets || [];
datasets = jumpConfigDatasets.map((dataset) => {
let datasetId = properties[dataset['ref-property-name']];
const { metadataEndpoints } = window.CaskCommon.PipelineDetailStore.getState();
const { metadataEndpoints } = PipelineDetailStore.getState();
if (!datasetId && metadataEndpoints) {
const endpoint = metadataEndpoints.find((endpoint) => {
return endpoint.properties.stageName === this.state.node.id;
return endpoint.properties.stageName === draftState.node.id;
});
datasetId = endpoint && endpoint.name;
}
return { datasetId, entityType: 'datasets' };
});
return { datasets };
};
if (res.errorDataset || this.state.node.errorDatasetName) {
this.state.showErrorDataset = true;
this.state.errorDatasetTooltip =

if (res.errorDataset || draftState.node.errorDatasetName) {
draftState.showErrorDataset = true;
draftState.errorDatasetTooltip =
(res.errorDataset && res.errorDataset.errorDatasetTooltip) || false;
this.state.node.errorDatasetName = this.state.node.errorDatasetName || '';
draftState.node.errorDatasetName = draftState.node.errorDatasetName || '';
}

if (
this.$scope.isDisabled &&
this.state.groupsConfig.jumpConfig &&
Object.keys(this.state.groupsConfig.jumpConfig).length
rDisabled &&
draftState.groupsConfig.jumpConfig &&
Object.keys(draftState.groupsConfig.jumpConfig).length
) {
const { datasets } = generateJumpConfig(
this.state.groupsConfig.jumpConfig,
this.state.node.plugin.properties
draftState.groupsConfig.jumpConfig,
draftState.node.plugin.properties
);
this.state.groupsConfig.jumpConfig.datasets = datasets;
draftState.groupsConfig.jumpConfig.datasets = datasets;
} else {
// If we isDisabled is set to false then we are in studio mode & hence remove jump config.
// Jumpconfig is only for published view where everything is disabled.
delete this.state.groupsConfig.jumpConfig;
draftState.groupsConfig.jumpConfig = null;
}
const configOutputSchema = this.state.groupsConfig.outputSchema;
const configOutputSchema = draftState.groupsConfig.outputSchema;
// If its an implicit schema, set the output schema to the implicit schema and inform ConfigActionFactory
if (configOutputSchema.implicitSchema) {
this.state.node.outputSchema = [
this.HydratorPlusPlusNodeService.getOutputSchemaObj(
this.HydratorPlusPlusHydratorService.formatSchemaToAvro(
configOutputSchema.implicitSchema
)
),
draftState.node.outputSchema = [
getOutputSchemaObj(formatSchemaToAvro(configOutputSchema.implicitSchema)),
];
this.HydratorPlusPlusConfigActions.editPlugin(this.state.node.name, this.state.node);
editPlugin(draftState.node.name, draftState.node);
} else {
// If not an implcit schema check if a schema property exists in the node config.
// What this means is, has the plugin developer specified a plugin property in 'outputs' array of node config.
// If yes then set it as output schema and everytime when a user edits the output schema the value has to
// be transitioned to the respective plugin property.
if (configOutputSchema.isOutputSchemaExists) {
const schemaProperty = configOutputSchema.outputSchemaProperty[0];
const pluginProperties = this.state.node.plugin.properties;
const pluginProperties = draftState.node.plugin.properties;
if (pluginProperties[schemaProperty]) {
this.state.node.outputSchema = pluginProperties[schemaProperty];
} else if (pluginProperties[schemaProperty] !== this.state.node.outputSchema) {
this.state.node.plugin.properties[
configOutputSchema.outputSchemaProperty[0]
] = this.state.node.outputSchema[0].schema;
draftState.node.outputSchema = pluginProperties[schemaProperty];
} else if (!_isEqual(pluginProperties[schemaProperty], draftState.node.outputSchema)) {
draftState.node.plugin.properties[configOutputSchema.outputSchemaProperty[0]] =
draftState.node.outputSchema[0].schema;
}
this.state.watchers.push(
this.$scope.$watch('HydratorPlusPlusNodeConfigCtrl.state.node.outputSchema', () => {
if (this.validateSchema()) {
this.state.node.plugin.properties[
configOutputSchema.outputSchemaProperty[0]
] = this.state.node.outputSchema[0].schema;
draftState.watchers.push(
$watch('node.outputSchema', (newState) => {
const newDraftState: any = _cloneDeep(newState);
if (validateSchema()) {
newDraftState.node.plugin.properties[configOutputSchema.outputSchemaProperty[0]] =
newDraftState.node.outputSchema[0].schema;
updateState(newDraftState);
}
})
);
}
}
if (!this.$scope.isDisabled) {
this.state.watchers.push(
this.$scope.$watch(
'HydratorPlusPlusNodeConfigCtrl.state.node',
() => {
this.validateNodeLabel(this);
this.HydratorPlusPlusConfigActions.editPlugin(
this.state.node.name,
this.state.node
);
},
true
)
if (!rDisabled) {
draftState.watchers.push(
$watch('node', (newState) => {
validateNodeLabel(newState);
editPlugin(newState.node.name, newState.node);
})
);
}
if (!this.state.node.outputSchema || this.state.node.type === 'condition') {
let inputSchema =
this.myHelpers.objectQuery(this.state.node, 'inputSchema', 0, 'schema') || '';

if (!draftState.node.outputSchema || draftState.node.type === 'condition') {
let inputSchema = objectQuery(draftState.node, 'inputSchema', 0, 'schema') || '';
if (typeof inputSchema !== 'string') {
inputSchema = JSON.stringify(inputSchema);
}
this.state.node.outputSchema = [
this.HydratorPlusPlusNodeService.getOutputSchemaObj(inputSchema),
];
draftState.node.outputSchema = [getOutputSchemaObj(inputSchema)];
}
if (!this.state.node.plugin.label) {
this.state.node.plugin.label = this.state.node.name;
if (!draftState.node.plugin.label) {
draftState.node.plugin.label = draftState.node.name;
}
// Mark the configfetched to show that configurations have been received.
this.state.configfetched = true;
this.state.config = res;
this.state.noconfig = false;
this.defaultState = angular.copy(this.state);
draftState.configfetched = true;
draftState.config = res;
draftState.noconfig = false;
draftState.defaultState = _cloneDeep(draftState);
updateState(draftState);
}, noJsonErrorHandler);
} else {
this.state.configfetched = true;
draftState.configfetched = true;
updateState(draftState);
}
}

function validateNodeLabel(currentState) {
const nodes = getNodes(configState);
const nodeName = objectQuery(currentState, 'node', 'plugin', 'label');
if (!nodeName) {
return;
}
isNodeNameUnique(nodeName, nodes, (err) => {
const changes: any = _cloneDeep(currentState);
if (err) {
changes.nodeLabelError = GLOBALS.en.hydrator.studio.error[err];
} else {
changes.nodeLabelError = '';
}
updateState(changes);
});
}

function validateSchema() {
const draftState = _cloneDeep(state);
draftState.errors = [];

if (!Array.isArray(draftState.node.outputSchema)) {
draftState.node.outputSchema = [getOutputSchemaObj(draftState.node.outputSchema)];
}

Object.values(draftState.node.outputSchema).forEach((schemaObj: any) => {
let schema;
try {
schema = JSON.parse(schemaObj.schema);
schema = schema.fields;
} catch (e) {
schema = null;
}

const validationRules = [hasUniqueFields];

const error = [];
validationRules.forEach((rule) => {
rule(schema, error);
});

if (error.length > 0) {
draftState.errors.push(error);
}
});

updateState(draftState);
if (draftState.errors.length) {
return false;
}
return true;
}

// async function validateNodeLabel() {
function hasUniqueFields(schema, error) {
if (!schema) {
return true;
}

const fields = schema.map((field) => field.name);
const unique = _uniq(fields);

// }
if (fields.length !== unique.length) {
error.push('There are two or more fields with the same name.');
}
}

function updateNodeStateIfDirty() {
const isStateDirty = stateIsDirty();
Expand Down
Loading

0 comments on commit 19a8a42

Please sign in to comment.