docs: mssql to snowflake migration example
snandam committed Jan 13, 2025
1 parent 39ccf63 commit b17964b
Showing 11 changed files with 634 additions and 0 deletions.
183 changes: 183 additions & 0 deletions examples/command/portals/snowflake/example-8/README.md
@@ -0,0 +1,183 @@
# Microsoft SQL Server to Snowflake Migration

![Architecture](./diagram.png)

## Prerequisites

- curl
- Ockam
- Snowflake Account
- Microsoft SQL Server
- snowsql
- docker


## Get started with Ockam

[Sign up for Ockam](https://www.ockam.io/signup) and then run the following commands on your workstation:

```sh
# Install Ockam Command
curl --proto '=https' --tlsv1.2 -sSfL https://install.command.ockam.io | bash && source "$HOME/.ockam/env"

# Enroll with Ockam Orchestrator.
ockam enroll

# Create an enrollment ticket for the node that will run on a Linux machine from which the SQL Server is reachable
ockam project ticket --usage-count 1 --expires-in 4h --attribute mssql --relay mssql > mssql_outlet.ticket

```

## Set up an Ockam node next to MS SQL Server

- Copy `setup_ockam_outlet.sh` to the Linux machine from which the MS SQL Server is reachable.
- Copy `mssql_outlet.ticket` to the same directory as the `setup_ockam_outlet.sh` script.

```sh
# Run the setup script, replacing HOST with the address of your MS SQL Server
chmod +x setup_ockam_outlet.sh
DB_ENDPOINT="HOST:1433" ./setup_ockam_outlet.sh
```

- Create a demo table in MS SQL Server to follow the example

```sql
-- Create the database
CREATE DATABASE ockam_mssql_demo;
GO

-- Use the database
USE ockam_mssql_demo;
GO

-- Create the table
CREATE TABLE customers (id INT PRIMARY KEY, name VARCHAR(100));
GO

-- Insert some data
INSERT INTO customers (id, name) VALUES (1, 'John Doe');
INSERT INTO customers (id, name) VALUES (2, 'Jane Smith');
INSERT INTO customers (id, name) VALUES (3, 'Bob Johnson');

GO
```

## Set up Snowflake

- Configure `snowsql` with your Snowflake account details.

- Create the database, schema, role, compute pool, warehouse, and image repository.

```sh
# Run the init script and get the repository URL
snowsql -f snowflake_scripts/init.sql && \
repository_url=$(snowsql -o output_format=csv -o header=false -o timing=false \
-q "SHOW IMAGE REPOSITORIES;" | grep 'MSSQL_CONNECTOR_DB' | cut -d',' -f5 | tr -d '"') && \
echo "Repository URL: $repository_url"
```

> **Note**
> The repository URL will look similar to `XXX.registry.snowflakecomputing.com/mssql_connector_db/mssql_connector_schema/mssql_connector_repository`
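
The shell pipeline above picks the fifth comma-separated field of the `SHOW IMAGE REPOSITORIES` output. As a minimal sketch, the same extraction can be done with a CSV parser, which also copes with quoted fields that contain commas. The function name `find_repository_url`, the assumed column order (`created_on, name, database_name, schema_name, repository_url, ...`), and the sample row are illustrative assumptions, not part of this example's scripts. Unlike the `grep`, this sketch matches only the database column.

```python
import csv
import io
from typing import Optional


def find_repository_url(csv_text: str, database_name: str) -> Optional[str]:
    """Return the repository_url of the first row whose database matches."""
    for row in csv.reader(io.StringIO(csv_text)):
        # Assumed column order: created_on, name, database_name,
        # schema_name, repository_url, ...
        if len(row) >= 5 and row[2].upper() == database_name.upper():
            return row[4]
    return None


# Hypothetical sample of the CSV that snowsql prints (values are invented).
sample = (
    '"2025-01-13 10:00:00.000 -0800","MSSQL_CONNECTOR_REPOSITORY",'
    '"MSSQL_CONNECTOR_DB","MSSQL_CONNECTOR_SCHEMA",'
    '"xxx.registry.snowflakecomputing.com/mssql_connector_db/'
    'mssql_connector_schema/mssql_connector_repository",'
    '"MSSQL_CONNECTOR_ROLE","ROLE",""\n'
)

print(find_repository_url(sample, "MSSQL_CONNECTOR_DB"))
```

If the column order differs in your account, adjust the indexes after inspecting the output with headers enabled.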

## Push Ockam docker image and MS SQL Server client docker image

```sh
# Login to the repository
docker login $repository_url

# Push the Ockam docker image
ockam_image="ghcr.io/build-trust/ockam:0.146.0@sha256:b13ed188dbde6f5cae9d2c9c9e9305f9c36a009b1e5c126ac0d066537510f895"
docker pull $ockam_image && \
docker tag $ockam_image $repository_url/ockam:latest && \
docker push $repository_url/ockam:latest

# Build and Push the MS SQL Server client docker image
cd mssql_client && \
docker buildx build --platform linux/amd64 --load -t $repository_url/mssql_client:latest . && \
docker push $repository_url/mssql_client:latest && \
cd -
```

## Create an Ockam node and Python Client to connect to MS SQL Server in Snowpark Container Services

- Create network rules that allow the Ockam node to connect to your Ockam project and the Python client to connect to `ocsp.snowflakecomputing.com`.

```bash
# Run from the same machine where you enrolled with your Ockam project and created the tickets
snowsql -f snowflake_scripts/access.sql --variable egress_list=$(ockam project show --jq '.egress_allow_list[0]')
```

- Create Service

```bash
# Replace the `TODO` values with the values for MS SQL Server
snowsql -f snowflake_scripts/service.sql \
--variable ockam_ticket="$(ockam project ticket --usage-count 1 --expires-in 10m --attribute snowflake)" \
--variable mssql_database="TODO" \
--variable mssql_user="TODO" \
--variable mssql_password="TODO"
```

- Ensure container services are running

```sql
-- Check service status
USE WAREHOUSE MSSQL_CONNECTOR_WH;
USE ROLE MSSQL_CONNECTOR_ROLE;
USE DATABASE MSSQL_CONNECTOR_DB;
USE SCHEMA MSSQL_CONNECTOR_SCHEMA;

SHOW SERVICES;
SELECT SYSTEM$GET_SERVICE_STATUS('MSSQL_CONNECTOR_CLIENT');

-- Check service logs
CALL SYSTEM$GET_SERVICE_LOGS('MSSQL_CONNECTOR_CLIENT', '0', 'http-endpoint', 100);
CALL SYSTEM$GET_SERVICE_LOGS('MSSQL_CONNECTOR_CLIENT', '0', 'ockam-inlet', 100);
```

> [!IMPORTANT]
> - `http-endpoint` is the container running the Python client that connects to MS SQL Server. Its logs show `Successfully connected to SQL Server` once the connection succeeds.
> - `ockam-inlet` is the container running the Ockam node. Its logs show any errors encountered while starting the node.

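
`SYSTEM$GET_SERVICE_STATUS` returns a JSON array with one entry per container. As a rough sketch, the snippet below lists the containers that are not yet ready. The field names `status` and `containerName` and the sample payload are assumptions about the shape of that JSON; check them against the output in your own account. The helper `containers_not_ready` is hypothetical.

```python
import json
from typing import List


def containers_not_ready(status_json: str) -> List[str]:
    """Return the names of containers whose status is not READY."""
    containers = json.loads(status_json)
    return [
        c.get("containerName", "<unknown>")
        for c in containers
        if c.get("status") != "READY"
    ]


# Invented sample payload, shaped like the assumed status output.
sample_status = json.dumps([
    {"status": "READY", "message": "Running",
     "containerName": "http-endpoint", "instanceId": "0"},
    {"status": "PENDING", "message": "Waiting to start",
     "containerName": "ockam-inlet", "instanceId": "0"},
])

print(containers_not_ready(sample_status))
```

An empty list means every container reported `READY`. Otherwise, the service logs for the listed containers are the next place to look.
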
## Create table in Snowflake and grant access to the role

```sql
-- Create demo table in Snowflake
CREATE TABLE MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS (id INT PRIMARY KEY, name VARCHAR(100));

-- Grant access to the role
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE ON TABLE MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS TO ROLE MSSQL_CONNECTOR_ROLE;
```

## Create Stored Procedure to execute the function

```bash
# Run the functions script to create the stored procedures and functions
snowsql -f snowflake_scripts/functions.sql

```

## Access MS SQL Server from Snowflake

- Copy data from MS SQL Server to Snowflake

```sql
-- Copy data from MS SQL Server to Snowflake
CALL ockam_mssql_copy(
'SOURCE_TABLE',
'TARGET_TABLE'
);

-- Example using demo tables created in MS SQL Server and Snowflake
CALL ockam_mssql_copy(
'ockam_mssql_demo.dbo.customers',
'MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS'
);
```
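
For orientation, here is a sketch of what the copy step involves on the service side. Judging from `mssql_client/service.py` in this commit, the service receives a JSON batch whose first row holds `[row_index, source_table, target_table]`. `functions.sql` is not shown here, so that wiring is an inference. The helpers `parse_copy_request` and `build_insert` and the identifier pattern are hypothetical. They illustrate one way to validate the table names and build an `INSERT` with `?` placeholders instead of interpolating values into the SQL text. Whether your Snowflake session accepts bind parameters in this form should be verified before relying on it.

```python
import re
from typing import Any, List, Sequence, Tuple

# Hypothetical allow-list: one to three dot-separated unquoted identifiers.
_IDENTIFIER = re.compile(
    r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}$"
)


def parse_copy_request(message: dict) -> Tuple[str, str]:
    """Extract (source_table, target_table) from the first row of the batch."""
    row = message["data"][0]  # [row_index, source_table, target_table]
    source_table, target_table = row[1], row[2]
    for name in (source_table, target_table):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"Unexpected table name: {name!r}")
    return source_table, target_table


def build_insert(
    target_table: str,
    columns: Sequence[str],
    rows: Sequence[Sequence[Any]],
) -> Tuple[str, List[Any]]:
    """Build an INSERT with '?' placeholders and a flat parameter list."""
    if not rows:
        raise ValueError("no rows to insert")
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    sql = (
        f"INSERT INTO {target_table} ({', '.join(columns)}) "
        f"VALUES {', '.join(row_placeholder for _ in rows)}"
    )
    # Flatten row by row so parameters line up with the placeholders.
    params = [value for row in rows for value in row]
    return sql, params


message = {"data": [[0, "ockam_mssql_demo.dbo.customers",
                     "MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS"]]}
source, target = parse_copy_request(message)
sql, params = build_insert(target, ["id", "name"],
                           [[1, "John Doe"], [2, "Jane Smith"]])
print(sql)
print(params)
```

Keeping values out of the SQL text avoids quoting problems with names such as `O'Brien` and with `NULL` values.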

## Clean up

```bash
snowsql -f snowflake_scripts/cleanup.sql
```
@@ -0,0 +1,12 @@
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE

RUN pip install --upgrade pip && \
pip install flask snowflake snowflake-connector-python snowflake-snowpark-python pandas

RUN pip install pymssql==2.2.11

COPY service.py ./
COPY connection.py ./

CMD ["python3", "service.py"]
@@ -0,0 +1,53 @@
import os
import logging
import snowflake.connector
from snowflake.snowpark import Session


def session() -> Session:
    """
    Create a session for the connection
    :return: Session
    """
    logging.info("Create a session")
    return Session.builder.configs({"connection": connection()}).create()


def connection() -> snowflake.connector.SnowflakeConnection:
    """
    Create a connection, either from inside the native application when deployed
    or with user/password credentials when testing locally
    :return: A SnowflakeConnection
    """
    logging.info("Create a connection")
    if os.path.isfile("/snowflake/session/token"):
        logging.info("Use an OAUTH token")
        # Read the token with a context manager so the file handle is closed
        with open('/snowflake/session/token', 'r') as token_file:
            token = token_file.read()
        creds = {
            'account': os.getenv('SNOWFLAKE_ACCOUNT'),
            'host': os.getenv('SNOWFLAKE_HOST'),
            'port': os.getenv('SNOWFLAKE_PORT'),
            'protocol': 'https',
            'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
            'database': os.getenv('SNOWFLAKE_DATABASE'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA'),
            'role': os.getenv('SNOWFLAKE_ROLE'),
            'authenticator': "oauth",
            'token': token,
            'client_session_keep_alive': True,
            'ocsp_fail_open': False,
            'validate_default_parameters': True,
        }
        #logging.info(f"the creds are {creds}")
    else:
        creds = {
            'account': os.getenv('SNOWFLAKE_ACCOUNT'),
            'user': os.getenv('SNOWFLAKE_USER'),
            'password': os.getenv('SNOWFLAKE_PASSWORD'),
            'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
            'database': os.getenv('SNOWFLAKE_DATABASE'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA'),
            'client_session_keep_alive': True
        }

    return snowflake.connector.connect(**creds)
137 changes: 137 additions & 0 deletions examples/command/portals/snowflake/example-8/mssql_client/service.py
@@ -0,0 +1,137 @@
import logging
import os
import sys
import pymssql
import connection
from flask import Flask, request
from snowflake import connector
from snowflake.connector.errors import ProgrammingError, DatabaseError

# Environment variables
WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE', "reference('WAREHOUSE')")
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)

# MSSQL Configuration
MSSQL_USER = os.environ.get('MSSQL_USER')
MSSQL_PASSWORD = os.environ.get('MSSQL_PASSWORD')
MSSQL_DATABASE = os.environ.get('MSSQL_DATABASE')
MSSQL_SERVER = os.environ.get('ENDPOINT_HOST', 'localhost')
MSSQL_PORT = os.environ.get('ENDPOINT_PORT', '1433')

# Snowflake session
session = None

# MSSQL Connection
def get_mssql_connection():
    """Create MSSQL connection"""
    return pymssql.connect(
        server=MSSQL_SERVER,
        user=MSSQL_USER,
        password=MSSQL_PASSWORD,
        database=MSSQL_DATABASE,
        port=MSSQL_PORT
    )

def test_mssql_connection():
    """Test the MSSQL database connection"""
    try:
        with get_mssql_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT @@VERSION AS SQLServerVersion;")
                version = cursor.fetchone()[0]
                logging.info("Successfully connected to SQL Server")
                logging.info(f"Server Version: {version}")
    except Exception as e:
        logging.error(f"Connection test failed: {str(e)}")
        raise

# Snowflake Query
def execute_snowflake_query(query, values=None) -> bool:
    """Execute a Snowflake SQL query"""
    # Default to None rather than a shared mutable list
    try:
        logging.info(f"Executing query: {query}, with values {values}")
        session.sql(query, values).collect()
        logging.info("Query execution successful")
        return True
    except (ProgrammingError, DatabaseError) as e:
        logging.error(f"Snowflake Error: {type(e).__name__} - {str(e)}")
        return False
    except Exception as e:
        logging.error(f"Unexpected error executing query: {type(e).__name__} - {str(e)}")
        return False

def use_snowflake_referenced_warehouse():
    """Use the warehouse referenced as 'WAREHOUSE' for the current session"""
    try:
        logging.info("Use the referenced warehouse")
        result = session.sql(f"USE WAREHOUSE {WAREHOUSE}").collect()
        logging.info(f"Result of USE WAREHOUSE: {result}.")
    except Exception as e:
        logging.error(f"Cannot use the referenced warehouse: {type(e).__name__} - {str(e)}")
        raise

def to_sql_literal(val) -> str:
    """Render a Python value as a Snowflake SQL literal"""
    if val is None:
        return "NULL"
    if isinstance(val, (int, float)):
        return str(val)
    # Quote everything else, escaping backslashes and single quotes
    escaped = str(val).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"

# Snowflake Copy
@app.route("/copy_to_snowflake", methods=["POST"])
def copy_to_snowflake():
    message = request.json
    logging.info(f"Received message: {message}")

    # Each row of the batch is [row_index, source_table, target_table]
    source_table = message['data'][0][1]
    target_table = message['data'][0][2]

    logging.info(f"Copying from {source_table} to {target_table}")

    try:
        # Get data from MSSQL
        with get_mssql_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT * FROM {source_table}")
                columns = [column[0] for column in cursor.description]
                rows = [list(row) for row in cursor.fetchall()]

        # Nothing to copy: an INSERT with an empty VALUES list is invalid SQL
        if not rows:
            return {
                'data': [[0, f"No rows found in {source_table}; nothing copied"]]
            }

        # Format data for Snowflake
        columns_str = ", ".join(columns)
        values_list = []
        for row in rows:
            values = [to_sql_literal(val) for val in row]
            values_list.append(f"({', '.join(values)})")

        # Insert data into Snowflake
        insert_sql = f"INSERT INTO {target_table} ({columns_str}) VALUES {', '.join(values_list)}"
        result = execute_snowflake_query(insert_sql)
        if result is False:
            error_msg = "Failed to insert data into target table. Check service logs for more details."
            logging.error(error_msg)
            return {
                'data': [[0, error_msg]]
            }

        rows_inserted = len(rows)
        return {
            'data': [[0, f"Successfully copied {rows_inserted} rows from {source_table} to {target_table}"]]
        }
    except Exception as e:
        error_msg = str(e)
        logging.error(f"Error: {error_msg}")
        return {
            'data': [[0, f"Error: {error_msg}"]]
        }

def main():
    global session
    # Connect to Snowflake
    session = connection.session()
    try:
        logging.info("Start the server")
        use_snowflake_referenced_warehouse()
        test_mssql_connection()
        app.run(host='0.0.0.0', port=8080)
    except Exception as e:
        logging.error(f"Fatal error in main: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()