Skip to content

Commit

Permalink
Sanketika-Obsrv/issue-tracker#180 fix: Datasource DB schema changes t…
Browse files Browse the repository at this point in the history
…o include type. (#79)

Co-authored-by: sowmya-dixit <[email protected]>
  • Loading branch information
sowmya-dixit and sowmya-dixit authored May 22, 2024
1 parent 8cbe9f3 commit 94a0378
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 5 deletions.
1 change: 1 addition & 0 deletions dataset-registry/src/main/resources/dataset-registry.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ CREATE INDEX IF NOT EXISTS datasets_status ON datasets(status);
CREATE TABLE IF NOT EXISTS datasources (
datasource text PRIMARY KEY,
dataset_id text REFERENCES datasets (id),
type text NOT NULL,
ingestion_spec json NOT NULL,
datasource_ref text NOT NULL,
retention_period json,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object DatasetModels {
@JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None)

case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)
@JsonProperty("type") `type`: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ object DatasetRegistry {
DatasetRegistryService.readDatasources(datasetId)
}

def getAllDatasources(): List[DataSource] = {
val datasourceList = DatasetRegistryService.readAllDatasources()
datasourceList.getOrElse(List())
}

def getDataSetIds(datasetType: String): List[String] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ object DatasetRegistryService {
}
}

def readAllDatasources(): Option[List[DataSource]] = {

val postgresConnect = new PostgresConnect(postgresConfig)
try {
val rs = postgresConnect.executeQuery(s"SELECT * FROM datasources")
Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
parseDatasource(result)
}).toList)
}
}

def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = {
val query = s"UPDATE datasources set datasource_ref = '$datasourceRef' where datasource='${datasource.datasource}' and dataset_id='${datasource.datasetId}'"
updateRegistry(query)
Expand Down Expand Up @@ -190,10 +201,11 @@ object DatasetRegistryService {
val id = rs.getString("id")
val datasource = rs.getString("datasource")
val datasetId = rs.getString("dataset_id")
val datasourceType = rs.getString("type")
val ingestionSpec = rs.getString("ingestion_spec")
val datasourceRef = rs.getString("datasource_ref")

DataSource(id, datasource, datasetId, ingestionSpec, datasourceRef)
DataSource(id, datasource, datasetId, datasourceType, ingestionSpec, datasourceRef)
}

private def parseDatasetTransformation(rs: ResultSet): DatasetTransformation = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {
private def createSchema(postgresConnect: PostgresConnect): Unit = {

postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, type text NOT NULL, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, dataset_config json NOT NULL, status text NOT NULL, tags text[], data_version INT, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_transformations ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), field_key text NOT NULL, transformation_function json NOT NULL, status text NOT NULL, mode text, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_source_config ( id text PRIMARY KEY, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, status text NOT NULL, connector_stats json, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(connector_type, dataset_id) );")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class TestDatasetRegistrySpec extends BaseSpecWithDatasetRegistry with Matchers
postgresConnect.execute("insert into dataset_source_config values('sc1', 'd1', 'kafka', '{\"kafkaBrokers\":\"localhost:9090\",\"topic\":\"test-topic\"}', 'Live', null, 'System', 'System', now(), now());")
postgresConnect.execute("insert into dataset_source_config values('sc2', 'd1', 'rdbms', '{\"type\":\"postgres\",\"tableName\":\"test-table\"}', 'Live', null, 'System', 'System', now(), now());")

//postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("insert into datasources values('ds1', 'd1', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());")
//postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("insert into datasources values('ds1', 'd1', 'druid', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());")
postgresConnect.closeConnection()
}
}

0 comments on commit 94a0378

Please sign in to comment.