-
Notifications
You must be signed in to change notification settings - Fork 132
/
app.py
1231 lines (1029 loc) · 62.5 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""THA3 live mode for SillyTavern-extras.
This is the animation engine, running on top of the THA3 posing engine.
This module implements the live animation backend and serves the API. For usage, see `server.py`.
If you want to play around with THA3 expressions in a standalone app, see `manual_poser.py`.
"""
__all__ = ["set_emotion_from_classification", "set_emotion",
"unload",
"start_talking", "stop_talking",
"result_feed",
"talkinghead_load_file",
"launch"]
import atexit
import io
import json
import logging
import math
import os
import random
import sys
import time
import numpy as np
import threading
from typing import Any, Dict, List, NoReturn, Optional, Union
import PIL
import torch
from flask import Flask, Response
from flask_cors import CORS
from tha3.poser.modes.load_poser import load_poser
from tha3.poser.poser import Poser
from tha3.util import (torch_linear_to_srgb, resize_PIL_image,
extract_PIL_image_from_filelike, extract_pytorch_image_from_PIL_image)
from tha3.app.postprocessor import Postprocessor
from tha3.app.util import posedict_keys, posedict_key_to_index, load_emotion_presets, posedict_to_pose, to_talkinghead_image, RunningAverage
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------------
# Global variables
# Default configuration for the animator, loaded when the plugin is launched.
# Doubles as the authoritative documentation of the animator settings (beside the animation driver docstrings and the actual source code).
animator_defaults = {"target_fps": 25, # Desired output frames per second. Note this only affects smoothness of the output (if hardware allows).
# The speed at which the animation evolves is based on wall time. Snapshots are rendered at the target FPS,
# or if the hardware is too slow to reach the target FPS, then as often as hardware allows.
# For smooth animation, make the FPS lower than what your hardware could produce, so that some compute
# remains untapped, available to smooth over the occasional hiccup from other running programs.
"crop_left": 0.0, # in units where the image width is 2.0
"crop_right": 0.0, # in units where the image width is 2.0
"crop_top": 0.0, # in units where the image height is 2.0
"crop_bottom": 0.0, # in units where the image height is 2.0
"pose_interpolator_step": 0.1, # 0 < this <= 1; at each frame at a reference of 25 FPS; FPS-corrected automatically; see `interpolate_pose`.
"blink_interval_min": 2.0, # seconds, lower limit for random minimum time until next blink is allowed.
"blink_interval_max": 5.0, # seconds, upper limit for random minimum time until next blink is allowed.
"blink_probability": 0.03, # At each frame at a reference of 25 FPS; FPS-corrected automatically.
"blink_confusion_duration": 10.0, # seconds, upon entering "confusion" emotion, during which blinking quickly in succession is allowed.
"talking_fps": 12, # How often to re-randomize mouth during talking animation.
# Early 2000s anime used ~12 FPS as the fastest actual framerate of new cels (not counting camera panning effects and such).
"talking_morph": "mouth_aaa_index", # which mouth-open morph to use for talking; for available values, see `posedict_keys`
"sway_morphs": ["head_x_index", "head_y_index", "neck_z_index", "body_y_index", "body_z_index"], # which morphs to sway; see `posedict_keys`
"sway_interval_min": 5.0, # seconds, lower limit for random time interval until randomizing new sway pose.
"sway_interval_max": 10.0, # seconds, upper limit for random time interval until randomizing new sway pose.
"sway_macro_strength": 0.6, # [0, 1], in sway pose, max abs deviation from emotion pose target morph value for each sway morph,
# but also max deviation from center. The emotion pose itself may use higher values; in such cases,
# sway will only occur toward the center. See `compute_sway_target_pose` for details.
"sway_micro_strength": 0.02, # [0, 1], max abs random noise added each frame. No limiting other than a clamp of final pose to [-1, 1].
"breathing_cycle_duration": 4.0, # seconds, for a full breathing cycle.
"postprocessor_chain": []} # Pixel-space glitch artistry settings; see `postprocessor.py`.
talkinghead_basedir = "talkinghead"
global_animator_instance = None
_animator_output_lock = threading.Lock() # protect from concurrent access to `result_image` and the `new_frame_available` flag.
global_encoder_instance = None
global_latest_frame_sent = None
# These need to be written to by the API functions.
#
# Since the plugin might not have been started yet at that time (so the animator instance might not exist),
# it's better to keep this state in module-level globals rather than in attributes of the animator.
animation_running = False # used in initial bootup state, and while loading a new image
current_emotion = "neutral"
is_talking = False
global_reload_image = None
target_fps = 25 # value overridden by `load_animator_settings` at animator startup
# --------------------------------------------------------------------------------
# API
# Flask setup
app = Flask(__name__)
CORS(app)
def set_emotion_from_classification(emotion_scores: List[Dict[str, Union[str, float]]]) -> str:
"""Set the current emotion of the character based on sentiment analysis results.
Currently, we pick the emotion with the highest confidence score.
`emotion_scores`: results from classify module: [{"label": emotion0, "score": confidence0}, ...]
Return a status message for passing over HTTP.
"""
highest_score = float("-inf")
highest_label = None
for item in emotion_scores:
if item["score"] > highest_score:
highest_score = item["score"]
highest_label = item["label"]
logger.info(f"set_emotion_from_classification: winning score: {highest_label} = {highest_score}")
return set_emotion(highest_label)
def set_emotion(emotion: str) -> str:
"""Set the current emotion of the character.
Return a status message for passing over HTTP.
"""
global current_emotion
if emotion not in global_animator_instance.emotions:
logger.warning(f"set_emotion: specified emotion '{emotion}' does not exist, selecting 'neutral'")
emotion = "neutral"
logger.info(f"set_emotion: applying emotion {emotion}")
current_emotion = emotion
return f"emotion set to {emotion}"
def unload() -> str:
"""Stop animation.
Return a status message for passing over HTTP.
"""
global animation_running
animation_running = False
logger.info("unload: animation paused")
return "animation paused"
def start_talking() -> str:
"""Start talking animation.
Return a status message for passing over HTTP.
"""
global is_talking
is_talking = True
logger.debug("start_talking called")
return "talking started"
def stop_talking() -> str:
"""Stop talking animation.
Return a status message for passing over HTTP.
"""
global is_talking
is_talking = False
logger.debug("stop_talking called")
return "talking stopped"
# There are three tasks we must do each frame:
#
# 1) Render an animation frame
# 2) Encode the new animation frame for network transport
# 3) Send the animation frame over the network
#
# Instead of running serially:
#
# [render1][encode1][send1] [render2][encode2][send2]
# ------------------------------------------------------> time
#
# we get better throughput by parallelizing and interleaving:
#
# [render1] [render2] [render3] [render4] [render5]
# [encode1] [encode2] [encode3] [encode4]
# [send1] [send2] [send3]
# ----------------------------------------------------> time
#
# Despite the global interpreter lock, this increases throughput, as well as improves the timing of the network send
# since the network thread only needs to care about getting the send timing right.
#
# Either there's enough waiting for I/O for the split between render and encode to make a difference, or it's the fact
# that much of the compute-heavy work in both of those is performed inside C libraries that release the GIL (Torch,
# and the PNG encoder in Pillow, respectively).
#
# This is a simplified picture. Some important details:
#
# - At startup:
# - The animator renders the first frame on its own.
# - The encoder waits for the animator to publish a frame, and then starts normal operation.
# - The network thread waits for the encoder to publish a frame, and then starts normal operation.
# - In normal operation (after startup):
# - The animator waits until the encoder has consumed the previous published frame. Then it proceeds to render and publish a new frame.
# - This communication is handled through the flag `animator.new_frame_available`.
# - The network thread does its own thing on a regular schedule, based on the desired target FPS.
# - However, the network thread publishes metadata on which frame is the latest that has been sent over the network at least once.
# This is stored as an `id` (i.e. memory address) in `global_latest_frame_sent`.
# - If the target FPS is too high for the animator and/or encoder to keep up with, the network thread re-sends
# the latest frame published by the encoder as many times as necessary, to keep the network output at the target FPS
# regardless of render/encode speed. This handles the case of hardware slower than the target FPS.
# - On localhost, the network send is very fast, under 0.15 ms.
# - The encoder uses the metadata to wait until the latest encoded frame has been sent at least once before publishing a new frame.
# This ensures that no more frames are generated than are actually sent, and syncs also the animator (because the animator is
# rate-limited by the encoder consuming its frames). This handles the case of hardware faster than the target FPS.
# - When the animator and encoder are fast enough to keep up with the target FPS, generally when frame N is being sent,
# frame N+1 is being encoded (or is already encoded, and waiting for frame N to be sent), and frame N+2 is being rendered.
#
def result_feed() -> Response:
"""Return a Flask `Response` that repeatedly yields the current image as 'image/png'."""
def generate():
global global_latest_frame_sent
last_frame_send_complete_time = None
last_report_time = None
send_duration_sec = 0.0
send_duration_statistics = RunningAverage()
while True:
# Send the latest available animation frame.
# Important: grab reference to `image_bytes` only once, since it will be atomically updated without a lock.
image_bytes = global_encoder_instance.image_bytes
if image_bytes is not None:
# How often should we send?
# - Excessive spamming can DoS the SillyTavern GUI, so there needs to be a rate limit.
# - OTOH, we must constantly send something, or the GUI will lock up waiting.
# Therefore, send at a target FPS that yields a nice-looking animation.
frame_duration_target_sec = 1 / target_fps
if last_frame_send_complete_time is not None:
time_now = time.time_ns()
this_frame_elapsed_sec = (time_now - last_frame_send_complete_time) / 10**9
# The 2* is a fudge factor. It doesn't matter if the frame is a bit too early, but we don't want it to be late.
time_until_frame_deadline = frame_duration_target_sec - this_frame_elapsed_sec - 2 * send_duration_sec
else:
time_until_frame_deadline = 0.0 # nothing rendered yet
if time_until_frame_deadline <= 0.0:
time_now = time.time_ns()
yield (b"--frame\r\n"
b"Content-Type: image/png\r\n\r\n" + image_bytes + b"\r\n")
global_latest_frame_sent = id(image_bytes) # atomic update, no need for lock
send_duration_sec = (time.time_ns() - time_now) / 10**9 # about 0.12 ms on localhost (compress_level=1 or 6, doesn't matter)
# print(f"send {send_duration_sec:0.6g}s") # DEBUG
# Update the FPS counter, measuring the time between network sends.
time_now = time.time_ns()
if last_frame_send_complete_time is not None:
this_frame_elapsed_sec = (time_now - last_frame_send_complete_time) / 10**9
send_duration_statistics.add_datapoint(this_frame_elapsed_sec)
last_frame_send_complete_time = time_now
else:
time.sleep(time_until_frame_deadline)
# Log the FPS counter in 5-second intervals.
time_now = time.time_ns()
if animation_running and (last_report_time is None or time_now - last_report_time > 5e9):
avg_send_sec = send_duration_statistics.average()
msec = round(1000 * avg_send_sec, 1)
target_msec = round(1000 * frame_duration_target_sec, 1)
fps = round(1 / avg_send_sec, 1) if avg_send_sec > 0.0 else 0.0
logger.info(f"output: {msec:.1f}ms [{fps:.1f} FPS]; target {target_msec:.1f}ms [{target_fps:.1f} FPS]")
last_report_time = time_now
else: # first frame not yet available
time.sleep(0.1)
return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
# TODO: the input is a flask.request.file.stream; what's the type of that?
def talkinghead_load_file(stream) -> str:
"""Load image from stream and start animation."""
global global_reload_image
global animation_running
logger.info("talkinghead_load_file: loading new input image from stream")
try:
animation_running = False # pause animation while loading a new image
pil_image = PIL.Image.open(stream) # Load the image using PIL.Image.open
img_data = io.BytesIO() # Create a copy of the image data in memory using BytesIO
pil_image.save(img_data, format="PNG")
global_reload_image = PIL.Image.open(io.BytesIO(img_data.getvalue())) # Set the global_reload_image to a copy of the image data
except PIL.Image.UnidentifiedImageError:
logger.warning("Could not load input image from stream, loading blank")
full_path = os.path.join(os.getcwd(), os.path.join(talkinghead_basedir, "tha3", "images", "inital.png"))
global_reload_image = PIL.Image.open(full_path)
finally:
animation_running = True
return "OK"
def launch(device: str, model: str) -> Union[None, NoReturn]:
"""Launch the talking head plugin (live mode).
If the plugin fails to load, the process exits.
device: "cpu" or "cuda"
model: one of the folder names inside "talkinghead/tha3/models/"
"""
global global_animator_instance
global global_encoder_instance
try:
# If the animator already exists, clean it up first
if global_animator_instance is not None:
logger.info(f"launch: relaunching on device {device} with model {model}")
global_animator_instance.exit()
global_animator_instance = None
global_encoder_instance.exit()
global_encoder_instance = None
logger.info("launch: loading the THA3 posing engine")
poser = load_poser(model, device, modelsdir=os.path.join(talkinghead_basedir, "tha3", "models"))
global_animator_instance = Animator(poser, device)
global_encoder_instance = Encoder()
# Load initial blank character image
full_path = os.path.join(os.getcwd(), os.path.join(talkinghead_basedir, "tha3", "images", "inital.png"))
global_animator_instance.load_image(full_path)
global_animator_instance.start()
global_encoder_instance.start()
except RuntimeError as exc:
logger.error(exc)
sys.exit()
# --------------------------------------------------------------------------------
# Internal stuff
def convert_linear_to_srgb(image: torch.Tensor) -> torch.Tensor:
"""RGBA (linear) -> RGBA (SRGB), preserving the alpha channel."""
rgb_image = torch_linear_to_srgb(image[0:3, :, :])
return torch.cat([rgb_image, image[3:4, :, :]], dim=0)
class Animator:
"""uWu Waifu"""
def __init__(self, poser: Poser, device: torch.device):
self.poser = poser
self.device = device
self.postprocessor = Postprocessor(device)
self.render_duration_statistics = RunningAverage()
self.animator_thread = None
self.source_image: Optional[torch.tensor] = None
self.result_image: Optional[np.array] = None
self.new_frame_available = False
self.last_report_time = None
self.reset_animation_state()
self.load_emotion_templates()
self.load_animator_settings()
# --------------------------------------------------------------------------------
# Management
def start(self) -> None:
"""Start the animation thread."""
self._terminated = False
def animator_update():
while not self._terminated:
try:
self.render_animation_frame()
except Exception as exc:
logger.error(exc)
raise # let the animator stop so we won't spam the log
time.sleep(0.01) # rate-limit the renderer to 100 FPS maximum (this could be adjusted later)
self.animator_thread = threading.Thread(target=animator_update, daemon=True)
self.animator_thread.start()
atexit.register(self.exit)
def exit(self) -> None:
"""Terminate the animation thread.
Called automatically when the process exits.
"""
self._terminated = True
self.animator_thread.join()
self.animator_thread = None
def reset_animation_state(self):
"""Reset character state trackers for all animation drivers."""
self.current_pose = None
self.last_emotion = None
self.last_emotion_change_timestamp = None
self.last_sway_target_timestamp = None
self.last_sway_target_pose = None
self.last_microsway_timestamp = None
self.sway_interval = None
self.last_blink_timestamp = None
self.blink_interval = None
self.last_talking_timestamp = None
self.last_talking_target_value = None
self.was_talking = False
self.breathing_epoch = time.time_ns()
def load_emotion_templates(self, emotions: Optional[Dict[str, Dict[str, float]]] = None) -> None:
"""Load emotion templates.
`emotions`: `{emotion0: {morph0: value0, ...}, ...}`
Optional dict of custom emotion templates.
If not given, this loads the templates from the emotion JSON files
in `talkinghead/emotions/`.
If given:
- Each emotion NOT supplied is populated from the defaults.
- In each emotion that IS supplied, each morph that is NOT mentioned
is implicitly set to zero (due to how `apply_emotion_to_pose` works).
For an example JSON file containing a suitable dictionary, see `talkinghead/emotions/_defaults.json`.
For available morph names, see `posedict_keys` in `talkinghead/tha3/app/util.py`.
For some more detail, see `talkinghead/tha3/poser/modes/pose_parameters.py`.
"Arity 2" means `posedict_keys` has separate left/right morphs.
If still in doubt, see the GUI panel implementations in `talkinghead/tha3/app/manual_poser.py`.
"""
# Load defaults as a base
self.emotions, self.emotion_names = load_emotion_presets(os.path.join("talkinghead", "emotions"))
# Then override defaults, and add any new custom emotions
if emotions is not None:
logger.info(f"load_emotion_templates: loading user-specified templates for emotions {list(sorted(emotions.keys()))}")
self.emotions.update(emotions)
emotion_names = set(self.emotion_names)
emotion_names.update(emotions.keys())
self.emotion_names = list(sorted(emotion_names))
else:
logger.info("load_emotion_templates: loaded default emotion templates")
def load_animator_settings(self, settings: Optional[Dict[str, Any]] = None) -> None:
"""Load animator settings.
`settings`: `{setting0: value0, ...}`
Optional dict of settings. The type and semantics of each value depends on each
particular setting.
For available settings, see `animator_defaults` in `talkinghead/tha3/app/app.py`.
Particularly for the setting `"postprocessor_chain"` (pixel-space glitch artistry),
see `talkinghead/tha3/app/postprocessor.py`.
"""
global target_fps
if settings is None:
settings = {}
logger.info(f"load_animator_settings: user settings: {settings}")
# Load server-side settings (`talkinghead/animator.json`)
try:
animator_config_path = os.path.join(talkinghead_basedir, "animator.json")
with open(animator_config_path, "r") as json_file:
server_settings = json.load(json_file)
except Exception as exc:
logger.info(f"load_animator_settings: skipping server settings, reason: {exc}")
server_settings = {}
# Let's define some helpers:
def drop_unrecognized(settings: Dict[str, Any], context: str) -> None: # DANGER: MUTATING FUNCTION
unknown_fields = [field for field in settings if field not in animator_defaults]
if unknown_fields:
logger.warning(f"load_animator_settings: in {context}: this server did not recognize the following settings, ignoring them: {unknown_fields}")
for field in unknown_fields:
settings.pop(field)
assert all(field in animator_defaults for field in settings) # contract: only known settings remaining
def typecheck(settings: Dict[str, Any], context: str) -> None: # DANGER: MUTATING FUNCTION
for field, default_value in animator_defaults.items():
type_match = (int, float) if isinstance(default_value, (int, float)) else type(default_value)
if field in settings and not isinstance(settings[field], type_match):
logger.warning(f"load_animator_settings: in {context}: incorrect type for '{field}': got {type(settings[field])} with value '{settings[field]}', expected {type_match}")
settings.pop(field) # (safe; this is not the collection we are iterating over)
def aggregate(settings: Dict[str, Any], fallback_settings: Dict[str, Any], fallback_context: str) -> None: # DANGER: MUTATING FUNCTION
for field, default_value in fallback_settings.items():
if field not in settings:
logger.info(f"load_animator_settings: filling in '{field}' from {fallback_context}")
settings[field] = default_value
# Now our settings loading strategy is as simple as:
settings = dict(settings) # copy to avoid modifying the original, since we'll pop some stuff.
if settings:
drop_unrecognized(settings, context="user settings")
typecheck(settings, context="user settings")
if server_settings:
drop_unrecognized(server_settings, context="server settings")
typecheck(server_settings, context="server settings")
# both `settings` and `server_settings` are fully valid at this point
aggregate(settings, fallback_settings=server_settings, fallback_context="server settings") # first fill in from server-side settings
aggregate(settings, fallback_settings=animator_defaults, fallback_context="built-in defaults") # then fill in from hardcoded defaults
logger.info(f"load_animator_settings: final settings (filled in as necessary): {settings}")
# Some settings must be applied explicitly.
logger.debug(f"load_animator_settings: Setting new target FPS = {settings['target_fps']}")
target_fps = settings.pop("target_fps") # global variable, controls the network send rate.
logger.debug("load_animator_settings: Sending new effect chain to postprocessor")
self.postprocessor.chain = settings.pop("postprocessor_chain") # ...and that's where the postprocessor reads its filter settings from.
# The rest of the settings we can just store in an attribute, and let the animation drivers read them from there.
self._settings = settings
def load_image(self, file_path=None) -> None:
"""Load the image file at `file_path`, and replace the current character with it.
Except, if `global_reload_image is not None`, use the global reload image data instead.
In that case `file_path` is not used.
When done, this always sets `global_reload_image` to `None`.
"""
global global_reload_image
try:
if global_reload_image is not None:
pil_image = global_reload_image
else:
pil_image = resize_PIL_image(
extract_PIL_image_from_filelike(file_path),
(self.poser.get_image_size(), self.poser.get_image_size()))
w, h = pil_image.size
if pil_image.size != (512, 512):
logger.info("Resizing Char Card to work")
pil_image = to_talkinghead_image(pil_image)
w, h = pil_image.size
if pil_image.mode != "RGBA":
logger.error("load_image: image must have alpha channel")
self.source_image = None
else:
self.source_image = extract_pytorch_image_from_PIL_image(pil_image) \
.to(self.device).to(self.poser.get_dtype())
except Exception as exc:
logger.error(f"load_image: {exc}")
finally:
global_reload_image = None
# --------------------------------------------------------------------------------
# Animation drivers
def apply_emotion_to_pose(self, emotion_posedict: Dict[str, float], pose: List[float]) -> List[float]:
"""Copy all morphs except breathing from `emotion_posedict` to `pose`.
If a morph does not exist in `emotion_posedict`, its value is copied from the original `pose`.
Return the modified pose.
"""
new_pose = list(pose) # copy
for idx, key in enumerate(posedict_keys):
if key in emotion_posedict and key != "breathing_index":
new_pose[idx] = emotion_posedict[key]
return new_pose
def animate_blinking(self, pose: List[float]) -> List[float]:
"""Eye blinking animation driver.
Relevant `self._settings` keys:
`"blink_interval_min"`: float, seconds, lower limit for random minimum time until next blink is allowed.
`"blink_interval_max"`: float, seconds, upper limit for random minimum time until next blink is allowed.
`"blink_probability"`: float, at each frame at a reference of 25 FPS. FPS-corrected automatically.
`"blink_confusion_duration"`: float, seconds, upon entering "confusion" emotion, during which blinking
quickly in succession is allowed.
Return the modified pose.
"""
# Compute FPS-corrected blink probability
CALIBRATION_FPS = 25
p_orig = self._settings["blink_probability"] # blink probability per frame at CALIBRATION_FPS
avg_render_sec = self.render_duration_statistics.average()
if avg_render_sec > 0:
avg_render_fps = 1 / avg_render_sec
# Even if render completes faster, the `talkinghead` output is rate-limited to `target_fps` at most.
avg_render_fps = min(avg_render_fps, target_fps)
else: # No statistics available yet; let's assume we're running at `target_fps`.
avg_render_fps = target_fps
# We give an independent trial for each of `n` "normalized frames" elapsed at `CALIBRATION_FPS` during one actual frame at `avg_render_fps`.
# Note direction: rendering faster (higher FPS) means less likely to blink per frame, to obtain the same blink density per unit of wall time.
n = CALIBRATION_FPS / avg_render_fps
# If at least one of the normalized frames wants to blink, then the actual frame should blink.
# Doesn't matter that `n` isn't an integer, since the power function over the reals is continuous and we just want a reasonable scaling here.
p_scaled = 1.0 - (1.0 - p_orig)**n
should_blink = (random.random() <= p_scaled)
debug_fps = round(avg_render_fps, 1)
logger.debug(f"animate_blinking: p @ {CALIBRATION_FPS} FPS = {p_orig}, scaled p @ {debug_fps:.1f} FPS = {p_scaled:0.6g}")
# Prevent blinking too fast in succession.
time_now = time.time_ns()
if self.blink_interval is not None:
# ...except when the "confusion" emotion has been entered recently.
seconds_since_last_emotion_change = (time_now - self.last_emotion_change_timestamp) / 10**9
if current_emotion == "confusion" and seconds_since_last_emotion_change < self._settings["blink_confusion_duration"]:
pass
else:
seconds_since_last_blink = (time_now - self.last_blink_timestamp) / 10**9
if seconds_since_last_blink < self.blink_interval:
should_blink = False
if not should_blink:
return pose
# If there should be a blink, set the wink morphs to 1.
new_pose = list(pose) # copy
for morph_name in ["eye_wink_left_index", "eye_wink_right_index"]:
idx = posedict_key_to_index[morph_name]
new_pose[idx] = 1.0
# Typical for humans is 12...20 times per minute, i.e. 5...3 seconds interval.
self.last_blink_timestamp = time_now
self.blink_interval = random.uniform(self._settings["blink_interval_min"],
self._settings["blink_interval_max"]) # seconds; duration of this blink before the next one can begin
return new_pose
def animate_talking(self, pose: List[float], target_pose: List[float]) -> List[float]:
"""Talking animation driver.
Relevant `self._settings` keys:
`"talking_fps"`: float, how often to re-randomize mouth during talking animation.
Early 2000s anime used ~12 FPS as the fastest actual framerate of
new cels (not counting camera panning effects and such).
`"talking_morph"`: str, see `posedict_keys` for available values.
Which morph to use for opening and closing the mouth during talking.
Any other morphs in the mouth-open group are set to zero while
talking is in progress.
Works by randomizing the mouth-open state in regular intervals.
When talking ends, the mouth immediately snaps to its position in the target pose
(to avoid a slow, unnatural closing, since most expressions have the mouth closed).
Return the modified pose.
"""
MOUTH_OPEN_MORPHS = ["mouth_aaa_index", "mouth_iii_index", "mouth_uuu_index", "mouth_eee_index", "mouth_ooo_index", "mouth_delta"]
talking_morph = self._settings["talking_morph"]
if not is_talking:
try:
if self.was_talking: # when talking ends, snap mouth to target immediately
new_pose = list(pose) # copy
for key in MOUTH_OPEN_MORPHS:
idx = posedict_key_to_index[key]
new_pose[idx] = target_pose[idx]
return new_pose
return pose # most common case: do nothing (not talking, and wasn't talking during previous frame)
finally: # reset state *after* processing
self.last_talking_target_value = None
self.last_talking_timestamp = None
self.was_talking = False
assert is_talking
# With 25 FPS (or faster) output, randomizing the mouth every frame looks too fast.
# Determine whether enough wall time has passed to randomize a new mouth position.
TARGET_SEC = 1 / self._settings["talking_fps"] # rate of "actual new cels" in talking animation
time_now = time.time_ns()
update_mouth = False
if self.last_talking_timestamp is None:
update_mouth = True
else:
time_elapsed_sec = (time_now - self.last_talking_timestamp) / 10**9
if time_elapsed_sec >= TARGET_SEC:
update_mouth = True
# Apply the mouth open morph
new_pose = list(pose) # copy
idx = posedict_key_to_index[talking_morph]
if self.last_talking_target_value is None or update_mouth:
# Randomize new mouth position
x = pose[idx]
x = abs(1.0 - x) + random.uniform(-2.0, 2.0)
x = max(0.0, min(x, 1.0)) # clamp (not the manga studio)
self.last_talking_target_value = x
self.last_talking_timestamp = time_now
else:
# Keep the mouth at its latest randomized position (this overrides the interpolator that would pull the mouth toward the target emotion pose)
x = self.last_talking_target_value
new_pose[idx] = x
# Zero out other morphs that affect mouth open/closed state.
for key in MOUTH_OPEN_MORPHS:
if key == talking_morph:
continue
idx = posedict_key_to_index[key]
new_pose[idx] = 0.0
self.was_talking = True
return new_pose
def compute_sway_target_pose(self, original_target_pose: List[float]) -> List[float]:
"""History-free sway animation driver.
`original_target_pose`: emotion pose to modify with a randomized sway target
Relevant `self._settings` keys:
`"sway_morphs"`: List[str], which morphs can sway. By default, this is all geometric transformations,
but disabling some can be useful for some characters (such as robots).
For available values, see `posedict_keys`.
`"sway_interval_min"`: float, seconds, lower limit for random time interval until randomizing new sway pose.
`"sway_interval_max"`: float, seconds, upper limit for random time interval until randomizing new sway pose.
Note the limits are ignored when `original_target_pose` changes (then immediately refreshing
the sway pose), because an emotion pose may affect the geometric transformations, too.
`"sway_macro_strength"`: float, [0, 1]. In sway pose, max abs deviation from emotion pose target morph value
for each sway morph, but also max deviation from center. The `original_target_pose`
itself may use higher values; in such cases, sway will only occur toward the center.
See the source code of this function for the exact details.
`"sway_micro_strength"`: float, [0, 1]. Max abs random noise to sway target pose, added each frame, to make
the animation look less robotic. No limiting other than a clamp of final pose to [-1, 1].
The sway target pose is randomized again when necessary; this takes care of caching internally.
Return the modified pose.
"""
# We just modify the target pose, and let the ODE integrator (`interpolate_pose`) do the actual animation.
# - This way we don't need to track start state, progress, etc.
# - This also makes the animation nonlinear automatically: a saturating exponential trajectory toward the target.
# - If we want a smooth start toward a target pose/morph, we can e.g. save the timestamp when the animation began, and then ramp the rate of change,
# beginning at zero and (some time later, as measured from the timestamp) ending at the original, non-ramped value. The ODE itself takes care of
# slowing down when we approach the target state.
# As documented in the original THA tech reports, on the pose axes, zero is centered, and 1.0 = 15 degrees.
random_max = self._settings["sway_macro_strength"] # max sway magnitude from center position of each morph
noise_max = self._settings["sway_micro_strength"] # amount of dynamic noise (re-generated every frame), added on top of the sway target, no clamping except to [-1, 1]
SWAYPARTS = self._settings["sway_morphs"] # some characters might not sway on all axes (e.g. a robot)
def macrosway() -> List[float]: # this handles caching and everything
time_now = time.time_ns()
should_pick_new_sway_target = True
if current_emotion == self.last_emotion:
if self.sway_interval is not None: # have we created a swayed pose at least once?
seconds_since_last_sway_target = (time_now - self.last_sway_target_timestamp) / 10**9
if seconds_since_last_sway_target < self.sway_interval:
should_pick_new_sway_target = False
# else, emotion has changed, invalidating the old sway target, because it is based on the old emotion (since emotions may affect the pose too).
if not should_pick_new_sway_target:
if self.last_sway_target_pose is not None: # When keeping the same sway target, return the cached sway pose if we have one.
return self.last_sway_target_pose
else: # Should not happen, but let's be robust.
return original_target_pose
new_target_pose = list(original_target_pose) # copy
for key in SWAYPARTS:
idx = posedict_key_to_index[key]
target_value = original_target_pose[idx]
# Determine the random range so that the swayed target always stays within `[-random_max, random_max]`, regardless of `target_value`.
# TODO: This is a simple zeroth-order solution that just cuts the random range.
# Would be nicer to *gradually* decrease the available random range on the "outside" as the target value gets further from the origin.
random_upper = max(0, random_max - target_value) # e.g. if target_value = 0.2, then random_upper = 0.4 => max possible = 0.6 = random_max
random_lower = min(0, -random_max - target_value) # e.g. if target_value = -0.2, then random_lower = -0.4 => min possible = -0.6 = -random_max
random_value = random.uniform(random_lower, random_upper)
new_target_pose[idx] = target_value + random_value
self.last_sway_target_pose = new_target_pose
self.last_sway_target_timestamp = time_now
self.sway_interval = random.uniform(self._settings["sway_interval_min"],
self._settings["sway_interval_max"]) # seconds; duration of this sway target before randomizing new one
return new_target_pose
# Add dynamic noise (re-generated at 25 FPS) to the target to make the animation look less robotic, especially once we are near the target pose.
def add_microsway() -> None: # DANGER: MUTATING FUNCTION
CALIBRATION_FPS = 25 # FPS at which randomizing a new microsway target looks good
time_now = time.time_ns()
should_microsway = True
if self.last_microsway_timestamp is not None:
seconds_since_last_microsway = (time_now - self.last_microsway_timestamp) / 10**9
if seconds_since_last_microsway < 1 / CALIBRATION_FPS:
should_microsway = False
if should_microsway:
for key in SWAYPARTS:
idx = posedict_key_to_index[key]
x = new_target_pose[idx] + random.uniform(-noise_max, noise_max)
x = max(-1.0, min(x, 1.0))
new_target_pose[idx] = x
self.last_microsway_timestamp = time_now
new_target_pose = macrosway()
add_microsway()
return new_target_pose
def animate_breathing(self, pose: List[float]) -> List[float]:
"""Breathing animation driver.
Relevant `self._settings` keys:
`"breathing_cycle_duration"`: seconds. Duration of one full breathing cycle.
Return the modified pose.
"""
breathing_cycle_duration = self._settings["breathing_cycle_duration"] # seconds
time_now = time.time_ns()
t = (time_now - self.breathing_epoch) / 10**9 # seconds since breathing-epoch
cycle_pos = t / breathing_cycle_duration # number of cycles since breathing-epoch
if cycle_pos > 1.0: # prevent loss of accuracy in long sessions
self.breathing_epoch = time_now # TODO: be more accurate here, should sync to a whole cycle
cycle_pos = cycle_pos - float(int(cycle_pos)) # fractional part
new_pose = list(pose) # copy
idx = posedict_key_to_index["breathing_index"]
new_pose[idx] = math.sin(cycle_pos * math.pi)**2 # 0 ... 1 ... 0, smoothly, with slow start and end, fast middle
return new_pose
def interpolate_pose(self, pose: List[float], target_pose: List[float]) -> List[float]:
"""Interpolate from current `pose` toward `target_pose`.
Relevant `self._settings` keys:
`"pose_interpolator_step"`: [0, 1]; how far toward `target_pose` to interpolate in one frame,
assuming a reference of 25 FPS. This is FPS-corrected automatically.
0 is fully `pose`, 1 is fully `target_pose`.
This is a kind of history-free rate-based formulation, which needs only the current and target poses, and
the step size; there is no need to keep track of e.g. the initial pose or the progress along the trajectory.
Note that looping back the output as `pose`, while keeping `target_pose` constant, causes the current pose
to approach `target_pose` on a saturating trajectory. This is because `step` is the fraction of the *current*
difference between `pose` and `target_pose`, which obviously becomes smaller after each repeat.
This is a feature, not a bug!
"""
# The `step` parameter is calibrated against animation at 25 FPS, so we must scale it appropriately, taking
# into account the actual FPS.
#
# How to do this requires some explanation. Numericist hat on. Let's do a quick back-of-the-envelope calculation.
# This pose interpolator is essentially a solver for the first-order ODE:
#
# u' = f(u, t)
#
# Consider the most common case, where the target pose remains constant over several animation frames.
# Furthermore, consider just one morph (they all behave similarly). Then our ODE is Newton's law of cooling:
#
# u' = -β [u - u∞]
#
# where `u = u(t)` is the temperature, `u∞` is the constant temperature of the external environment,
# and `β > 0` is a material-dependent cooling coefficient.
#
# But instead of numerical simulation at a constant timestep size, as would be typical in computational science,
# we instead read off points off the analytical solution curve. The `step` parameter is *not* the timestep size;
# instead, it controls the relative distance along the *u* axis that should be covered in one simulation step,
# so it is actually related to the cooling coefficient β.
#
# (How exactly: write the left-hand side as `[unew - uold] / Δt + O([Δt]²)`, drop the error term, and decide
# whether to use `uold` (forward Euler) or `unew` (backward Euler) as `u` on the right-hand side. Then compare
# to our update formula. But those details don't matter here.)
#
# To match the notation in the rest of this code, let us denote the temperature (actually pose morph value) as `x`
# (instead of `u`). And to keep notation shorter, let `β := step` (although it's not exactly the `β` of the
# continuous-in-time case above).
#
# To scale the animation speed linearly with regard to FPS, we must invert the relation between simulation step
# number `n` and the solution value `x`. For an initial value `x0`, a constant target value `x∞`, and constant
# step `β ∈ (0, 1]`, the pose interpolator produces the sequence:
#
# x1 = x0 + β [x∞ - x0] = [1 - β] x0 + β x∞
# x2 = x1 + β [x∞ - x1] = [1 - β] x1 + β x∞
# x3 = x2 + β [x∞ - x2] = [1 - β] x2 + β x∞
# ...
#
# Note that with exact arithmetic, if `β < 1`, the final value is only reached in the limit `n → ∞`.
# For floating point, this is not the case. Eventually the increment becomes small enough that when
# it is added, nothing happens. After sufficiently many steps, in practice `x` will stop just slightly
# short of `x∞` (on the side it approached the target from).
#
# (For performance reasons, when approaching zero, one may need to beware of denormals, because those
# are usually implemented in (slow!) software on modern CPUs. So especially if the target is zero,
# it is useful to have some very small cutoff (inside the normal floating-point range) after which
# we make `x` instantly jump to the target value.)
#
# Inserting the definition of `x1` to the formula for `x2`, we can express `x2` in terms of `x0` and `x∞`:
#
# x2 = [1 - β] ([1 - β] x0 + β x∞) + β x∞
# = [1 - β]² x0 + [1 - β] β x∞ + β x∞
# = [1 - β]² x0 + [[1 - β] + 1] β x∞
#
# Then inserting this to the formula for `x3`:
#
# x3 = [1 - β] ([1 - β]² x0 + [[1 - β] + 1] β x∞) + β x∞
# = [1 - β]³ x0 + [1 - β]² β x∞ + [1 - β] β x∞ + β x∞
#
# To simplify notation, define:
#
# α := 1 - β
#
# We have:
#
# x1 = α x0 + [1 - α] x∞
# x2 = α² x0 + [1 - α] [1 + α] x∞
# = α² x0 + [1 - α²] x∞
# x3 = α³ x0 + [1 - α] [1 + α + α²] x∞
# = α³ x0 + [1 - α³] x∞
#
# This suggests that the general pattern is (as can be proven by induction on `n`):
#
# xn = α**n x0 + [1 - α**n] x∞
#
# This allows us to determine `x` as a function of simulation step number `n`. Now the scaling question becomes:
# if we want to reach a given value `xn` by some given step `n_scaled` (instead of the original step `n`),
# how must we change the step size `β` (or equivalently, the parameter `α`)?
#
# To simplify further, observe:
#
# x1 = α x0 + [1 - α] [[x∞ - x0] + x0]
# = [α + [1 - α]] x0 + [1 - α] [x∞ - x0]
# = x0 + [1 - α] [x∞ - x0]
#
# Rearranging yields:
#
# [x1 - x0] / [x∞ - x0] = 1 - α
#
# which gives us the relative distance from `x0` to `x∞` that is covered in one step. This isn't yet much
# to write home about (it's essentially just a rearrangement of the definition of `x1`), but next, let's
# treat `x2` the same way:
#
# x2 = α² x0 + [1 - α] [1 + α] [[x∞ - x0] + x0]
# = [α² x0 + [1 - α²] x0] + [1 - α²] [x∞ - x0]
# = [α² + 1 - α²] x0 + [1 - α²] [x∞ - x0]
# = x0 + [1 - α²] [x∞ - x0]
#
# We obtain
#
# [x2 - x0] / [x∞ - x0] = 1 - α²
#
# which is the relative distance, from the original `x0` toward the final `x∞`, that is covered in two steps
# using the original step size `β = 1 - α`. Next up, `x3`:
#
# x3 = α³ x0 + [1 - α³] [[x∞ - x0] + x0]
# = α³ x0 + [1 - α³] [x∞ - x0] + [1 - α³] x0
# = x0 + [1 - α³] [x∞ - x0]
#
# Rearranging,
#
# [x3 - x0] / [x∞ - x0] = 1 - α³
#
# which is the relative distance covered in three steps. Hence, we have:
#
# xrel := [xn - x0] / [x∞ - x0] = 1 - α**n
#
# so that
#
# α**n = 1 - xrel (**)
#
# and (taking the natural logarithm of both sides)
#
# n log α = log [1 - xrel]
#
# Finally,
#
# n = [log [1 - xrel]] / [log α]
#
# Given `α`, this gives the `n` where the interpolator has covered the fraction `xrel` of the original distance.
# On the other hand, we can also solve (**) for `α`:
#
# α = (1 - xrel)**(1 / n)
#
# which, given desired `n`, gives us the `α` that makes the interpolator cover the fraction `xrel` of the original distance in `n` steps.
#
CALIBRATION_FPS = 25 # FPS for which the default value `step` was calibrated
xrel = 0.5 # just some convenient value
step = self._settings["pose_interpolator_step"]
alpha_orig = 1.0 - step
if 0 < alpha_orig < 1:
avg_render_sec = self.render_duration_statistics.average()
if avg_render_sec > 0:
avg_render_fps = 1 / avg_render_sec
# Even if render completes faster, the `talkinghead` output is rate-limited to `target_fps` at most.
avg_render_fps = min(avg_render_fps, target_fps)
else: # No statistics available yet; let's assume we're running at `target_fps`.
avg_render_fps = target_fps