Init - CA Notify
jzingh98 committed Dec 23, 2020
1 parent 0083084 commit 1f2699a
Showing 15 changed files with 424 additions and 1 deletion.
9 changes: 9 additions & 0 deletions CCRStoAPI/__init__.py
@@ -0,0 +1,9 @@
import azure.functions as func
import logging
from .process_data import orchestrate_epochs


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("main".upper())
    orchestrate_epochs()
    return func.HttpResponse('Completed')
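
A hypothetical local smoke test for this HTTP trigger (not part of the commit; assumes Azure Functions Core Tools is serving the app on its default port, with the route derived from the function folder name):

```python
# Run after `func start`; locally the function-level auth key is not enforced.
import requests

resp = requests.get("http://localhost:7071/api/CCRStoAPI")
print(resp.status_code, resp.text)  # expect: 200 Completed
```
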
39 changes: 39 additions & 0 deletions CCRStoAPI/acquire_data.py
@@ -0,0 +1,39 @@
import logging
import snowflake.connector
from .config import DB_PASSWORD, DB_SCHEMA, ACQUIRE_DATA_QUERY


def acquire_data():
    logging.info('acquire_data'.upper())
    df = get_data_from_snowflake()
    data = []
    for index, row in df.iterrows():
        data.append(row)
    return data


def get_data_from_snowflake():
    logging.info('get_data_from_snowflake'.upper())
    try:
        con = get_connection()
        results_iterable = con.cursor().execute(ACQUIRE_DATA_QUERY)
        logging.info("Total Records: " + str(results_iterable.rowcount))
        df = results_iterable.fetch_pandas_all()
        logging.info(df)
        return df
    except Exception as e:
        # Note: callers iterate the return value as a DataFrame, so a
        # returned exception surfaces as an error in acquire_data.
        logging.error("Exception in get_data_from_snowflake: " + str(e))
        return e


def get_connection():
    logging.info('get_connection'.upper())
    return snowflake.connector.connect(
        user='CANOTIFY_USER',
        password=DB_PASSWORD,
        account='your_snowflake_account',
        database='CA_NOTIFY',
        schema=DB_SCHEMA,
        warehouse='VWH_CA_NOTIFY',
        role='CA_NOTIFY_ROLE'
    )
21 changes: 21 additions & 0 deletions CCRStoAPI/config.py
@@ -0,0 +1,21 @@
import os

# VERIFICATION SERVER
APHL_ADMIN_API_KEY = os.environ['APP_APHL_ADMIN_API_KEY']
APHL_ADMIN_URL = os.environ['APP_APHL_ADMIN_URL']

# TWILIO
ACCOUNT_SID = os.environ['APP_TWILIO_ACCT_ID']
AUTH_TOKEN = os.environ['APP_TWILIO_TOKEN']

# DATABASE
ACQUIRE_DATA_QUERY = os.environ['APP_ACQUIRE_DATA_QUERY']
INSERT_WRITEBACK_QUERY = os.environ['APP_INSERT_WRITEBACK_QUERY'] + " "
VAL = "('{}',{},Current_timestamp())"
DB_SCHEMA = os.environ['APP_DB_SCHEMA']
DB_PASSWORD = os.environ['APP_DB_PASSWORD']

# THROUGHPUT
EPOCH_COUNT = int(os.environ['APP_EPOCH_COUNT'])
BATCH_SIZE = int(os.environ['APP_BATCH_SIZE'])
SLEEP_TIME = float(os.environ['APP_SLEEP_TIME'])
20 changes: 20 additions & 0 deletions CCRStoAPI/function.json
@@ -0,0 +1,20 @@
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}
50 changes: 50 additions & 0 deletions CCRStoAPI/pre_check.py
@@ -0,0 +1,50 @@
import logging
import threading
from twilio.rest import Client
from twilio.base.exceptions import TwilioRestException
from .config import ACCOUNT_SID, AUTH_TOKEN


twilio_client = Client(ACCOUNT_SID, AUTH_TOKEN)


def pre_check_and_annotate(objects_list):
    logging.info("execute_pre_check: ".upper())
    annotated_list = []
    threads = []
    for request_obj in objects_list:
        logging.info("Creating Thread For: ".upper() + str(request_obj))
        annotated_list.append(request_obj)
        x = threading.Thread(target=is_phone_valid, args=(request_obj,))
        threads.append(x)
    for thread in threads:
        logging.info("Starting Thread: ".upper() + str(thread))
        thread.start()
    for thread in threads:
        logging.info("Joining Thread: ".upper() + str(thread))
        thread.join()
    return annotated_list


def is_phone_valid(request_obj):
    logging.info("is_phone_valid: ".upper() + str(request_obj))
    # Runs on a worker thread; annotates the shared dict in place.
    if twilio_lookup_is_valid_mobile(request_obj["phone"]):
        request_obj["pre_check"] = True
    else:
        request_obj["pre_check"] = False


def twilio_lookup_is_valid_mobile(phone):
    logging.info("validate_phone_number: ".upper() + str(phone))
    try:
        phone_number = twilio_client.lookups.phone_numbers(
            str(phone)).fetch(type=['carrier'])
        if (phone_number.carrier['type']
                and phone_number.carrier['type'] != "mobile") \
                or len(phone) < 10 or len(phone) > 11:
            return False
        else:
            return True
    except TwilioRestException as e:
        logging.info("Exception: " + str(phone) + " " + str(e))
        return False
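
A minimal usage sketch for the pre-check (hypothetical records; assumes valid Twilio credentials are in the environment before this module is imported):

```python
records = [
    {"phone": "5551230001", "pre_check": False},
    {"phone": "5551230002", "pre_check": False},
]
annotated = pre_check_and_annotate(records)  # one lookup thread per record
valid = [r for r in annotated if r["pre_check"]]
```
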
101 changes: 101 additions & 0 deletions CCRStoAPI/process_data.py
@@ -0,0 +1,101 @@
import logging
import time
import random
import string
import base64
from .config import EPOCH_COUNT, BATCH_SIZE, SLEEP_TIME
from .acquire_data import acquire_data
from .pre_check import pre_check_and_annotate
from .write_back import write_back
from .verification_server import call_verification_server, evaluate_response


def orchestrate_epochs():
    logging.info("orchestrate_epochs".upper())
    t0 = time.time()
    for x in range(0, EPOCH_COUNT):
        execute_single_epoch(x)
    t1 = time.time()
    total_time = t1 - t0
    logging.info("TIME: " + str(total_time))


def execute_single_epoch(epoch_num):
    logging.info(f"execute_single_epoch: {epoch_num}".upper())
    data = acquire_data()
    t0 = time.time()
    if len(data):
        orchestrate_batches(data)
    else:
        logging.info("Data length 0. Skipping processing")
    t1 = time.time()
    # Enforce a minimum wall time per epoch by sleeping off the difference.
    sleeptime = SLEEP_TIME - (t1 - t0)
    logging.info("Sleeptime: " + str(sleeptime))
    if sleeptime > 0:
        time.sleep(sleeptime)


def orchestrate_batches(data):
    logging.info("orchestrate_batches")
    batches = create_batches(data)
    for index, batch in enumerate(batches):
        logging.info("Now processing batch: " + str(index))
        execute_single_batch(batch)


def create_batches(data):
    logging.info("create_batches")
    # Generator: yields successive BATCH_SIZE-sized slices of the epoch's data.
    for i in range(0, len(data), BATCH_SIZE):
        yield data[i:i + BATCH_SIZE]


def execute_single_batch(batch):
    logging.info("process_batch: ".upper())
    valid_recipients, invalid_recipients = generate_recipient_lists(batch)
    data_object = {"codes": valid_recipients}
    response = call_verification_server(data_object)
    valid_recipients_evaluated = evaluate_response(response, valid_recipients)
    write_back(valid_recipients_evaluated, invalid_recipients)


def generate_recipient_lists(batch):
    logging.info("generate_recipient_lists: ".upper())
    objects_list = generate_request_objects_list(batch)
    annotated_list = pre_check_and_annotate(objects_list)
    valid_recipients = list(filter(
        lambda x: x['pre_check'] is True, annotated_list))
    invalid_recipients = list(filter(
        lambda x: x['pre_check'] is False, annotated_list))
    # Remove annotations
    for item in valid_recipients:
        del item['pre_check']
    for item in invalid_recipients:
        del item['pre_check']
    return valid_recipients, invalid_recipients


def generate_request_objects_list(batch):
    logging.info("generate_list_of_request_objects: ".upper())
    objects_list = []
    for row in batch:
        request_obj = generate_request_object(row)
        objects_list.append(request_obj)
    return objects_list


def generate_request_object(row):
    logging.info("create_request_object: ".upper() + str(row))
    return {"testDate": str(row.DATE),
            "testType": str(row.TESTTYPE),
            "tzOffset": -420,
            "phone": str(row.PHONE),
            "padding": get_random_base64_string(),
            "pre_check": False}


def get_random_base64_string():
    logging.info("get_random_base64_string".upper())
    length = random.randint(5, 20)
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for i in range(length))
    return base64.b64encode(result_str.encode('utf-8')).decode('ascii')
3 changes: 3 additions & 0 deletions CCRStoAPI/sample.dat
@@ -0,0 +1,3 @@
{
  "name": "Azure"
}
36 changes: 36 additions & 0 deletions CCRStoAPI/verification_server.py
@@ -0,0 +1,36 @@
import logging
import json
import requests
from .config import APHL_ADMIN_URL, APHL_ADMIN_API_KEY


def call_verification_server(request_obj):
    logging.info("call_verification_server: ".upper() + str(request_obj))
    try:
        response = requests.post(APHL_ADMIN_URL, data=json.dumps(request_obj),
                                 headers={
                                     "content-type": "application/json",
                                     "accept": "application/json",
                                     "x-api-key": APHL_ADMIN_API_KEY})
        logging.info("Google response = " + str(response))
        return response.json()
    except Exception as e:
        logging.info("Google Exception = " + str(e))
        return e


def evaluate_response(response, valid_recipients):
    logging.info("evaluate_response: ".upper())
    if response.get("error", ""):
        # Batch failed as a whole: retry each recipient individually so one
        # bad record does not mark the entire batch as unsent.
        logging.info("Error in Google response".upper())
        for item in valid_recipients:
            singletonresponse = call_verification_server({"codes": [item]})
            if singletonresponse.get("error"):
                item["result"] = 0
            else:
                item["result"] = 1
    else:
        logging.info("No errors in Google response".upper())
        for item in valid_recipients:
            item["result"] = 1
    return valid_recipients
15 changes: 15 additions & 0 deletions CCRStoAPI/write_back.py
@@ -0,0 +1,15 @@
import logging
from .config import INSERT_WRITEBACK_QUERY, VAL
from .acquire_data import get_connection


def write_back(valid_recipients_evaluated, invalid_recipients):
    logging.info("write_back - V: ".upper() + str(valid_recipients_evaluated))
    logging.info("write_back - I: ".upper() + str(invalid_recipients))
    con = get_connection()

    # Compose a single multi-row INSERT: invalid recipients are recorded with
    # result 0, evaluated recipients with whatever result they were assigned.
    query_string = INSERT_WRITEBACK_QUERY
    query_string += ','.join(
        [VAL.format(item["phone"], 0) for item in invalid_recipients] +
        [VAL.format(item["phone"], item["result"])
         for item in valid_recipients_evaluated])
    con.cursor().execute(query_string)

    return
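
An illustration of the statement this composes (query and VAL template taken from the sample local.settings.json below; phone numbers are made up):

```python
INSERT_WRITEBACK_QUERY = ("insert into your_schema.your_write_back_table "
                          "(NUMBER, MESSAGE_SENT, MESSAGE_SENT_AT) VALUES ")
VAL = "('{}',{},Current_timestamp())"

rows = [("5551230001", 0), ("5551230002", 1)]
query = INSERT_WRITEBACK_QUERY + ",".join(VAL.format(p, r) for p, r in rows)
# insert into your_schema.your_write_back_table (NUMBER, MESSAGE_SENT, MESSAGE_SENT_AT)
# VALUES ('5551230001',0,Current_timestamp()),('5551230002',1,Current_timestamp())
```
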
51 changes: 50 additions & 1 deletion README.md
@@ -1 +1,50 @@
# CaNotify
# Introduction
Sends SMS texts to CA citizens who may have been exposed to people with COVID-19 symptoms.
## Table of contents
* [Open Source Code Repos](#open-source-code-repos)
* [Programming Language](#programming-language)
* [External Dependencies](#external-dependencies)
* [Release Notes](#release-notes)
* [Technical Details](#technical-details)
* [Network Diagram](#network-diagram)

# Open Source Code Repos
https://github.com/StateOfCalifornia/CA_NOTIFY
# Programming Language
Python
# External Dependencies
| Snowflake<br>(Source phone #)<br>Selects 100 phone #s at a time | Twilio<br>(Phone # validation API)<br>Takes only one phone # at a time, but the batch program threads 10 simultaneous runs at a time | Google<br>(SMS text API)<br>SMS texts 10 phone #s at a time |
|---|---|---|
# Release Notes
| Date | Notes |
|---|---|
| 12/13/2020 | Threaded 10 simultaneous runs (one phone # at a time) through Twilio and batched 10 phone #s at a time to Google. Mark (1) as text sent, (0) as no text sent. Update the write-back table in Snowflake to filter out already-processed phone #s for the next batch of ten (10). |

# Technical Details
The execution of the CA Notify pipeline can be visualized as a series of epochs, each consisting of multiple batches. The main function calls **orchestrate_epochs** to initiate this process. The number of epochs to execute in a single run of the pipeline is defined in the environment variable **EPOCH_COUNT**.

The execution of each epoch occurs in **execute_single_epoch**. Each epoch consists of a single call to **acquire_data** to obtain up to 100 records in bulk from Snowflake, followed by a call to **orchestrate_batches**, which takes the data obtained from Snowflake and iteratively creates and executes batches of **BATCH_SIZE** records using the **execute_single_batch** function.
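
As a standalone illustration of the chunking step (the same generator pattern as **create_batches** in process_data.py; the sample data here is made up):

```python
def create_batches(data, batch_size=10):
    # Yield successive batch_size-sized slices of the epoch's data.
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]

records = list(range(25))                         # stand-in for 25 Snowflake rows
print([len(b) for b in create_batches(records)])  # [10, 10, 5]
```
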

Before proceeding to discuss the specifics of **execute_single_batch**, it's worth noting that BATCH_SIZE and EPOCH_COUNT are hyperparameters that may be adjusted to optimize performance and reliability. Our constraints are the following (see the sketch after this list):
- BATCH_SIZE must not exceed 10 due to Google Verification Server requirements.
- EPOCH_COUNT should be as high as possible without being so high that the total execution lasts longer than two minutes (this is due to an Azure Functions constraint and may vary on other cloud environments).
- The combined EPOCH_COUNT and BATCH_SIZE execution should yield runs that process as many records as possible without exceeding the max limit from Google Verification Server.
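
A back-of-the-envelope sketch of how these knobs interact (values taken from the sample local.settings.json in this commit; the 100-record read size comes from the `LIMIT 100` in the bulk query):

```python
EPOCH_COUNT = 1          # APP_EPOCH_COUNT
BATCH_SIZE = 10          # APP_BATCH_SIZE
SLEEP_TIME = 60.1        # APP_SLEEP_TIME: minimum seconds per epoch
RECORDS_PER_EPOCH = 100  # LIMIT in APP_ACQUIRE_DATA_QUERY

max_records_per_run = EPOCH_COUNT * RECORDS_PER_EPOCH  # 100
batches_per_epoch = RECORDS_PER_EPOCH // BATCH_SIZE    # 10
min_runtime_seconds = EPOCH_COUNT * SLEEP_TIME         # 60.1
```
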

The optimal values for us were 3 for EPOCH_COUNT and 10 for BATCH_SIZE. To provide an additional buffer in cases where the epochs execute faster than the acceptable limit, we enforce a minimum time of 60 seconds per epoch (with any difference made up by sleeping between epochs).

**execute_single_batch** runs the following sequence of steps on each batch:
1. Preprocesses the phone numbers in each batch using the generate_recipient_lists function. This function uses the Twilio Lookup API via a call to pre_check_and_annotate in pre_check.py to determine whether a phone number is valid or invalid. The implementation of pre_check_and_annotate employs multithreading to process all the records in a batch in parallel.
2. The valid_recipients list is wrapped into a Python object and passed to call_verification_server in verification_server.py (see the payload sketch after this list).
3. The response from the verification server is analyzed by evaluate_response to denote whether each phone number was successful or not.
4. Both the evaluated responses (in valid_recipients_evaluated) and invalid_recipients are passed to the write-back function. We only care about whether a message was successfully sent to the recipient or not, not about why it wasn't sent in the event of a failure. Thus, valid_recipients_evaluated contains both passes that sent and fails that failed on the Google verification server. invalid_recipients contains those records that failed the pre-check and were never even sent to the verification server. write_back joins the results of these two lists and passes the data to Snowflake in a single write per batch.
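
For reference, a sketch of the request body step 2 produces (field names mirror generate_request_object; the values are made up, and the response shapes are assumptions inferred from the `error` check in evaluate_response):

```python
payload = {
    "codes": [
        {"testDate": "2020-12-13", "testType": "confirmed",
         "tzOffset": -420, "phone": "5551230001",
         "padding": "aGVsbG8="},  # random base64 padding
    ]
}
# Hypothetical responses, keyed off the "error" field only:
batch_ok = {}                 # no "error" -> every item marked result=1
batch_err = {"error": "..."}  # triggers one retry per recipient
```
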


The key optimization decisions that significantly improved the throughput of our pipeline were:
- Reading from Snowflake only once *per epoch*
- Writing to Snowflake only once *per batch*
- Calling the Google Verification server using the batch-issue endpoint
- Parallelizing the calls to Twilio using multithreading
# Network Diagram
![Network Diagram](images/NetworkDiagram.png)


15 changes: 15 additions & 0 deletions host.json
@@ -0,0 +1,15 @@
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  }
}
Binary file added images/NetworkDiagram.png
22 changes: 22 additions & 0 deletions local.settings.json
@@ -0,0 +1,22 @@
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "***",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "python.pythonPath": "***",
    "AzureWebJobs.raphaeltest.Disabled": "true",
    "AzureWebJobs.FA-CDT-CCRSAPI-D-01.Disabled": "true",
    "APP_APHL_ADMIN_API_KEY": "",
    "APP_APHL_ADMIN_URL": "https://adminapi.encv.org/api/batch-issue",
    "APP_TWILIO_ACCT_ID": "***",
    "APP_TWILIO_TOKEN": "***",
    "APP_DB_SCHEMA": "***",
    "APP_DB_PASSWORD": "***",
    "APP_ACQUIRE_DATA_QUERY": "SELECT * FROM your_schema.your_table LIMIT 100",
    "APP_INSERT_WRITEBACK_QUERY": "insert into your_schema.your_write_back_table (NUMBER, MESSAGE_SENT, MESSAGE_SENT_AT) VALUES ",
    "APP_INSERT_VAL": "('{}',{},Current_timestamp())",
    "APP_EPOCH_COUNT": "1",
    "APP_BATCH_SIZE": "10",
    "APP_SLEEP_TIME": "60.1"
  }
}