Skip to content

Commit

Permalink
feat!(cdviz-db): use the stored procedure store_cdevent instead of …
Browse files Browse the repository at this point in the history
…direct sql insert
  • Loading branch information
davidB committed Oct 14, 2024
1 parent fdbcab9 commit a32d555
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 82 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 11 additions & 30 deletions cdviz-collector/src/sinks/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{errors::Result, Message};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use time::OffsetDateTime;
use tracing::Instrument;

use super::Sink;
Expand Down Expand Up @@ -67,13 +66,7 @@ impl Sink for DbSink {
store_event(
&self.pool,
// TODO build Event from raw json
Event {
timestamp: *message.cdevent.timestamp(),
payload: serde_json::to_value(&message.cdevent)?,
subject: message.cdevent.subject().content().subject().to_lowercase(),
predicate: message.cdevent.subject().content().predicate().to_string(),
version: None,
},
Event { payload: serde_json::to_value(&message.cdevent)? },
)
.await?;
Ok(())
Expand All @@ -82,11 +75,7 @@ impl Sink for DbSink {

#[derive(sqlx::FromRow)]
struct Event {
timestamp: OffsetDateTime,
payload: serde_json::Value,
subject: String,
predicate: String,
version: Option<[i32; 3]>,
}

// basic handmade span far to be compliant with
Expand All @@ -107,20 +96,10 @@ fn build_otel_span(db_operation: &str) -> tracing::Span {

// store event as json in db (postgresql using sqlx)
async fn store_event(pg_pool: &PgPool, event: Event) -> Result<()> {
sqlx::query!(
r#"
INSERT INTO cdevents_lake (timestamp, payload, subject, predicate, version)
VALUES ($1, $2, $3, $4, $5)
"#,
event.timestamp,
event.payload,
event.subject,
event.predicate,
event.version.as_ref().map(|x| x.as_slice()),
)
.execute(pg_pool)
.instrument(build_otel_span("INSERT INTO events"))
.await?;
sqlx::query!("CALL store_cdevent($1)", event.payload)
.execute(pg_pool)
.instrument(build_otel_span("store_cdevent"))
.await?;
Ok(())
}

Expand Down Expand Up @@ -174,10 +153,12 @@ mod tests {
};
let dbsink = DbSink::try_from(config).unwrap();
//Basic initialize the db schema
//TODO improve the loading, initialisation of the db
for sql in read_to_string("../cdviz-db/src/schema.sql").unwrap().split(';') {
sqlx::QueryBuilder::new(sql).build().execute(&dbsink.pool).await.unwrap();
}
// A transaction is implicitly created for the whole file, so some instructions cannot be applied, e.g.:
// -- { severity: Error, code: "25001", message: "CREATE INDEX CONCURRENTLY cannot run inside a transaction block",
sqlx::raw_sql(read_to_string("../cdviz-db/src/schema.sql").unwrap().as_str())
.execute(&dbsink.pool)
.await
.unwrap();
// container should be kept, else it is removed on drop
(dbsink, pg_container)
}
Expand Down
3 changes: 2 additions & 1 deletion cdviz-collector/taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ tasks:
PG_OFFLINE_URL: "postgres://{{.PG_OFFLINE_USER}}:{{.PG_OFFLINE_PWD}}@127.0.0.1:5432/{{.PG_OFFLINE_USER}}"
cmds:
- docker rm -f postgres || true
- sleep 3
- docker run --name postgres
-e POSTGRES_PASSWORD={{.PG_OFFLINE_PWD}}
-e POSTGRES_USER={{.PG_OFFLINE_USER}}
-v {{.USER_WORKING_DIR}}/../cdviz-db/src:/docker-entrypoint-initdb.d
-p 5432:5432 -d postgres:16
- sleep 3
- sleep 5
- sqlx database create --database-url {{.PG_OFFLINE_URL}}
- cargo sqlx prepare --workspace --database-url {{.PG_OFFLINE_URL}}
- sqlx database drop -y --database-url {{.PG_OFFLINE_URL}}
Expand Down
14 changes: 0 additions & 14 deletions cdviz-db/migrations/20240316184734.sql

This file was deleted.

8 changes: 0 additions & 8 deletions cdviz-db/migrations/20240401171152.sql

This file was deleted.

52 changes: 52 additions & 0 deletions cdviz-db/migrations/20241014134101.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- Migration 20241014134101: recreate the cdevents lake storage.
-- NOTE(review): this file is hash-pinned in migrations/atlas.sum; any edit
-- (even whitespace) requires regenerating the sum or atlas will refuse to apply it.
-- Create "cdevents_lake" table
CREATE TABLE "cdevents_lake" ("id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, "imported_at" timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, "timestamp" timestamptz NOT NULL, "payload" jsonb NOT NULL, "subject" character varying(100) NOT NULL, "predicate" character varying(100) NOT NULL, "version" integer[] NULL, "context_id" character varying(100) NOT NULL, PRIMARY KEY ("id"));
-- Create index "cdevents_lake_context_id_key" to table: "cdevents_lake"
-- (unique on context_id: re-submitting the same cdevent fails instead of duplicating rows)
CREATE UNIQUE INDEX "cdevents_lake_context_id_key" ON "cdevents_lake" ("context_id");
-- Create index "idx_subject" to table: "cdevents_lake"
CREATE INDEX "idx_subject" ON "cdevents_lake" ("subject");
-- Create index "idx_timestamp" to table: "cdevents_lake"
CREATE INDEX "idx_timestamp" ON "cdevents_lake" ("timestamp");
-- Set comment to table: "cdevents_lake"
COMMENT ON TABLE "cdevents_lake" IS 'table of stored cdevents without transformation';
-- Set comment to column: "imported_at" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."imported_at" IS 'the timestamp when the cdevent was stored into the table';
-- Set comment to column: "timestamp" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."timestamp" IS 'timestamp of cdevents extracted from context.timestamp in the json';
-- Set comment to column: "payload" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."payload" IS 'the full cdevent in json format';
-- Set comment to column: "subject" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."subject" IS 'subject extracted from context.type in the json';
-- Set comment to column: "predicate" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."predicate" IS 'predicate of the subject, extracted from context.type in the json';
-- Set comment to column: "version" on table: "cdevents_lake"
-- NOTE(review): the stored comment text contains typos ("suject s"); fixing the
-- literal would change this migration's hash — fix it in a follow-up migration instead.
COMMENT ON COLUMN "cdevents_lake" ."version" IS 'the version of the suject s type, extracted from context.type. The version number are split in 0 for major, 1 for minor, 2 for patch';
-- Set comment to column: "context_id" on table: "cdevents_lake"
COMMENT ON COLUMN "cdevents_lake" ."context_id" IS 'the id of the event, extracted from context.id';

-- store_cdevent: parse a raw cdevent (jsonb) and insert it into "cdevents_lake".
-- Fields are derived from the event's "context" object:
--   context.id        -> context_id (UNIQUE on the table, so replays raise a
--                        unique-violation instead of inserting duplicates)
--   context.type      -> subject (3rd dot-separated part), predicate (4th part),
--                        version (parts 5..7; any '-' pre-release suffix on the
--                        patch part is dropped)
--   context.timestamp -> timestamp
create or replace procedure store_cdevent(
cdevent jsonb
)
as $$
declare
ts timestamp with time zone;
tpe varchar(255);
context_id varchar(100);
tpe_subject varchar(100);
tpe_predicate varchar(100);
-- version array deliberately uses subscripts 0..2 (0=major, 1=minor, 2=patch),
-- matching the documented comment on cdevents_lake.version; PostgreSQL arrays
-- keep whatever lower bound the first assignment establishes.
tpe_version INTEGER[3];
begin
context_id := (cdevent -> 'context' ->> 'id');
tpe := (cdevent -> 'context' ->> 'type');
tpe_subject := SPLIT_PART(tpe, '.', 3);
tpe_predicate := SPLIT_PART(tpe, '.', 4);
-- NOTE(review): SPLIT_PART returns '' for missing parts, and ''::INTEGER raises
-- 22P02 — assumes every context.type carries a full x.y.z version; confirm.
tpe_version[0]:= SPLIT_PART(tpe, '.', 5)::INTEGER;
tpe_version[1]:= SPLIT_PART(tpe, '.', 6)::INTEGER;
tpe_version[2]:= SPLIT_PART(SPLIT_PART(tpe, '.', 7), '-', 1)::INTEGER;
-- NOTE(review): guard below is disabled — jsonb_typeof only returns json type
-- names ('string', 'number', ...), never 'timestampz', so the test as written
-- could never pass; an invalid timestamp surfaces as a cast error instead.
-- if (jsonb_typeof(cdevent -> 'context' ->> 'timestamp') = 'timestampz') then
ts := (cdevent -> 'context' ->> 'timestamp')::timestamp with time zone;
-- else
-- raise exception 'Input Jsonb does not contain a valid timestamp';
-- end if;
insert into "cdevents_lake"("payload", "timestamp", "subject", "predicate", "version", "context_id") values(cdevent, ts, tpe_subject, tpe_predicate, tpe_version, context_id);
end;
$$ language plpgsql;
5 changes: 2 additions & 3 deletions cdviz-db/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
h1:G8sqtBxbE8rQ0ZReTiVO01ugCUK0Z5U1/tn0i10a0H4=
20240316184734.sql h1:P4X1rAK3TUnBFjHwTZw+Lc7l3x9qkKXmTk8Y56GjMvI=
20240401171152.sql h1:Oh6ZIUfdo8j/LCUmA+YWmv28MIJYKsioxYIAOSA2MhI=
h1:kGNUpVkZ+2Ry6GZZbPz9UFUb+Y6TemKiz3AlgdKiHTQ=
20241014134101.sql h1:kRGH4LbTomyoYSmJ6M22O3cTR9d3ds8QbBODiTYe/10=
39 changes: 35 additions & 4 deletions cdviz-db/src/schema.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
-- Add up migration script here
-- cdevents_lake
CREATE TABLE IF NOT EXISTS "cdevents_lake" (
"id" BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
"imported_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
"timestamp" TIMESTAMP WITH TIME ZONE NOT NULL,
"payload" JSONB NOT NULL,
"subject" VARCHAR(100) NOT NULL,
"predicate" VARCHAR(100) NOT NULL,
"version" INTEGER[3]
"version" INTEGER[3],
"context_id" VARCHAR(100) UNIQUE NOT NULL
);

COMMENT ON TABLE "cdevents_lake" IS 'table of stored cdevents without transformation';
Expand All @@ -15,11 +16,41 @@ COMMENT ON COLUMN "cdevents_lake"."timestamp" IS 'timestamp of cdevents extracte
COMMENT ON COLUMN "cdevents_lake"."payload" IS 'the full cdevent in json format';
COMMENT ON COLUMN "cdevents_lake"."subject" IS 'subject extracted from context.type in the json';
COMMENT ON COLUMN "cdevents_lake"."predicate" IS 'predicate of the subject, extracted from context.type in the json';
COMMENT ON COLUMN "cdevents_lake"."version" IS 'the version of the suject s type, extracted from context.type. The verion number are split in 0 for major, 1 for minor, 2 for patch';
COMMENT ON COLUMN "cdevents_lake"."version" IS 'the version of the suject s type, extracted from context.type. The version number are split in 0 for major, 1 for minor, 2 for patch';
COMMENT ON COLUMN "cdevents_lake"."context_id" IS 'the id of the event, extracted from context.id';

CREATE INDEX IF NOT EXISTS "idx_timestamp" ON "cdevents_lake"("timestamp");
CREATE INDEX IF NOT EXISTS "idx_timestamp" ON "cdevents_lake" using BRIN("timestamp");
CREATE INDEX IF NOT EXISTS "idx_subject" ON "cdevents_lake"("subject");

-- store_cdevent
-- store_cdevent: parse a raw cdevent (jsonb) and insert it into "cdevents_lake".
-- Fields are derived from the event's "context" object:
--   context.id        -> context_id (column is UNIQUE, so replaying the same
--                        event raises a unique-violation instead of duplicating)
--   context.type      -> subject (3rd dot-separated part), predicate (4th part),
--                        version (parts 5..7; any '-' pre-release suffix on the
--                        patch part is dropped)
--   context.timestamp -> timestamp
create or replace procedure store_cdevent(
cdevent jsonb
)
as $$
declare
ts timestamp with time zone;
tpe varchar(255);
context_id varchar(100);
tpe_subject varchar(100);
tpe_predicate varchar(100);
-- version array deliberately uses subscripts 0..2 (0=major, 1=minor, 2=patch),
-- matching the documented comment on cdevents_lake.version; PostgreSQL arrays
-- keep whatever lower bound the first assignment establishes.
tpe_version INTEGER[3];
begin
context_id := (cdevent -> 'context' ->> 'id');
tpe := (cdevent -> 'context' ->> 'type');
tpe_subject := SPLIT_PART(tpe, '.', 3);
tpe_predicate := SPLIT_PART(tpe, '.', 4);
-- NOTE(review): SPLIT_PART returns '' for missing parts, and ''::INTEGER raises
-- 22P02 — assumes every context.type carries a full x.y.z version; confirm.
tpe_version[0]:= SPLIT_PART(tpe, '.', 5)::INTEGER;
tpe_version[1]:= SPLIT_PART(tpe, '.', 6)::INTEGER;
tpe_version[2]:= SPLIT_PART(SPLIT_PART(tpe, '.', 7), '-', 1)::INTEGER;
-- NOTE(review): guard below is disabled — jsonb_typeof only returns json type
-- names ('string', 'number', ...), never 'timestampz', so the test as written
-- could never pass; an invalid timestamp surfaces as a cast error instead.
-- if (jsonb_typeof(cdevent -> 'context' ->> 'timestamp') = 'timestampz') then
ts := (cdevent -> 'context' ->> 'timestamp')::timestamp with time zone;
-- else
-- raise exception 'Input Jsonb does not contain a valid timestamp';
-- end if;
insert into "cdevents_lake"("payload", "timestamp", "subject", "predicate", "version", "context_id") values(cdevent, ts, tpe_subject, tpe_predicate, tpe_version, context_id);
end;
$$ language plpgsql;

-- create a view based on fields in the json payload
-- source: [Postgresql json column to view - Database Administrators Stack Exchange](https://dba.stackexchange.com/questions/151838/postgresql-json-column-to-view?newreg=ed0a9389843a45699bfb02559dd32038)
-- DO $$
Expand Down
9 changes: 5 additions & 4 deletions cdviz-db/taskfile.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
version: '3'
version: "3"

tasks:

plan:
desc: "update the migrations to reflect target `src/schema.sql`"
cmds:
Expand All @@ -16,7 +15,9 @@ tasks:

db-local:start-empty:
desc: "start a container for the local db (empty: no data, no schema)"
cmd: docker run --name cdviz-db -e "POSTGRES_PASSWORD=$PG_LOCAL_PWD" -e "POSTGRES_USER=$PG_LOCAL_USER" -p 5432:5432 -d postgres:16.1
cmds:
- task: db-local:stop
- docker run --name cdviz-db -e "POSTGRES_PASSWORD=$PG_LOCAL_PWD" -e "POSTGRES_USER=$PG_LOCAL_USER" -p 5432:5432 -d postgres:16.1

db-local:start:
desc: "start a container for the local db with the migrations applied"
Expand All @@ -39,4 +40,4 @@ tasks:
cmds:
# - task: check
# - task: lint
- task: test
- task: test

0 comments on commit a32d555

Please sign in to comment.