Skip to content

Commit

Permalink
Merge branch 'main' into walheartbeat-for-peers
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 24, 2023
2 parents 70f1030 + 963b6f1 commit 79e28a7
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 30 deletions.
1 change: 1 addition & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ services:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
PEERDB_PASSWORD: peerdb

volumes:
pgdata:
18 changes: 10 additions & 8 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ mod stream;

pub struct BigQueryQueryExecutor {
peer_name: String,
config: BigqueryConfig,
project_id: String,
dataset_id: String,
peer_connections: Arc<PeerConnectionTracker>,
client: Box<Client>,
cursor_manager: BigQueryCursorManager,
}

pub async fn bq_client_from_config(config: BigqueryConfig) -> anyhow::Result<Client> {
pub async fn bq_client_from_config(config: &BigqueryConfig) -> anyhow::Result<Client> {
let sa_key = yup_oauth2::ServiceAccountKey {
key_type: Some(config.auth_type.clone()),
project_id: Some(config.project_id.clone()),
Expand All @@ -52,12 +53,13 @@ impl BigQueryQueryExecutor {
config: &BigqueryConfig,
peer_connections: Arc<PeerConnectionTracker>,
) -> anyhow::Result<Self> {
let client = bq_client_from_config(config.clone()).await?;
let client = bq_client_from_config(config).await?;
let client = Box::new(client);
let cursor_manager = BigQueryCursorManager::new();
Ok(Self {
peer_name,
config: config.clone(),
project_id: config.project_id.clone(),
dataset_id: config.dataset_id.clone(),
peer_connections,
client,
cursor_manager,
Expand All @@ -82,7 +84,7 @@ impl BigQueryQueryExecutor {
let result_set = self
.client
.job()
.query(&self.config.project_id, query_req)
.query(&self.project_id, query_req)
.await
.map_err(|err| {
tracing::error!("error running query: {}", err);
Expand Down Expand Up @@ -112,7 +114,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let mut query = query.clone();
let bq_ast = ast::BigqueryAst::default();
bq_ast
.rewrite(&self.config.dataset_id, &mut query)
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
Expand Down Expand Up @@ -222,7 +224,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let mut query = query.clone();
let bq_ast = ast::BigqueryAst::default();
bq_ast
.rewrite(&self.config.dataset_id, &mut query)
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
Expand Down Expand Up @@ -260,7 +262,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let _result_set = self
.client
.job()
.query(&self.config.project_id, QueryRequest::new(sql))
.query(&self.project_id, QueryRequest::new(sql))
.await?;
Ok(true)
}
Expand Down
9 changes: 4 additions & 5 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,20 @@ pub struct SnowflakeAuth {
}

impl SnowflakeAuth {
// When initializing, private_key must not be copied, to improve security of credentials.
#[tracing::instrument(name = "peer_sflake::init_client_auth", skip_all)]
pub fn new(
account_id: String,
username: String,
private_key: String,
password: Option<String>,
private_key: &str,
password: Option<&str>,
refresh_threshold: u64,
expiry_threshold: u64,
) -> anyhow::Result<Self> {
let pkey = match password {
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw)
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(private_key, pw)
.context("Invalid private key or decryption failed")?,
None => {
DecodePrivateKey::from_pkcs8_pem(&private_key).context("Invalid private key")?
DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?
}
};
let mut snowflake_auth: SnowflakeAuth = SnowflakeAuth {
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl SnowflakeQueryExecutor {
auth: SnowflakeAuth::new(
config.account_id.clone(),
config.username.clone(),
config.private_key.clone(),
config.password.clone(),
&config.private_key,
config.password.as_deref(),
DEFAULT_REFRESH_THRESHOLD,
DEFAULT_EXPIRY_THRESHOLD,
)?,
Expand Down
20 changes: 9 additions & 11 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ impl NexusBackend {

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: String,
peer_name: &str,
) -> PgWireResult<Peer> {
let peer = catalog.get_peer(&peer_name).await.map_err(|err| {
let peer = catalog.get_peer(peer_name).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err),
}))
Expand All @@ -192,7 +192,7 @@ impl NexusBackend {

fn handle_mirror_existence(
if_not_exists: bool,
flow_name: String,
flow_name: &str,
) -> PgWireResult<Vec<Response<'static>>> {
if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Expand Down Expand Up @@ -389,7 +389,7 @@ impl NexusBackend {
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone())
Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name)
}
}
_ => unreachable!(),
Expand Down Expand Up @@ -487,11 +487,9 @@ impl NexusBackend {

// get source and destination peers
let src_peer =
Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone())
.await?;
Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?;
let dst_peer =
Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone())
.await?;
Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand Down Expand Up @@ -519,7 +517,7 @@ impl NexusBackend {
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone())
Self::handle_mirror_existence(*if_not_exists, &flow_job.name)
}
}
PeerDDL::CreateMirrorForSelect { .. } => {
Expand Down Expand Up @@ -931,9 +929,9 @@ impl NexusBackend {

let executor: Arc<dyn QueryExecutor> = match &peer.config {
Some(Config::BigqueryConfig(ref c)) => {
let peer_name = peer.name.clone();
let executor =
BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?;
BigQueryQueryExecutor::new(peer.name.clone(), c, self.peer_connections.clone())
.await?;
Arc::new(executor)
}
Some(Config::PostgresConfig(ref c)) => {
Expand Down
13 changes: 13 additions & 0 deletions ui/app/api/login/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { cookies } from 'next/headers';

export async function POST(request: Request) {
const { password } = await request.json();
if (process.env.PEERDB_PASSWORD !== password) {
return new Response(JSON.stringify({ error: 'wrong password' }));
}
cookies().set('auth', password, {
expires: Date.now() + 24 * 60 * 60 * 1000,
secure: process.env.PEERDB_SECURE_COOKIES === 'true',
});
return new Response('{}');
}
6 changes: 6 additions & 0 deletions ui/app/api/logout/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { cookies } from 'next/headers';

export async function POST(req: Request) {
cookies().delete('auth');
return new Response('');
}
61 changes: 61 additions & 0 deletions ui/app/login/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use client';
import Image from 'next/image';
import { useRouter, useSearchParams } from 'next/navigation';
import { useState } from 'react';

import { Button } from '@/lib/Button';
import { Layout, LayoutMain } from '@/lib/Layout';
import { TextField } from '@/lib/TextField';

export default function Login() {
const router = useRouter();
const searchParams = useSearchParams();
const [pass, setPass] = useState('');
const [error, setError] = useState(() =>
searchParams.has('reject') ? 'Authentication failed, please login' : ''
);
const login = () => {
fetch('/api/login', {
method: 'POST',
body: JSON.stringify({ password: pass }),
})
.then((res) => res.json())
.then((res) => {
if (res.error) setError(res.error);
else router.push('/');
});
};
return (
<Layout>
<LayoutMain alignSelf='center' justifySelf='center' width='xxLarge'>
<Image src='/images/peerdb-combinedMark.svg' alt='PeerDB' width={512} />
{error && (
<div
style={{
borderRadius: '8px',
fontWeight: 'bold',
color: '#600',
backgroundColor: '#c66',
}}
>
{error}
</div>
)}
<TextField
variant='simple'
placeholder='Password'
value={pass}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setPass(e.target.value)
}
onKeyPress={(e: React.KeyboardEvent<HTMLInputElement>) => {
if (e.key === 'Enter') {
login();
}
}}
/>
<Button onClick={login}>Login</Button>
</LayoutMain>
</Layout>
);
}
3 changes: 2 additions & 1 deletion ui/app/page.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import SidebarComponent from '@/components/SidebarComponent';
import { Header } from '@/lib/Header';
import { Layout, LayoutMain } from '@/lib/Layout';
import { cookies } from 'next/headers';

export default function Home() {
return (
<Layout sidebar={<SidebarComponent />}>
<Layout sidebar={<SidebarComponent logout={!!cookies().get('auth')} />}>
<LayoutMain alignSelf='center' justifySelf='center' width='xxLarge'>
<Header variant='largeTitle'>PeerDB Home Page</Header>
</LayoutMain>
Expand Down
16 changes: 16 additions & 0 deletions ui/components/Logout.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use client';
import { Button } from '@/lib/Button';

export default function Logout() {
return (
<Button
onClick={() =>
fetch('/api/logout', { method: 'POST' }).then((res) =>
location.assign('/login')
)
}
>
Logout
</Button>
);
}
6 changes: 3 additions & 3 deletions ui/components/SidebarComponent.tsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
'use client';

import useTZStore from '@/app/globalstate/time';
import Logout from '@/components/Logout';
import { BrandLogo } from '@/lib/BrandLogo';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { RowWithSelect } from '@/lib/Layout';
import { Sidebar, SidebarItem } from '@/lib/Sidebar';
import Link from 'next/link';

export default function SidebarComponent() {
export default function SidebarComponent(props: { logout?: boolean }) {
const timezones = ['UTC', 'Local', 'Relative'];
const setZone = useTZStore((state) => state.setZone);
const zone = useTZStore((state) => state.timezone);
Expand Down Expand Up @@ -60,7 +60,7 @@ export default function SidebarComponent() {
/>
</div>
</div>
<Button className='w-full'>Help and Support</Button>
{props.logout && <Logout />}
</>
}
bottomLabel={<Label variant='footnote'>App. v0.7.0</Label>}
Expand Down
23 changes: 23 additions & 0 deletions ui/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { NextRequest } from 'next/server';
import { NextResponse } from 'next/server';

export default function middleware(req: NextRequest) {
if (
req.nextUrl.pathname !== '/login' &&
req.nextUrl.pathname !== '/api/login' &&
req.nextUrl.pathname !== '/api/logout' &&
process.env.PEERDB_PASSWORD &&
req.cookies.get('auth')?.value !== process.env.PEERDB_PASSWORD
) {
req.cookies.delete('auth');
return NextResponse.redirect(new URL('/login?reject', req.url));
}
return NextResponse.next();
}

export const config = {
matcher: [
// Match everything other than static assets
'/((?!_next/static/|images/|favicon.ico$).*)',
],
};

0 comments on commit 79e28a7

Please sign in to comment.