From 94a03785c8c33c77e8b01fa63d24b1a06bf1aaea Mon Sep 17 00:00:00 2001 From: Sowmya N Dixit Date: Wed, 22 May 2024 19:22:09 +0530 Subject: [PATCH] Sanketika-Obsrv/issue-tracker#180 fix: Datasource DB schema changes to include type. (#79) Co-authored-by: sowmya-dixit --- .../src/main/resources/dataset-registry.sql | 1 + .../org/sunbird/obsrv/model/DatasetModels.scala | 2 +- .../sunbird/obsrv/registry/DatasetRegistry.scala | 5 +++++ .../obsrv/service/DatasetRegistryService.scala | 14 +++++++++++++- .../obsrv/spec/BaseSpecWithDatasetRegistry.scala | 2 +- .../obsrv/spec/TestDatasetRegistrySpec.scala | 4 ++-- 6 files changed, 23 insertions(+), 5 deletions(-) diff --git a/dataset-registry/src/main/resources/dataset-registry.sql b/dataset-registry/src/main/resources/dataset-registry.sql index ff28ae98..54373eec 100644 --- a/dataset-registry/src/main/resources/dataset-registry.sql +++ b/dataset-registry/src/main/resources/dataset-registry.sql @@ -22,6 +22,7 @@ CREATE INDEX IF NOT EXISTS datasets_status ON datasets(status); CREATE TABLE IF NOT EXISTS datasources ( datasource text PRIMARY KEY, dataset_id text REFERENCES datasets (id), + type text NOT NULL, ingestion_spec json NOT NULL, datasource_ref text NOT NULL, retention_period json, diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala index 49cc51bc..ee73fbe0 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala @@ -71,7 +71,7 @@ object DatasetModels { @JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None) case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String, - @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String) + @JsonProperty("type") `type`: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String) } diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala index c1394fd4..08921adc 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala @@ -42,6 +42,11 @@ object DatasetRegistry { DatasetRegistryService.readDatasources(datasetId) } + def getAllDatasources(): List[DataSource] = { + val datasourceList = DatasetRegistryService.readAllDatasources() + datasourceList.getOrElse(List()) + } + def getDataSetIds(datasetType: String): List[String] = { datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList } diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala index 88efb7a6..8075d508 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala @@ -111,6 +111,17 @@ object DatasetRegistryService { } } + def readAllDatasources(): Option[List[DataSource]] = { + + val postgresConnect = new PostgresConnect(postgresConfig) + try { + val rs = postgresConnect.executeQuery(s"SELECT * FROM datasources") + Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { + parseDatasource(result) + }).toList) + } + } + def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = { val query = s"UPDATE datasources set datasource_ref = '$datasourceRef' where datasource='${datasource.datasource}' and dataset_id='${datasource.datasetId}'" updateRegistry(query) @@ -190,10 +201,11 @@ object DatasetRegistryService { val id = rs.getString("id") val datasource = rs.getString("datasource") val datasetId = rs.getString("dataset_id") + val datasourceType = rs.getString("type") val ingestionSpec = rs.getString("ingestion_spec") val datasourceRef = rs.getString("datasource_ref") - DataSource(id, datasource, datasetId, ingestionSpec, datasourceRef) + DataSource(id, datasource, datasetId, datasourceType, ingestionSpec, datasourceRef) } private def parseDatasetTransformation(rs: ResultSet): DatasetTransformation = { diff --git a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala index 09321143..1b3edea0 100644 --- a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala +++ b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala @@ -36,7 +36,7 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres { private def createSchema(postgresConnect: PostgresConnect): Unit = { postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, type text NOT NULL, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, dataset_config json NOT NULL, status text NOT NULL, tags text[], data_version INT, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );") - postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") + postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_transformations ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), field_key text NOT NULL, transformation_function json NOT NULL, status text NOT NULL, mode text, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );") postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_source_config ( id text PRIMARY KEY, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, status text NOT NULL, connector_stats json, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(connector_type, dataset_id) );") } diff --git a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala index b37e801a..3d83552d 100644 --- a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala +++ b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala @@ -92,8 +92,8 @@ class TestDatasetRegistrySpec extends BaseSpecWithDatasetRegistry with Matchers postgresConnect.execute("insert into dataset_source_config values('sc1', 'd1', 'kafka', '{\"kafkaBrokers\":\"localhost:9090\",\"topic\":\"test-topic\"}', 'Live', null, 'System', 'System', now(), now());") postgresConnect.execute("insert into dataset_source_config values('sc2', 'd1', 'rdbms', '{\"type\":\"postgres\",\"tableName\":\"test-table\"}', 'Live', null, 'System', 'System', now(), now());") - //postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") - postgresConnect.execute("insert into datasources values('ds1', 'd1', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());") + //postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") + postgresConnect.execute("insert into datasources values('ds1', 'd1', 'druid', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());") postgresConnect.closeConnection() } } \ No newline at end of file