-
-
Notifications
You must be signed in to change notification settings - Fork 561
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs: mssql to snowflake migration example
- Loading branch information
Showing
11 changed files
with
634 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
# Microsoft SQL Server to Snowflake Migration | ||
|
||
 | ||
|
||
## Prerequisites | ||
|
||
- curl | ||
- Ockam | ||
- Snowflake Account | ||
- Microsoft SQL Server | ||
- snowsql | ||
- docker | ||
|
||
|
||
## Get started with Ockam | ||
|
||
[Signup for Ockam](https://www.ockam.io/signup) and then run the following commands on your workstation: | ||
|
||
```sh | ||
# Install Ockam Command | ||
curl --proto '=https' --tlsv1.2 -sSfL https://install.command.ockam.io | bash && source "$HOME/.ockam/env" | ||
|
||
# Enroll with Ockam Orchestrator. | ||
ockam enroll | ||
|
||
# Create an enrollment ticket for the node that will run from a linux machine where thesql server is reachable from | ||
ockam project ticket --usage-count 1 --expires-in 4h --attribute mssql --relay mssql > mssql_outlet.ticket | ||
|
||
``` | ||
|
||
## Setup Ockam node next to MS SQL Server | ||
|
||
- Copy `setup_ockam_outlet.sh` to the linux machine where the MS SQL Server is reachable from. | ||
- Copy `mssql_outlet.ticket` to the same location as `setup_ockam_outlet.sh` script | ||
|
||
```sh | ||
# Run the setup script | ||
chmod +x setup_ockam_outlet.sh | ||
DB_ENDPOINT="HOST:1433" ./setup_ockam_outlet.sh | ||
``` | ||
|
||
- Create a demo table in MS SQL Server to follow the example | ||
|
||
```sql | ||
-- Create the database | ||
CREATE DATABASE ockam_mssql_demo; | ||
GO | ||
|
||
-- Use the database | ||
USE ockam_mssql_demo; | ||
GO | ||
|
||
-- Create the table | ||
CREATE TABLE customers (id INT PRIMARY KEY, name VARCHAR(100)); | ||
GO | ||
|
||
-- Insert some data | ||
INSERT INTO customers (id, name) VALUES (1, 'John Doe'); | ||
INSERT INTO customers (id, name) VALUES (2, 'Jane Smith'); | ||
INSERT INTO customers (id, name) VALUES (3, 'Bob Johnson'); | ||
|
||
GO | ||
``` | ||
|
||
## Setup Snowflake | ||
- Configure `snowsql` with the snowflake account details | ||
|
||
- Create the database, schema, role, compute pool, warehouse, and image repository. | ||
|
||
```sh | ||
# Run the init script and get the repository URL | ||
snowsql -f snowflake_scripts/init.sql && \ | ||
repository_url=$(snowsql -o output_format=csv -o header=false -o timing=false \ | ||
-q "SHOW IMAGE REPOSITORIES;" | grep 'MSSQL_CONNECTOR_DB' | cut -d',' -f5 | tr -d '"') && \ | ||
echo "Repository URL: $repository_url" | ||
``` | ||
|
||
> **Note** | ||
> Respository URL will be similar to `XXX.registry.snowflakecomputing.com/mssql_connector_db/mssql_connector_schema/mssql_connector_repository` | ||
|
||
## Push Ockam docker image and MS SQL Server client docker image | ||
|
||
```sh | ||
# Login to the repository | ||
docker login $repository_url | ||
|
||
# Push the Ockam docker image | ||
ockam_image="ghcr.io/build-trust/ockam:0.146.0@sha256:b13ed188dbde6f5cae9d2c9c9e9305f9c36a009b1e5c126ac0d066537510f895" | ||
docker pull $ockam_image && \ | ||
docker tag $ockam_image $repository_url/ockam:latest && \ | ||
docker push $repository_url/ockam:latest | ||
|
||
# Build and Push the MS SQL Server client docker image | ||
cd mssql_client && \ | ||
docker buildx build --platform linux/amd64 --load -t $repository_url/mssql_client:latest . && \ | ||
docker push $repository_url/mssql_client:latest && \ | ||
cd - | ||
``` | ||
|
||
## Create an Ockam node and Python Client to connect to MS SQL Server in Snowpark Container Services | ||
|
||
- Create network rules to allow the Ockam node to connect to your ockam project and for python client to connect to`ocsp.snowflakecomputing.com` | ||
|
||
```bash | ||
# Run from the same machine where you had enrolled to ockam project and created tickets | ||
snowsql -f snowflake_scripts/access.sql --variable egress_list=$(ockam project show --jq '.egress_allow_list[0]') | ||
``` | ||
|
||
- Create Service | ||
|
||
```bash | ||
# Replace the `TODO` values with the values for MS SQL Server | ||
snowsql -f snowflake_scripts/service.sql \ | ||
--variable ockam_ticket="$(ockam project ticket --usage-count 1 --expires-in 10m --attribute snowflake)" \ | ||
--variable mssql_database="TODO" \ | ||
--variable mssql_user="TODO" \ | ||
--variable mssql_password="TODO" | ||
``` | ||
|
||
- Ensure container services are running | ||
|
||
```sql | ||
-- Check service status | ||
USE WAREHOUSE MSSQL_CONNECTOR_WH; | ||
USE ROLE MSSQL_CONNECTOR_ROLE; | ||
USE DATABASE MSSQL_CONNECTOR_DB; | ||
USE SCHEMA MSSQL_CONNECTOR_SCHEMA; | ||
|
||
SHOW SERVICES; | ||
SELECT SYSTEM$GET_SERVICE_STATUS('MSSQL_CONNECTOR_CLIENT'); | ||
|
||
-- Check service logs | ||
CALL SYSTEM$GET_SERVICE_LOGS('MSSQL_CONNECTOR_CLIENT', '0', 'http-endpoint', 100); | ||
CALL SYSTEM$GET_SERVICE_LOGS('MSSQL_CONNECTOR_CLIENT', '0', 'ockam-inlet', 100); | ||
``` | ||
|
||
> [!IMPORTANT] | ||
> - `http-endpoint` is the endpoint that will be used to connect to the MS SQL Server. You will see `Successfully connected to SQL Server` in the logs upon successful connection. | ||
> - `ockam-inlet` is the endpoint that will be used to connect to the Ockam node. Logs will indicate if there are any errors starting the node. | ||
## Create table in Snowflake and grant access to the role | ||
|
||
```sql | ||
--Create demo table in Snowflake | ||
CREATE TABLE MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS (id INT PRIMARY KEY, name VARCHAR(100)); | ||
|
||
--Grant access to the role | ||
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE ON TABLE MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS TO ROLE MSSQL_CONNECTOR_ROLE; | ||
|
||
``` | ||
|
||
## Create Stored Procedure to execute the function | ||
|
||
```bash | ||
# Run the functions script to create the stored procedures and functions | ||
snowsql -f snowflake_scripts/functions.sql | ||
|
||
``` | ||
|
||
## Access MS SQL Server from Snowflake | ||
|
||
- Copy data from MS SQL Server to Snowflake | ||
|
||
```sql | ||
-- Copy data from MS SQL Server to Snowflake | ||
CALL ockam_mssql_copy( | ||
'SOURCE_TABLE', | ||
'TARGET_TABLE' | ||
); | ||
|
||
-- Example using demo tables created in MS SQL Server and Snowflake | ||
CALL ockam_mssql_copy( | ||
'ockam_mssql_demo.dbo.customers', | ||
'MSSQL_CONNECTOR_DB.MSSQL_CONNECTOR_SCHEMA.CUSTOMERS' | ||
); | ||
``` | ||
|
||
## Clean up | ||
|
||
```bash | ||
snowsql -f snowflake_scripts/cleanup.sql | ||
``` |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 12 additions & 0 deletions
12
examples/command/portals/snowflake/example-8/mssql_client/Dockerfile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
ARG BASE_IMAGE=python:3.10-slim-buster | ||
FROM $BASE_IMAGE | ||
|
||
RUN pip install --upgrade pip && \ | ||
pip install flask snowflake snowflake-connector-python snowflake-snowpark-python pandas | ||
|
||
RUN pip install pymssql==2.2.11 | ||
|
||
COPY service.py ./ | ||
COPY connection.py ./ | ||
|
||
CMD ["python3", "service.py"] |
53 changes: 53 additions & 0 deletions
53
examples/command/portals/snowflake/example-8/mssql_client/connection.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import os | ||
import logging | ||
import snowflake.connector | ||
from snowflake.snowpark import Session | ||
|
||
|
||
def session() -> Session: | ||
""" | ||
Create a session for the connection | ||
:return: Session | ||
""" | ||
logging.info(f"Create a session") | ||
return Session.builder.configs({"connection": connection()}).create() | ||
|
||
|
||
def connection() -> snowflake.connector.SnowflakeConnection: | ||
""" | ||
Create a connection, either from inside the native application when deployed | ||
or with user/password credentials when testing locally | ||
:return: A SnowflakeConnection | ||
""" | ||
logging.info(f"Create a connection") | ||
if os.path.isfile("/snowflake/session/token"): | ||
logging.info(f"Use an OAUTH token") | ||
creds = { | ||
'account': os.getenv('SNOWFLAKE_ACCOUNT'), | ||
'host': os.getenv('SNOWFLAKE_HOST'), | ||
'port': os.getenv('SNOWFLAKE_PORT'), | ||
'protocol': 'https', | ||
'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'), | ||
'database': os.getenv('SNOWFLAKE_DATABASE'), | ||
'schema': os.getenv('SNOWFLAKE_SCHEMA'), | ||
'role': os.getenv('SNOWFLAKE_ROLE'), | ||
'authenticator': "oauth", | ||
'token': open('/snowflake/session/token', 'r').read(), | ||
'client_session_keep_alive': True, | ||
'ocsp_fail_open': False, | ||
'validate_default_parameters': True, | ||
} | ||
#logging.info(f"the creds are {creds}") | ||
else: | ||
creds = { | ||
'account': os.getenv('SNOWFLAKE_ACCOUNT'), | ||
'user': os.getenv('SNOWFLAKE_USER'), | ||
'password': os.getenv('SNOWFLAKE_PASSWORD'), | ||
'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'), | ||
'database': os.getenv('SNOWFLAKE_DATABASE'), | ||
'schema': os.getenv('SNOWFLAKE_SCHEMA'), | ||
'client_session_keep_alive': True | ||
} | ||
|
||
return snowflake.connector.connect(**creds) |
137 changes: 137 additions & 0 deletions
137
examples/command/portals/snowflake/example-8/mssql_client/service.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import logging | ||
import os | ||
import sys | ||
import pymssql | ||
import connection | ||
from flask import Flask, request | ||
from snowflake import connector | ||
from snowflake.connector.errors import ProgrammingError, DatabaseError | ||
|
||
# Environment variables | ||
WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE', "reference('WAREHOUSE')") | ||
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper() | ||
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s') | ||
|
||
app = Flask(__name__) | ||
|
||
# MSSQL Configuration | ||
MSSQL_USER = os.environ.get('MSSQL_USER') | ||
MSSQL_PASSWORD = os.environ.get('MSSQL_PASSWORD') | ||
MSSQL_DATABASE = os.environ.get('MSSQL_DATABASE') | ||
MSSQL_SERVER = os.environ.get('ENDPOINT_HOST', 'localhost') | ||
MSSQL_PORT = os.environ.get('ENDPOINT_PORT', '1433') | ||
|
||
# Snowflake session | ||
session = None | ||
|
||
# MSSQL Connection | ||
def get_mssql_connection(): | ||
"""Create MSSQL connection""" | ||
return pymssql.connect( | ||
server=MSSQL_SERVER, | ||
user=MSSQL_USER, | ||
password=MSSQL_PASSWORD, | ||
database=MSSQL_DATABASE, | ||
port=MSSQL_PORT | ||
) | ||
|
||
def test_mssql_connection(): | ||
"""Test the MSSQL database connection""" | ||
try: | ||
with get_mssql_connection() as conn: | ||
with conn.cursor() as cursor: | ||
cursor.execute("SELECT @@VERSION AS SQLServerVersion;") | ||
version = cursor.fetchone()[0] | ||
logging.info(f"Successfully connected to SQL Server") | ||
logging.info(f"Server Version: {version}") | ||
except Exception as e: | ||
logging.error(f"Connection test failed: {str(e)}") | ||
raise | ||
|
||
# Snowflake Query | ||
def execute_snowflake_query(query, values=[]) -> bool: | ||
"""Execute a Snowflake SQL query""" | ||
try: | ||
logging.info(f"Executing query: {query}, with values {values}") | ||
session.sql(query, values).collect() | ||
logging.info(f"Query execution successful") | ||
return True | ||
except (ProgrammingError, DatabaseError) as e: | ||
logging.error(f"Snowflake Error: {type(e).__name__} - {str(e)}") | ||
return False | ||
except Exception as e: | ||
logging.error(f"Unexpected error executing query: {type(e).__name__} - {str(e)}") | ||
return False | ||
|
||
def use_snowflake_referenced_warehouse(): | ||
"""Use the warehouse referenced as 'WAREHOUSE' for the current session""" | ||
try: | ||
logging.info(f"Use the referenced warehouse") | ||
result = session.sql(f"USE WAREHOUSE {WAREHOUSE}").collect() | ||
logging.info(f"Result of USE WAREHOUSE: {result}.") | ||
except Exception as e: | ||
logging.error(f"Cannot use the referenced warehouse: {type(e).__name__} - {str(e)}") | ||
raise | ||
|
||
# Snowflake Copy | ||
@app.route("/copy_to_snowflake", methods=["POST"]) | ||
def copy_to_snowflake(): | ||
message = request.json | ||
logging.info(f"Received message: {message}") | ||
|
||
source_table = message['data'][0][1] | ||
target_table = message['data'][0][2] | ||
|
||
logging.info(f"Copying from {source_table} to {target_table}") | ||
|
||
try: | ||
# Get data from MSSQL | ||
with get_mssql_connection() as conn: | ||
with conn.cursor() as cursor: | ||
cursor.execute(f"SELECT * FROM {source_table}") | ||
columns = [column[0] for column in cursor.description] | ||
rows = [list(row) for row in cursor.fetchall()] | ||
|
||
# Format data for Snowflake | ||
columns_str = ", ".join(columns) | ||
values_list = [] | ||
for row in rows: | ||
values = [f"'{str(val)}'" if isinstance(val, str) else str(val) for val in row] | ||
values_list.append(f"({', '.join(values)})") | ||
|
||
# Insert data into Snowflake | ||
insert_sql = f"INSERT INTO {target_table} ({columns_str}) VALUES {', '.join(values_list)}" | ||
result = execute_snowflake_query(insert_sql) | ||
if result is False: | ||
error_msg = "Failed to insert data into target table. Check service logs for more details." | ||
logging.error(error_msg) | ||
return { | ||
'data': [[0, error_msg]] | ||
} | ||
|
||
rows_inserted = len(rows) | ||
return { | ||
'data': [[0, f"Successfully copied {rows_inserted} rows from {source_table} to {target_table}"]] | ||
} | ||
except Exception as e: | ||
error_msg = str(e) | ||
logging.error(f"Error: {error_msg}") | ||
return { | ||
'data': [[0, f"Error: {error_msg}"]] | ||
} | ||
|
||
def main(): | ||
global session | ||
# Connect to Snowflake | ||
session = connection.session() | ||
try: | ||
logging.info(f"Start the server") | ||
use_snowflake_referenced_warehouse() | ||
test_mssql_connection() | ||
app.run(host='0.0.0.0', port=8080) | ||
except Exception as e: | ||
logging.error(f"Fatal error in main: {e}") | ||
sys.exit(1) | ||
|
||
if __name__ == '__main__': | ||
main() |
Oops, something went wrong.