Updates to the pipelines GH summarization lab to demonstrate component input/output (kubeflow#669)

* copy and training step params, remove unused args, use google-samples images

* update notebook to reflect new pipeline

* type definition change

* fix typo, use kfp.dsl.RUN_ID_PLACEHOLDER (sketched after the file summary below)

* change 'serve' step to use gcp secret - required for 0.7
amygdala authored Oct 27, 2019
1 parent 7e28cd6 commit 452aa42
Showing 7 changed files with 131 additions and 131 deletions.
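As a quick orientation before the per-file diffs: one of the bullets above replaces the raw Argo template '{{workflow.name}}' with kfp.dsl.RUN_ID_PLACEHOLDER when building per-run paths. A minimal sketch of that change, with a stand-in working_dir value:

import kfp.dsl as dsl

working_dir = 'gs://YOUR_GCS_DIR_HERE'  # stand-in, mirroring the pipeline default
# Before: per-run paths leaned on the Argo workflow-name template.
# model_dir = '%s/%s/model_output' % (working_dir, '{{workflow.name}}')
# After: the SDK placeholder is substituted with the run ID at runtime.
model_dir = '%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER)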
@@ -26,7 +26,7 @@ RUN pip install tensorflow-probability==0.5
RUN pip install tensor2tensor==1.11.0
RUN pip install tensorflow_hub==0.1.1
RUN pip install pyyaml==3.12 six==1.11.0
RUN pip install google-cloud-storage
RUN pip install google-cloud-storage pathlib2

RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
unzip -qq google-cloud-sdk.zip -d /tools && \

@@ -20,9 +20,6 @@ metadata:
labels:
add-pod-env: 'true'
inputs:
- name: working_dir
description: '...'
type: GCSPath
- name: data_dir
description: '...'
type: GCSPath
@@ -35,15 +32,19 @@ inputs:
- name: action
description: '...'
type: String
outputs:
- name: copy_output_path
description: '...'
type: GCSPath
implementation:
container:
image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap
image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap
args: [
--data-dir, {inputValue: data_dir},
--checkpoint-dir, {inputValue: checkpoint_dir},
--action, {inputValue: action},
--working-dir, {inputValue: working_dir},
--model-dir, {inputValue: model_dir}
--model-dir, {inputValue: model_dir},
--copy-output-path, {outputPath: copy_output_path}
]
env:
KFP_POD_NAME: "{{pod.name}}"
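For context, a hedged sketch of what the new {outputPath: copy_output_path} argument means inside the container: KFP passes a local file path via --copy-output-path, and the program writes its output value there so downstream steps can read it as copydata.outputs['copy_output_path']. Paths and values here are hypothetical:

import argparse
import pathlib2  # Python 2 backport of pathlib, added to the Dockerfile above

parser = argparse.ArgumentParser()
parser.add_argument('--copy-output-path', required=True)
args = parser.parse_args()

model_dir = u'gs://some-bucket/some-run/model_output'  # hypothetical output value
# The parent directory of the KFP-supplied path may not exist yet.
pathlib2.Path(args.copy_output_path).parent.mkdir(parents=True)
# pathlib2's write_text expects unicode text.
pathlib2.Path(args.copy_output_path).write_text(model_dir)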

@@ -22,18 +22,15 @@
from urlparse import urlparse

from google.cloud import storage
import pathlib2


# location of the model checkpoint from which we'll start our training
SOURCE_BUCKET = 'aju-dev-demos-codelabs'
PREFIX = 'kubecon/model_output_tbase.bak2019000/'
COPY_ACTION = 'copy_data'
TRAIN_ACTION = 'train'
PROBLEM = 'gh_problem'
OUTPUT_PATH = '/tmp/output'



def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, new_blob_name,
new_blob_prefix, prefix):
"""Copies a blob from one bucket to another with a new name."""
@@ -49,17 +46,26 @@ def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, ne
str(source_blob.name), str(source_bucket.name), str(new_blob.name), str(target_bucket.name))


def copy_checkpoint(new_blob_prefix, target_bucket):
def copy_checkpoint(checkpoint_dir, model_dir):
"""Copy an existing model checkpoint directory to the working directory for the workflow,
so that the training can start from that point.
"""

storage_client = storage.Client()
source_bucket = storage_client.bucket(SOURCE_BUCKET)
retries = 10

source_bucket_string = urlparse(checkpoint_dir).netloc
source_prefix = checkpoint_dir.replace('gs://' + source_bucket_string + '/', '')
logging.info("source bucket %s and prefix %s", source_bucket_string, source_prefix)
source_bucket = storage_client.bucket(source_bucket_string)

target_bucket = urlparse(model_dir).netloc
logging.info("target bucket: %s", target_bucket)
new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '')
logging.info("new_blob_prefix: %s", new_blob_prefix)

# Lists objects with the given prefix.
blob_list = list(source_bucket.list_blobs(prefix=PREFIX))
blob_list = list(source_bucket.list_blobs(prefix=source_prefix))
logging.info('Copying files:')
for blob in blob_list:
sleeptime = 0.1
@@ -68,7 +74,7 @@ def copy_checkpoint(new_blob_prefix, target_bucket):
logging.info('copying %s; retry %s', blob.name, num_retries)
try:
copy_blob(storage_client, source_bucket, blob, target_bucket, blob.name, new_blob_prefix,
PREFIX)
source_prefix)
break
except Exception as e: #pylint: disable=broad-except
logging.warning(e)
@@ -97,7 +103,6 @@ def run_training(args, data_dir, model_dir, problem):
# print(result2)

# then export the model...

model_export_command = ['t2t-exporter', '--model', 'transformer',
'--hparams_set', 'transformer_prepend',
'--problem', problem,
@@ -124,17 +129,21 @@ def main():
help='...',
required=True)
parser.add_argument(
'--working-dir',
'--data-dir',
help='...',
required=True)
parser.add_argument(
'--data-dir',
'--copy-output-path',
help='...',
required=True)
)
parser.add_argument(
'--train-output-path',
help='...',
)
parser.add_argument( # used for the copy step only
'--checkpoint-dir',
help='...',
required=True)
required=False)
parser.add_argument(
'--train-steps',
help='...')
@@ -145,34 +154,37 @@

args = parser.parse_args()

# Create metadata.json file for visualization.
metadata = {
'outputs' : [{
'type': 'tensorboard',
'source': args.model_dir,
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

data_dir = args.data_dir
logging.info("data dir: %s", data_dir)

# model_startpoint = args.checkpoint_dir
logging.info("model_startpoint: %s", args.checkpoint_dir)
model_dir = args.model_dir
logging.info("model_dir: %s", model_dir)

if args.action.lower() == COPY_ACTION:
# copy over the checkpoint directory
target_bucket = urlparse(args.working_dir).netloc
logging.info("target bucket: %s", target_bucket)
new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '')
logging.info("new_blob_prefix: %s", new_blob_prefix)
copy_checkpoint(new_blob_prefix, target_bucket)
logging.info("model starting checkpoint: %s", args.checkpoint_dir)
copy_checkpoint(args.checkpoint_dir, model_dir)
# write the model dir path as an output param
logging.info("copy_output_path: %s", args.copy_output_path)
pathlib2.Path(args.copy_output_path).parent.mkdir(parents=True)
pathlib2.Path(args.copy_output_path).write_text(model_dir.decode('utf-8'))

elif args.action.lower() == TRAIN_ACTION:
# launch the training job
run_training(args, data_dir, model_dir, PROBLEM)
# write the model export path as an output param
logging.info("train_output_path: %s", args.train_output_path)
pathlib2.Path(args.train_output_path).parent.mkdir(parents=True)
export_dir = '%s/export' % model_dir
pathlib2.Path(args.train_output_path).write_text(export_dir.decode('utf-8'))
# Create metadata.json file for Tensorboard 'artifact'
metadata = {
'outputs' : [{
'type': 'tensorboard',
'source': model_dir,
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

else:
logging.warning("Error: unknown action mode %s", args.action)
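The reworked copy_checkpoint above now derives bucket and prefix from its gs:// URI arguments instead of the removed module-level constants. A standalone sketch of that parsing, matching the script's Python 2 urlparse import:

from urlparse import urlparse  # Python 2; urllib.parse on Python 3

checkpoint_dir = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/'
bucket = urlparse(checkpoint_dir).netloc                     # 'aju-dev-demos-codelabs'
prefix = checkpoint_dir.replace('gs://' + bucket + '/', '')  # 'kubecon/model_output_tbase.bak2019000/'
print('bucket: %s, prefix: %s' % (bucket, prefix))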


@@ -23,16 +23,10 @@ inputs:
- name: train_steps
description: '...'
type: Integer
default: '2019300'
- name: working_dir
description: '...'
type: GCSPath
default: 2019300
- name: data_dir
description: '...'
type: GCSPath
- name: checkpoint_dir
description: '...'
type: GCSPath
- name: model_dir
description: '...'
type: GCSPath
@@ -43,22 +37,27 @@
description: '...'
type: String
outputs:
- name: output
- name: launch_server
description: '...'
type: String
- name: train_output_path
description: '...'
type: GCSPath
- name: MLPipeline UI metadata
type: UI metadata
implementation:
container:
image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap
image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap
args: [
--data-dir, {inputValue: data_dir},
--checkpoint-dir, {inputValue: checkpoint_dir},
--action, {inputValue: action},
--working-dir, {inputValue: working_dir},
--model-dir, {inputValue: model_dir},
--train-steps, {inputValue: train_steps},
--deploy-webapp, {inputValue: deploy_webapp}
--deploy-webapp, {inputValue: deploy_webapp},
--train-output-path, {outputPath: train_output_path}
]
env:
KFP_POD_NAME: "{{pod.name}}"
fileOutputs:
output: /tmp/output
launch_server: /tmp/output
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
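Note that this component now exposes outputs in two ways: train_output_path uses {outputPath: ...}, where KFP supplies the file location, while launch_server keeps the fileOutputs style, where the container must write to the fixed path /tmp/output. A minimal sketch of the container side of the latter; the 'true'/'false' value is what the pipeline's dsl.Condition checks downstream:

# The spec maps /tmp/output -> the 'launch_server' output, so the program
# writes the flag value to that fixed path before exiting.
deploy_webapp = 'true'  # hypothetical; the real script takes this from --deploy-webapp
with open('/tmp/output', 'w') as f:
    f.write(deploy_webapp)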
47 changes: 22 additions & 25 deletions github_issue_summarization/pipelines/example_pipelines/gh_summ.py
@@ -16,6 +16,7 @@
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp
from kfp.dsl.types import GCSPath, String


COPY_ACTION = 'copy_data'
@@ -25,11 +26,11 @@
MODEL = 'model'

copydata_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long
'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long
)

train_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long
'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long
)

metadata_log_op = comp.load_component_from_url(
@@ -41,37 +42,34 @@
description='Demonstrate Tensor2Tensor-based training and TF-Serving'
)
def gh_summ( #pylint: disable=unused-argument
train_steps=2019300,
project='YOUR_PROJECT_HERE',
github_token='YOUR_GITHUB_TOKEN_HERE',
working_dir='YOUR_GCS_DIR_HERE',
checkpoint_dir='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',
deploy_webapp='true',
data_dir='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
train_steps: 'Integer' = 2019300,
project: String = 'YOUR_PROJECT_HERE',
github_token: String = 'YOUR_GITHUB_TOKEN_HERE',
working_dir: GCSPath = 'gs://YOUR_GCS_DIR_HERE',
checkpoint_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',
deploy_webapp: String = 'true',
data_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
):


copydata = copydata_op(
working_dir=working_dir,
data_dir=data_dir,
checkpoint_dir=checkpoint_dir,
model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),
action=COPY_ACTION
model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
action=COPY_ACTION,
).apply(gcp.use_gcp_secret('user-gcp-sa'))


log_dataset = metadata_log_op(
log_type=DATASET,
workspace_name=WORKSPACE_NAME,
run_name='{{workflow.name}}',
run_name=dsl.RUN_ID_PLACEHOLDER,
data_uri=data_dir
)

train = train_op(
working_dir=working_dir,
data_dir=data_dir,
checkpoint_dir=checkpoint_dir,
model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),
model_dir=copydata.outputs['copy_output_path'],
action=TRAIN_ACTION, train_steps=train_steps,
deploy_webapp=deploy_webapp
).apply(gcp.use_gcp_secret('user-gcp-sa'))
@@ -80,29 +78,28 @@ def gh_summ( #pylint: disable=unused-argument
log_model = metadata_log_op(
log_type=MODEL,
workspace_name=WORKSPACE_NAME,
run_name='{{workflow.name}}',
model_uri='%s/%s/model_output' % (working_dir, '{{workflow.name}}')
run_name=dsl.RUN_ID_PLACEHOLDER,
model_uri=train.outputs['train_output_path']
)

serve = dsl.ContainerOp(
name='serve',
image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve',
arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',),
"--model_path", '%s/%s/model_output/export' % (working_dir, '{{workflow.name}}')
arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
"--model_path", train.outputs['train_output_path']
]
)
).apply(gcp.use_gcp_secret('user-gcp-sa'))

log_dataset.after(copydata)
train.after(copydata)
log_model.after(train)
serve.after(train)
train.set_gpu_limit(1)
train.set_memory_limit('48G')

with dsl.Condition(train.output == 'true'):
with dsl.Condition(train.outputs['launch_server'] == 'true'):
webapp = dsl.ContainerOp(
name='webapp',
image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v2ap',
arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',),
arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
"--github_token", github_token]

)
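The hunk ends here, but sample pipelines like this one are typically compiled into an archive for upload to the Pipelines UI; a hedged sketch of that step, appended to gh_summ.py and assuming the kfp SDK version these samples target:

import kfp.compiler as compiler

# Compile the @dsl.pipeline-decorated function into a deployable archive.
if __name__ == '__main__':
    compiler.Compiler().compile(gh_summ, 'gh_summ.pipeline.tar.gz')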
Binary file not shown.
