[flytepropeller][flyteadmin] Streaming Decks V2 #6053
base: master
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Yi Cheng <[email protected]>
Co-authored-by: pingsutw <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@           Coverage Diff           @@
##           master    #6053   +/-   ##
=======================================
  Coverage   37.08%   37.09%
=======================================
  Files        1318     1318
  Lines      132284   132307     +23
=======================================
+ Hits        49062    49073     +11
- Misses      78950    78960     +10
- Partials     4272     4274      +2
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
	// This prevents the console from checking a deck URI that does not exist once the node reaches its final phase (PhaseSuccess).
	err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
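The success-phase cleanup above can be modeled in a few lines. This is a minimal Python sketch, not the propeller code: `Phase`, `FakeStore`, `NodeTransition` and `finalize` are hypothetical names, and treating every non-success phase as "leave the URI alone" is an assumption, since the excerpt only shows the `PhaseSuccess` branch.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Set


class Phase(Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class FakeStore:
    """Hypothetical stand-in for the blob store: only tracks which paths exist."""
    existing: Set[str] = field(default_factory=set)

    def head_exists(self, path: str) -> bool:
        # Plays the role of a HEAD request against object storage.
        return path in self.existing


@dataclass
class NodeTransition:
    phase: Phase
    deck_uri: Optional[str]

    def remove_nonexistent_deck_uri(self, store: FakeStore) -> None:
        # Keep the URI only if the deck file was actually written.
        if self.deck_uri is not None and not store.head_exists(self.deck_uri):
            self.deck_uri = None


def finalize(trns: NodeTransition, store: FakeStore) -> NodeTransition:
    # Only the success phase triggers the existence check, as in the excerpt;
    # leaving other phases untouched is an assumption of this sketch.
    if trns.phase is Phase.SUCCESS:
        trns.remove_nonexistent_deck_uri(store)
    return trns
```

A running node keeps its URI so the console can keep polling the streaming deck, while a succeeded node whose deck was never written ends up with `deck_uri = None`.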
Does this do a HEAD call on the deck URI for every task that succeeds? Two thoughts here:
(1) Does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
(2) This adds a 20-30ms performance degradation to every task execution.
will take a look tmr, thank you!!!
Does this do a head call on the deck URI for every task that succeeds?
Yes, it will do a HEAD call through RemoteFileOutputReader:
flyte/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
Lines 306 to 313 in b3330ba
func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
	md, err := r.store.Head(ctx, r.outPath.GetDeckPath())
	if err != nil {
		return false, err
	}
	return md.Exists(), nil
}
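To make the per-task cost concrete, here is a hedged Python model of `DeckExists`: one HEAD request per check, then report whether the object exists. `CountingStore`, `Metadata` and `deck_exists` are hypothetical names; the counter only illustrates that each succeeded task costs exactly one storage round trip, which is the overhead the reviewer raised.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Metadata:
    exists: bool


class CountingStore:
    """Hypothetical in-memory store that counts HEAD requests."""

    def __init__(self, objects: Dict[str, bytes]):
        self.objects = objects
        self.head_calls = 0

    def head(self, path: str) -> Metadata:
        # Each call stands for one network round trip to object storage.
        self.head_calls += 1
        return Metadata(exists=path in self.objects)


def deck_exists(store: CountingStore, deck_path: str) -> bool:
    # Same shape as the Go method: one HEAD, then report whether the object exists.
    # Any exception raised by store.head propagates, like Go's returned err.
    return store.head(deck_path).exists


if __name__ == "__main__":
    store = CountingStore({"s3://bucket/t1/deck.html": b"<html></html>"})
    results = [deck_exists(store, f"s3://bucket/t{i}/deck.html") for i in (1, 2, 3)]
    print(results, store.head_calls)  # [True, False, False] 3
```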
How do you know about the performance degradation? Did you measure it with Grafana or another performance tool?
does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
flyteadmin will set the deckURI in the execution metadata to nil if propeller removes it.
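One way to picture that behavior is a "latest event wins" merge for the deck URI. This is a sketch under that assumption, not flyteadmin's actual merge code; `NodeEvent`, `NodeExecutionMetadata` and `apply_event` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeEvent:
    phase: str
    deck_uri: Optional[str]


@dataclass
class NodeExecutionMetadata:
    phase: str = "UNDEFINED"
    deck_uri: Optional[str] = None


def apply_event(meta: NodeExecutionMetadata, event: NodeEvent) -> NodeExecutionMetadata:
    # Assumed merge rule: the latest event's deck URI overwrites the stored one,
    # so a URI that propeller dropped ends up as None rather than staying stale.
    meta.phase = event.phase
    meta.deck_uri = event.deck_uri
    return meta
```

Under this rule, a RUNNING event carrying a deck URI followed by a SUCCEEDED event without one leaves the stored URI as `None`, so the console has nothing to fetch.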
cc @hamersaw
How to test it?
flytectl demo start --image futureoutlier/streaming-deck:1128 --force
cd flytekit
gh pr checkout 2779
from flytekit import ImageSpec, task, workflow
from flytekit.deck import Deck

flytekit_hash = "fc5578f74c7193245c051d1470c61f0f14619217"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

# Define custom image for the task
custom_image = ImageSpec(
    packages=[flytekit],
    apt_packages=["git"],
    registry="localhost:30000",
)


@task(enable_deck=True, container_image=custom_image)
def t_deck():
    """
    1st deck only shows the timeline deck
    2nd will show
    """
    import time

    # Publish the deck twice, 10 seconds apart, so the streaming update is visible.
    for _ in range(2):
        Deck.publish()
        time.sleep(10)
    # Raise on purpose so the task ends in the FAILED phase.
    raise Exception("This is an exception")


@workflow
def wf():
    t_deck()


if __name__ == "__main__":
    import os

    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "wf"])
    print("Remote Execution: ", result.output)
Mind adding screenshots of the rendered deck and the refresh to the PR description?
Yes, no problem.
It's provided!
Tracking issue
#5574
Why are the changes needed?
To enhance user visibility into Flyte Decks at different stages of workflow execution (running, failing, and succeeding), enabling better debugging and analysis.
What changes were proposed in this pull request?
(1) When the Flytekit task pod publishes the Deck, Propeller updates the Deck URI in the node event.
(2) The Admin retrieves the Deck URI from the node event.
(3) The console fetches the deck through FlyteAdmin's data proxy.
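The three steps above can be sketched end to end as follows. This is an illustrative model only: `NodeEvent`, `propeller_emit`, `Admin`, `record` and `data_proxy_get` are hypothetical names, a plain dict stands in for object storage, and it assumes each `Deck.publish()` overwrites the same deck object, so a console refresh picks up the newest version.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class NodeEvent:
    node_id: str
    phase: str
    deck_uri: Optional[str]


def propeller_emit(node_id: str, phase: str, published_uri: Optional[str]) -> NodeEvent:
    # Step (1): once the task pod has published a deck, the node event carries its URI.
    return NodeEvent(node_id, phase, published_uri)


class Admin:
    """Hypothetical stand-in for FlyteAdmin plus its data proxy."""

    def __init__(self, blobs: Dict[str, str]):
        self.blobs = blobs  # plays the role of object storage
        self.deck_uris: Dict[str, Optional[str]] = {}

    def record(self, event: NodeEvent) -> None:
        # Step (2): take the deck URI from the node event.
        self.deck_uris[event.node_id] = event.deck_uri

    def data_proxy_get(self, node_id: str) -> Optional[str]:
        # Step (3): the console reads the deck through the admin, not the bucket.
        uri = self.deck_uris.get(node_id)
        return self.blobs.get(uri) if uri else None
```

Because the admin only stores the URI, a second publish to the same path is visible on the next `data_proxy_get` without any new event.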
How was this patch tested?
Setup process: single binary.
flyte: this branch
flytekit: flyteorg/flytekit#2779
flyteconsole: flyteorg/flyteconsole#890
Screenshots
RUNNING AND SUCCEEDED
Screen.Recording.2024-11-28.at.10.04.09.AM.mov
FAILED
Screen.Recording.2024-11-28.at.10.06.21.AM.mov
Follow-up questions
Abort phase for the streaming deck: should we support EPhaseAbort in this file? https://github.com/flyteorg/flyte/blob/b3330ba4430538f91ae9fc7d868a29a2e96db8bd/flytepropeller/pkg/controller/nodes/handler/transition_info.go