-
Notifications
You must be signed in to change notification settings - Fork 1
/
effective4.html
1516 lines (1358 loc) · 62.5 KB
/
effective4.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE html>
<!--[if IEMobile 7 ]><html class="no-js iem7"><![endif]-->
<!--[if lt IE 9]><html class="no-js lte-ie8"><![endif]-->
<!--[if (gt IE 8)|(gt IEMobile 7)|!(IEMobile)|!(IE)]><!--><html class="no-js" lang="en"><!--<![endif]-->
<head>
<meta charset="utf-8">
<title>Effective Python一人輪読会(Item 52 to 74) — Daydreaming in Greater Boston</title>
<meta name="author" content="Kyos">
<!-- http://t.co/dKP3o1e -->
<meta name="HandheldFriendly" content="True">
<meta name="MobileOptimized" content="320">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="/favicon.png" rel="icon">
<link href="/theme/css/main.css" media="screen, projection"
rel="stylesheet" type="text/css">
<link href="//fonts.googleapis.com/css?family=PT+Serif:regular,italic,bold,bolditalic"
rel="stylesheet" type="text/css">
<link href="//fonts.googleapis.com/css?family=PT+Sans:regular,italic,bold,bolditalic"
rel="stylesheet" type="text/css">
</head>
<body>
<header role="banner"><hgroup>
<h1><a href="/">Daydreaming in Greater Boston</a></h1>
</hgroup></header>
<nav role="navigation"><ul class="subscription" data-subscription="rss">
</ul>
<ul class="main-navigation">
<li><a href="/pages/about.html">About Me</a></li>
<li >
<a href="/category/blog.html">Blog</a>
</li>
<li >
<a href="/category/english.html">English</a>
</li>
<li >
<a href="/category/linux.html">Linux</a>
</li>
<li class="active">
<a href="/category/python.html">Python</a>
</li>
<li >
<a href="/category/tech.html">Tech</a>
</li>
</ul></nav>
<div id="main">
<div id="content">
<div>
<article class="hentry" role="article">
<header>
<h1 class="entry-title">Effective Python一人輪読会(Item 52 to 74)</h1>
<p class="meta">
<time datetime="2020-08-26T00:00:00-04:00" pubdate>Wed 26 August 2020</time> </p>
</header>
<div class="entry-content"><div id="table-of-contents">
<h2>Table of Contents</h2>
<div id="text-table-of-contents">
<ul>
<li><a href="#org6eee8ef">1. Chapter 7: コンカレンシーと並列実行</a>
<ul>
<li><a href="#org9a16b6c">1.1. Item 52: 子プロセスを管理するために subprosess を使え</a></li>
<li><a href="#org33d9887">1.2. Item 53: ブロックするI/Oにはスレッドを使い、parallelismを避けよ</a></li>
<li><a href="#orgb61b3f2">1.3. Item 54: スレッド間のデータレースを避けるために Lock を使え</a></li>
<li><a href="#org7000a83">1.4. Item 55: スレッド間のワークアサインの調整には Queue を使え</a></li>
<li><a href="#org3173f8f">1.5. Item 56: いつコンカレンシーが必要になるかをどう理解するか知れ</a></li>
<li><a href="#orgf59e86a">1.6. Item 57: オンデマンドのfan-outで新たなスレッドインスタンスを作るのは避けろ</a></li>
<li><a href="#orgc735716">1.7. Item 58: コンカレンシーのためにQueueを使うにはリファクタリングが必要なことを理解せよ</a></li>
<li><a href="#orgc98e78e">1.8. Item 59: コンカレンシーでスレッドが必要なら ThreadPoolExecutorを検討せよ</a></li>
<li><a href="#orgb1e74df">1.9. Item 60: コルーチン(Coroutines)を使って高コンカレントI/Oを実現せよ</a></li>
<li><a href="#orgd86deee">1.10. Item 61: スレッド化されたI/Oをasyncioにポートする方法を知れ</a></li>
<li><a href="#org99133da">1.11. Item 62: asyncioへの移行を楽にするため、スレッドとコルーチンを混在させよ</a></li>
<li><a href="#org3b33c3c">1.12. Item 63: レスポンスを最大化するためにはasyncioのイベントループをブロックするな</a></li>
<li><a href="#org6044d80">1.13. Item 64: 真の並行動作にはconcurrent.futuresを検討せよ</a></li>
</ul>
</li>
<li><a href="#orgf5571e4">2. Chapter 8: 堅牢性(robustness)と性能</a>
<ul>
<li><a href="#orgc49a200">2.1. Item 65: try/except/else/finallyで各ブロックを有効に使え</a></li>
<li><a href="#org211bb0b">2.2. Item 66: try/finally挙動を再利用するためにcontextlibとwithステートメントを考えよ</a></li>
<li><a href="#org46dabc8">2.3. Item 67: ローカル時間にはtimeの代わりにdatetimeを使え</a></li>
<li><a href="#org012eb4e">2.4. Item 68: copyregでpickleをreliableにせよ</a></li>
<li><a href="#org3185195">2.5. Item 69: 精度が重要なら decimal を使え</a></li>
<li><a href="#orgbcb6398">2.6. Item 70: 最適化の前にプロファイルせよ</a></li>
<li><a href="#orgd706422">2.7. Item 71: 生産者-消費者キューにはdequeを使え</a></li>
<li><a href="#orgacb6233">2.8. Item 72: ソートされたシーケンス内をサーチするにはbisectを使え</a></li>
<li><a href="#org3793459">2.9. Item 73: 優先度キューのために heapq をどう使うかを知れ</a></li>
<li><a href="#org71f96b0">2.10. Item 74: bytesとゼロコピーでinteractするにはmemoryviewとbytearrayを使え</a></li>
</ul>
</li>
</ul>
</div>
</div>
<div id="outline-container-org6eee8ef" class="outline-2">
<h2 id="org6eee8ef"><span class="section-number-2">1</span> Chapter 7: コンカレンシーと並列実行</h2>
<div class="outline-text-2" id="text-1">
</div>
<div id="outline-container-org9a16b6c" class="outline-3">
<h3 id="org9a16b6c"><span class="section-number-3">1.1</span> Item 52: 子プロセスを管理するために subprosess を使え</h3>
<div class="outline-text-3" id="text-1-1">
<p>
Pythonから子プロセスを呼び出すシンプルな方法。
</p>
<div class="org-src-container">
<pre class="src src-python">import subprocess
result = subprocess.run(
['echo', 'Hello from the child!'],
capture_output=True, # stdout/stderrをキャプチャーする
encoding='utf-8')
result.check_returncode()
print(result.stdout)
>>>
Hello from the child!
</pre>
</div>
<p>
Python 3.5で導入された <code>subprocess.run</code> は子プロセスの実行完了を待ちます。<a href="https://docs.python.org/3/library/subprocess.html">公式サイト</a>によると、基本的にはこれを使うのが推奨だそうです。
</p>
<blockquote>
<p>
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None, **other_popen_kwargs)
</p>
</blockquote>
<p>
タイムアウトも指定できそうです。
</p>
<p>
次は、 <code>subprocess.Popen</code> を使ってブロックされずに子プロセスを10個起動したあと、 <code><process>.communicate</code> で実行完了した各子プロセスを終了(terminate)させる処理です。
</p>
<div class="org-src-container">
<pre class="src src-python">import subprocess
import time
start = time.time()
sleep_procs = []
for _ in range(10):
proc = subprocess.Popen(['sleep', '1'])
sleep_procs.append(proc)
time.sleep(0.3)
for proc in sleep_procs:
proc.communicate()
end = time.time()
delta = end - start
print(f'Finished in {delta:.3} seconds')
>>>
Finished in 1.02 seconds
</pre>
</div>
<p>
並列に実行するため10秒かからず、1秒強で終わっています。
</p>
<p>
次の例は、外部コマンドの openssl にランダムな10バイトのバイト列を暗号化させる処理です。
</p>
<div class="org-src-container">
<pre class="src src-python">import subprocess
import os
def run_encrypt(data):
env = os.environ.copy()
env['password'] = 'start123'
proc = subprocess.Popen(
['openssl', 'enc', '-des3', '-pass', 'env:password'],
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
proc.stdin.write(data)
proc.stdin.flush()
return proc
procs = []
for _ in range(3):
data = os.urandom(10) # ランダムな10バイトを生成
proc = run_encrypt(data)
procs.append(proc)
for proc in procs:
out, _ = proc.communicate()
print(out[-10:]) # 後ろから10バイトをスライス
>>>
b'\x0f\xbc4\x94O\x93\xa5G\xbe\xe3'
b'm\xb3\x89\r\xc9pP7\xdc\xeb'
b"\xda\x16z N=\x850v'"
</pre>
</div>
<p>
結果は、ランダムなバイト列を暗号化したバイト列なので、意味は特にありません。
</p>
<p>
複数の外部コマンドを呼び出し、それらをパイプでつなぐこともできます。次の例で、 <code>run_hash</code> は <code>openssl</code> を使って入力バイト列のハッシュを求める関数です。 <code>for</code> 文以下では、100バイトのランダムな文字列を生成し、それからハッシュを求めることを3つのサブプロセスで並列実行します。 <code>run_hash</code> 関数呼び出しの引数に <code>encrypt_proc.stdout</code> を指定することで、これらの処理をパイプでつなげています。
</p>
<div class="org-src-container">
<pre class="src src-python">def run_hash(input_stdin):
return subprocess.Popen(
['openssl', 'dgst', '-whirlpool', '-binary'],
stdin=input_stdin, # stdinを指定
stdout=subprocess.PIPE)
encrypt_procs = []
hash_procs = []
for _ in range(3):
data = os.urandom(100) # ランダムな100バイトを生成
encrypt_proc = run_encrypt(data)
encrypt_procs.append(encrypt_proc)
hash_proc = run_hash(encrypt_proc.stdout) # stdoutを指定
hash_procs.append(hash_proc)
encrypt_proc.stdout.close() # 閉じてしまってよい???
encrypt_proc.stdout = None
for proc in encrypt_procs:
proc.communicate()
assert proc.returncode == 0
for proc in hash_procs:
out, _ = proc.communicate()
print(out[-10:])
assert proc.returncode == 0
>>>
'\x99\xd8*\x15~\x88\xd4\x89\x1c3'
b'\x00\x87\xd3\x93Ti\x12v\x01\xaa'
b'\x1b\x85\xdf\x94z\x96\xd3\xb0\x91\x9a'
</pre>
</div>
<p>
結果の文字列に特に意味はありません。
</p>
<p>
子プロセスが終わらない場合が気になるなら、タイムアウト値を指定することも出来ます。
</p>
<div class="org-src-container">
<pre class="src src-python">import subprocess
proc = subprocess.Popen(['sleep', '10'])
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()
print('Exist status', proc.poll())
>>>
Exist status -15
</pre>
</div>
<p>
タイムアウト例外が発生したら子プロセスを終わらせます。 <code>proc.poll()</code> でexit codeが得られるようです。
</p>
</div>
</div>
<div id="outline-container-org33d9887" class="outline-3">
<h3 id="org33d9887"><span class="section-number-3">1.2</span> Item 53: ブロックするI/Oにはスレッドを使い、parallelismを避けよ</h3>
<div class="outline-text-3" id="text-1-2">
<p>
普通使うPythonはCPythonですが、CPythonはglobal interpreter lock (GIL)のために複数コアでの並列実行ができません。知りませんでした。衝撃的な事実。。。I/Oによる待ちが無ければ複数スレッド使っても実行時間は短縮されません。
</p>
<p>
更に、ネットワーク等の非同期I/Oでは、より効率の良いasyncio(後で出てきます)を使うことになるため、Pythonでのスレッドの出番はブロックする(ie, 非同期システムコールが無い)ディスクI/Oくらいしか無さそうです。。。というのは言い過ぎでした。キュー等でも使えますね。
</p>
<p>
Pythonでのスレッドの使い方例:
</p>
<div class="org-src-container">
<pre class="src src-python">import select
import socket
import time
from threading import Thread
def slow_systemcall():
select.select([socket.socket()],[],[],0.1)
start = time.time()
threads = []
for _ in range(5):
thread = Thread(target=slow_systemcall)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')
>>>
Took 0.103 seconds
</pre>
</div>
</div>
</div>
<div id="outline-container-orgb61b3f2" class="outline-3">
<h3 id="orgb61b3f2"><span class="section-number-3">1.3</span> Item 54: スレッド間のデータレースを避けるために Lock を使え</h3>
<div class="outline-text-3" id="text-1-3">
<p>
単一コアで動くマルチスレッドにもロックは必要という話。
Pythonでmutexを用意する例:
</p>
<div class="org-src-container">
<pre class="src src-python">from threading import Lock
class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0 # ロック対象
def increment(self, offset):
with self.lock:
self.count += offset
</pre>
</div>
<p>
<code>Lock</code> クラスを使うと <code>with</code> でクリティカルリージョン(ie, ロック範囲)の指定ができるのが便利ですね。
</p>
<p>
おさらいです。mutexとbinary semaphoreは一見とても似ていますが、用途が違います。
</p>
<ul class="org-ul">
<li>mutexは資源の排他(ロック)のため</li>
<li>binary semaphoreはイベントが起きたことを通知(シグナル)するため</li>
</ul>
<p>
スピンロックと違って、両方とも待ちスレッドはスリープします。
</p>
</div>
</div>
<div id="outline-container-org7000a83" class="outline-3">
<h3 id="org7000a83"><span class="section-number-3">1.4</span> Item 55: スレッド間のワークアサインの調整には Queue を使え</h3>
<div class="outline-text-3" id="text-1-4">
<p>
<code>Queue</code> クラスはパイプラインを実装するのに便利です。 <code>Queue</code> の getメソッドは新データが来るまでブロックするため、自前でbusyウエイトを実装する必要がありません。
</p>
<div class="org-src-container">
<pre class="src src-python">from threading import Thread
from queue import Queue
my_queue = Queue() # キューのクラスが用意されています
def consumer():
print('Consumer waiting')
my_queue.get() # ブロックされます
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
my_queue.put(object())
print('Producer done')
thread.join()
>>>
Consumer waiting # アイテム(object)が入ってくるまで待ちます
Producer putting
Producer done
Consumer done
</pre>
</div>
<p>
最初に consumer がキューに来た後、producerが <code>put</code> するまで consumer は動き出さないことがわかります。。
</p>
<p>
キューを作るときにキューバッファのサイズを指定することもできます。いくつのconsumerがキューに入れるかを示し、それ以上のconsumerが来ても <code>put</code> でブロックします。
</p>
<div class="org-src-container">
<pre class="src src-python">from threading import Thread
from queue import Queue
import time
my_queue = Queue(1) # バッファサイズが1
def consumer():
time.sleep(0.1) # まず0.1秒スリープする
my_queue.get()
print('Consumer got 1')
my_queue.get()
print('Consumer got 2')
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
my_queue.put(object()) # producerは立て続けに二つputしようとする
print('Producer put 1')
my_queue.put(object()) # ここでブロックする
print('Producer put 2')
print('Producer done')
thread.join()
>>>
Producer put 1 # 最初にproducerがputするのは前回と同じ
Consumer got 1 # 0.1秒待ってからgetする
Producer put 2 # consumerがgetして、やっとputから戻る
Producer done
Consumer got 2
Consumer done
</pre>
</div>
<p>
この例でのポイントは put 2のメッセージが got 1の後に来ているところです。consumerスレッドはスタートしてからまず0.1秒スリープしますが、その間にメインスレッドのproducerはputできずにブロックされていることがわかります。
</p>
<p>
次に、 <code>Queue.task_done()</code> はそのキューに対してそのタスクが完了したことを宣言します。全てのタスクの完了を待つにはそのキューに対して <code>Queue.join()</code> を呼べばよく、それまでブロックされます。これはスレッドのjoinとは別なことに注意です。
</p>
<p>
キューのタスクが完了するというのは、そのキューからgetしてきた仕事(アイテム)を最後の1個まで、全て処理し終わったという意味です。
</p>
<div class="org-src-container">
<pre class="src src-python">from threading import Thread
from queue import Queue
import time
in_queue = Queue()
def consumer():
print('Consumer waiting')
work = in_queue.get()
print('Consumer working')
time.sleep(1) # この例でのタスクはスリープすること
print('Consumer done')
in_queue.task_done() # タスク完了を宣言する
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
in_queue.put(object())
print('Producer waiting')
in_queue.join() # in_queueの完了(=task_doneが呼ばれる)までブロックされる
print('Producer done')
thread.join()
>>>
Consumer waiting
Producer putting
Producer waiting
Consumer working
# ここで1秒スリープする
Consumer done
Producer done
</pre>
</div>
<p>
この例でのポイントはもちろん、consumer doneまでproducer doneが出ないところです。
</p>
<p>
さて、これらの知識を使ってパイプラインを実装します。パイプラインはdownload, resize, uploadの3ステージからなるとします。写真をカメラからダウンロードして、サイズを変えてまたアップロードする場合を想定しています。
</p>
<div class="org-src-container">
<pre class="src src-python">from threading import Thread
from queue import Queue
import time
def download(item):
pass
def resize(item):
pass
def upload(item):
pass
</pre>
</div>
<p>
<code>Queue</code> を継承した <code>ClosableQueue</code> を定義します。これはメソッド <code>close</code> を持ち、キューにこれ以降の入力が無いことを示す sentinel を入れます。sentinel は歩哨・見張りの意味で、終わりの印です。<a href="./effective1.html">Item 10</a>で出てきましたね。
</p>
<p>
<code>__iter__</code> を準備したことで、このキューを iterate することができます。 <code>get()</code> でキューから写真を取り出し、sentinel以外ならyieldして写真を返します。キューに何も入っていなかったら <code>get()</code> がブロックします。キューから取ってきたアイテムが写真でなくsentinelだったら、終わりの印なのでリターンしています。
</p>
<div class="org-src-container">
<pre class="src src-python">class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while(True):
item = self.get() # 写真を一つ取り出す。無かったらブロックする
try:
if item is self.SENTINEL:
return
yield item # まだfinallyは実行しない
finally:
self.task_done() # この写真の処理が完了
</pre>
</div>
<p>
ここのtry - finallyの使い方に注目します。exceptで例外処理を行わないtry - finallyは、tryブロックの中で何が起こったとしてもfinallyブロックで取得しているロックを解放する(後始末する)ようなユースケースで使うようです。
</p>
<p>
この例ではロックは使っていませんが、Queue.task_doneをロック解放、Queue.joinをロック解放待ちのアナロジーとして考えると、似たユースケースと言えそうです。
</p>
<p>
<code>StoppableWorker</code> は <code>ClosableQueue</code> に対応した新ワーカースレッドです。スレッドは写真ではなくステージ(で作業する人)に対応します。 <code>in_queue</code> から写真(<code>item</code>)を取り出し、処理をして、処理後の写真(<code>result</code>)を <code>out_queue</code> に入れます。
</p>
<div class="org-src-container">
<pre class="src src-python">class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func # やる作業
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue: # queueをiterateする
result = self.func(item)
self.out_queue.put(result)
</pre>
</div>
<p>
キューとスレッドを用意します。キューとキューの間にワーカー(スレッド)がいるイメージですね。
</p>
<div class="org-src-container">
<pre class="src src-python">download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, done_queue),
]
</pre>
</div>
<p>
最後にこれらをまとめます。SENTINELを投入する <code>Queue.close()</code> はここで呼んでいるのですね。
</p>
<div class="org-src-container">
<pre class="src src-python">for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object()) # object()=写真を入れる
download_queue.close() # SENTINEL投入
download_queue.join() # task_done()が呼ばれるまでここでブロックされる
resize_queue.close() # SENTINEL投入
resize_queue.join() # task_done()が呼ばれるまでここでブロックされる
upload_queue.close() # SENTINEL投入
upload_queue.join() # task_done()が呼ばれるまでここでブロックされる
print(done_queue.qsize(), 'items finished')
for thread in threads:
thread.join()
>>>
1000 items finished
</pre>
</div>
<p>
あれ、まだ終わりじゃありませんでした。。。次は、ステージ毎に複数のワーカースレッドを用意してI/Oの並列度を上げることを考えます。
</p>
<p>
まず、複数スレッドをスタート、ストップさせるヘルパー関数を用意します。 <code>start_threads</code> 関数では引数 <code>count</code> の数だけ <code>StoppableWorker</code> スレッドを作ってスタートし、そのリストを返します。 <code>stop_threads</code> 関数ではキューの <code>close</code> を呼んでsentinelを投入し、キューの <code>join</code> でタスクの完了を待ってからスレッドを完了させます。
</p>
<div class="org-src-container">
<pre class="src src-python">def start_threads(count, *args):
threads = [StoppableWorker(*args) for _ in range(count)]
for thread in threads:
thread.start() # スレッドをスタートさせる
return threads
def stop_threads(closable_queue, threads):
for _ in threads:
closable_queue.close() # SENTINEL投入
closable_queue.join() # 全てのtask_done()を待ち、キューをクローズする
for thread in threads:
thread.join() # 全てのスレッドの完了を待つ
</pre>
</div>
<p>
最後にこれらをまとめます。ダウンロードスレッドは3多重、リサイズは4多重、アップロードは5多重を指定してスレッドを作成しています。後は1000個の写真を投入し、スレッドを1種類ずつ止めていきます。ポイントは、 <code>stop_threads</code> はsentinelを投入し、それが出てくるまでブロックするところでしょうか。このお陰で、後片付けが中途半端な状態で次の <code>stop_threads</code> に行くことはありません。
</p>
<div class="org-src-container">
<pre class="src src-python">download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
download_threads = start_threads(
3, download, download_queue, resize_queue) # ダウンロードは3多重
resize_threads = start_threads(
4, resize, resize_queue, upload_queue) # リサイズは4多重
upload_threads = start_threads(
5, upload, upload_queue, done_queue) # アップロードは5多重
for _ in range(1000): # 1000個(枚)の写真を投入
download_queue.put(object())
stop_threads(download_queue, download_threads) # 完了待ちする
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
print(done_queue.qsize(), 'items finished')
>>>
1000 items finished
</pre>
</div>
</div>
</div>
<div id="outline-container-org3173f8f" class="outline-3">
<h3 id="org3173f8f"><span class="section-number-3">1.5</span> Item 56: いつコンカレンシーが必要になるかをどう理解するか知れ</h3>
<div class="outline-text-3" id="text-1-5">
<p>
あるワークを、コンカレントに実行できるものにばらまくことを fan-out、ばらまいたものを回収することを fan-inと言うそうです。Pythonにはこれらを実現するツールがたくさんあって、それぞれトレードオフがあります。次の節以降で説明していきます。
</p>
</div>
</div>
<div id="outline-container-orgf59e86a" class="outline-3">
<h3 id="orgf59e86a"><span class="section-number-3">1.6</span> Item 57: オンデマンドのfan-outで新たなスレッドインスタンスを作るのは避けろ</h3>
<div class="outline-text-3" id="text-1-6">
<p>
ダイナミックにfan-out/fan-inを繰り返すような用途や、非常に多くにfan-outするケースにはスレッドは合いません。
</p>
<ul class="org-ul">
<li>1スレッドあたり8MBのメモリを消費する</li>
<li>スレッドの作成、開始、ロックなどでオーバーヘッドが大きい</li>
<li>複雑になりデバッグが大変</li>
</ul>
</div>
</div>
<div id="outline-container-orgc735716" class="outline-3">
<h3 id="orgc735716"><span class="section-number-3">1.7</span> Item 58: コンカレンシーのためにQueueを使うにはリファクタリングが必要なことを理解せよ</h3>
<div class="outline-text-3" id="text-1-7">
<p>
Queueを使うとスレッド数はワーカーの数に限定されるので、上限を定めないスレッドよりはマシですが、仕組みが複雑なことと、仕様変更によっては大きなリファクタリングが必要になるため、よい方法とは言えません。
</p>
</div>
</div>
<div id="outline-container-orgc98e78e" class="outline-3">
<h3 id="orgc98e78e"><span class="section-number-3">1.8</span> Item 59: コンカレンシーでスレッドが必要なら ThreadPoolExecutorを検討せよ</h3>
<div class="outline-text-3" id="text-1-8">
<p>
スレッドプールはなかなか良さそうです。例外を呼び元に伝搬する仕組みもあります。ただ、 <code>max_workers</code> をあらかじめ決めておく必要があることがネックです
</p>
<p>
<a href="https://docs.python.org/3/library/concurrent.futures.html">公式サイト</a>から実装例です。
</p>
<div class="org-src-container">
<pre class="src src-python">import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# {future: url, ...}の辞書がfuture_to_urlに入ります。
# futureはそのcallableの実行を表すオブジェクトです。
# ...この場合はワーカースレッドですね。
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# as_completedはfuture_to_urlのfuturesの完了(またはキャンセル)した
# インスタンスのiteratorを返します。それをiterateしてfuture
# = スレッドを得ます
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
>>>
'http://www.foxnews.com/' page is 323006 bytes
'http://www.cnn.com/' page is 1131345 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 404: Not Found
'http://some-made-up-domain.com/' page is 64668 bytes
'http://www.bbc.co.uk/' page is 300118 bytes
</pre>
</div>
<p>
上の例で、 <code>ThreadPoolExecutor</code> によるスレッドプールを <code>executor</code> としています。次の行で <code>URLS</code> リストに入っているURLに対して、複数のスレッドで関数 <code>load_url</code> を実行するようにfan-outしています。そして次の行の <code>for</code> 文で完了したスレッドを刈り取っています(fan-in)。スレッド内で発生した例外は、呼び元で <code>future.result()</code> を呼んで結果を刈り取る時に伝わるようです。意外と簡単に使えるのですね。
</p>
<p>
実際の実行結果で例外が発生したのはsome-made-up-domainではなくeurope.wsj.comの方だったのが笑えます。ブラウザーで見ると、前者はドメインが売りに出されており、後者はwsj.comにフォワードされました。
</p>
</div>
</div>
<div id="outline-container-orgb1e74df" class="outline-3">
<h3 id="orgb1e74df"><span class="section-number-3">1.9</span> Item 60: コルーチン(Coroutines)を使って高コンカレントI/Oを実現せよ</h3>
<div class="outline-text-3" id="text-1-9">
<p>
<a href="https://docs.python.org/3/library/asyncio.html">Asyncronus I/O</a>です。これは1スレッド内で、スレッドとは異なる仕組みを使ってコンテキストスイッチを行います。スレッドはOSカーネルの仕組みを使って、プリエンプティブにコンテキストスイッチしますが、Async I/Oでは長い待ちが発生する時(eg, ネットワーク待ち)に自発的に処理の実行を明け渡します。Async I/OはCPUネックの処理では効果がありません。Async I/Oの仕組みはスレッドよりもずっと軽く、数千のコンテキストをコンカレントに処理することができます。
</p>
<p>
PythonのAsynchronous I/Oについては、Real Pythonの<a href="https://realpython.com/async-io-python/">この記事</a>が最新(Python 3.7)の情報を元に、わかりやすく詳細に解説しています。PythonのAsynchronous I/Oの仕組みはまだ整備されている途中であり、ネット上には古い情報が多く混乱しがちですが、この記事は情報を整理する意味でもお勧めです。
</p>
<p>
<a href="./effective2.html">Item 33</a>にてジェネレーターとコルーチンについて書きました。コルーチンはジェネレーターの <code>yield</code> 等の仕組みを使って、ルーチンの途中で他のコルーチンにコンテキストスイッチを行い、後で再び中断した行から処理を再開することができます。
</p>
<p>
最新のPythonではジェネレーターは表に出ず、新たに導入した <code>async/await</code> のシンタックスを使ってコルーチンを使います。Pythonでは、 <code>async def</code> で定義された関数がコルーチンです。コルーチン内の <code>await</code> 文でコンテキストスイッチを行います。ジェネレーターのyield文がそうであったように、コルーチンではawait文のところから、以前のコンテキストのまま処理が再開されます。(実は、 <code>await</code> は <code>yield from</code> と等価だそうです)
</p>
<div class="org-src-container">
<pre class="src src-python">import asyncio
async def some_coroutine():
...
await slow_io_disk_read()
...
await slow_io_network_transfer()
...
</pre>
</div>
<p>
上記のコルーチンの例では、 <code>await slow_io_disk_read(), await slow_io_network_transfer()</code> で待ちが発生し、別のコルーチンにコンテキストスイッチします。
</p>
<p>
以下にコルーチンの基本的な実装パターンを示します。(Real Pythonの記事より)
</p>
<div class="org-src-container">
<pre class="src src-python">async def count():
print("One")
await asyncio.sleep(1)
print("Two")
async def main():
# コルーチンcount()を3つ実行する
await asyncio.gather(count(), count(), count())
if __name__ == "__main__":
import time
s = time.perf_counter()
# コルーチンメイン関数実行(完了を待つ)
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
</pre>
</div>
<p>
<code>asyncio.run</code> でコルーチンの <code>main</code> 関数を起動し、 <code>main</code> の <code>await asyncio.gather</code> からコルーチン <code>count</code> を3つ起動〜回収します。 <code>asyncio.run</code> はこれらが全て実行完了するまでブロックされて待ちます。
</p>
<p>
コルーチンの <code>main</code> を用意し、そこから個別のコルーチンを複数起動するやり方です。
</p>
<p>
<code>asyncio.run</code> 関数はPython 3.7で導入され、コルーチンを起動する標準の方法になりました。 <code>run</code> 1行でイベントループを生成、タスクを起動〜完了〜回収、イベントループのクローズまで行います。以下の古い書式と同じ事をします。
</p>
<div class="org-src-container">
<pre class="src src-python">loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
</pre>
</div>
<p>
<code>run</code> のお陰でイベントループを意識する必要が無く、使いやすくなりました。(<code>main</code> 関数も隠蔽してくれるともっとすっきりする気がしますが)
</p>
</div>
</div>
<div id="outline-container-orgd86deee" class="outline-3">
<h3 id="orgd86deee"><span class="section-number-3">1.10</span> Item 61: スレッド化されたI/Oをasyncioにポートする方法を知れ</h3>
<div class="outline-text-3" id="text-1-10">
<p>
主な作業
</p>
<ul class="org-ul">
<li>I/O待ちの発生する箇所に <code>await</code> を付ける。</li>
<li>待ちの発生する箇所を含む関数やfor, with等のブロックに <code>async</code> を付ける</li>
<li>関数名、クラス名を修正する</li>
<li>asyncioのビルトインモジュールを使う</li>
<li>スレッドの仕組みは全て置き換える</li>
</ul>
<p>
というところでしょうか。まだasyncioに未対応のモジュールもあることに注意。
</p>
</div>
</div>
<div id="outline-container-org99133da" class="outline-3">
<h3 id="org99133da"><span class="section-number-3">1.11</span> Item 62: asyncioへの移行を楽にするため、スレッドとコルーチンを混在させよ</h3>
<div class="outline-text-3" id="text-1-11">
<p>
asyncioへの移行はblocking I/Oには効果がありません。例えばディスクからリードするシステムコール <code>read</code> は完了までスリープせずにブロックされるため、別タスクにコンテキストスイッチをする機会がありません。blocking I/Oにはスレッドが有効です。
</p>
<p>
用途によってasyncioとスレッドを使い分ける(混在させる)ことが必要です。
</p>
</div>
</div>
<div id="outline-container-org3b33c3c" class="outline-3">
<h3 id="org3b33c3c"><span class="section-number-3">1.12</span> Item 63: レスポンスを最大化するためにはasyncioのイベントループをブロックするな</h3>
<div class="outline-text-3" id="text-1-12">
<p>
以下の例のように、コルーチン用のイベントループ内でblocking I/Oをするとよくありません。
</p>
<div class="org-src-container">
<pre class="src src-python">async def run_tasks(handles, interval, output_path):
with open(output_path, 'wb') as output:
async def write_async(data):
output.write(data) # ブロックされるI/O
tasks = []
for handle in handles:
coro = tail_async(handle, interval, write_async)
task = asyncio.create_task(coro)
tasks.append(task)
await asyncio.gather(*tasks)
</pre>
</div>
<p>
解決策として、ファイル操作を別スレッドとして独立させます。
</p>
<div class="org-src-container">
<pre class="src src-python">async def run_fully_async(handles, interval, output_path):
async with WriteThread(output_path) as output:
tasks = []
for handle in handles:
coro = tail_async(handle, interval, output.write)
task = asyncio.create_task(coro)
tasks.append(task)
await asyncio.gather(*tasks)
</pre>
</div>
<p>
そしてそのために、スレッドのクラスを <code>async with</code> 文で扱えるように <code>aenter, aexit</code> を用意します(<a href="https://www.python.org/dev/peps/pep-0492/">PEP 492</a>)。このスレッドの使い方は便利そうです。
</p>
<p>
ところで、ここではファイル操作系をスレッドとして独立させていますが、<a href="https://github.com/Tinche/aiofiles">aiofiles</a>を使えば、ファイル操作をasync化できそうです。
</p>
<div class="org-src-container">
<pre class="src src-python">async with aiofiles.open('filename', mode='r') as f:
contents = await f.read()
print(contents)
'My file contents'
</pre>
</div>
<p>
aiofilesのドキュメントを見ると、ファイル操作を別のスレッドプールにdelegateするとあります。
</p>
<blockquote>
<p>
aiofiles helps with this by introducing asynchronous versions of files that support delegating operations to a separate thread pool.
</p>
</blockquote>
<p>
こういうライブラリを使うのと、自分でスレッドを作るのと、どちらがいいのでしょうね。。
</p>
</div>
</div>
<div id="outline-container-org6044d80" class="outline-3">
<h3 id="org6044d80"><span class="section-number-3">1.13</span> Item 64: 真の並行動作にはconcurrent.futuresを検討せよ</h3>
<div class="outline-text-3" id="text-1-13">
<p>
Pythonのglobal interpreter lock (GIL)のせいで、マルチコアを使った真の並行動作は簡単には実現できません。Cエクステンションは高速化には適していますが、大きなコストがかかります。通常、遅くなる原因は多くの場所にあり、一部だけエクステンションとして抜き出して高速化する訳にはいかないようです。
</p>
<p>
<code>concurrent.futures</code> ビルトインモジュール経由でアクセスできる <code>multiprocessing</code> ビルトインモジュールが使えるかもしれません。利用する側は <code>ThreadPoolExecutor</code> の代わりに <code>ProcessPoolExecutor</code> で置き換えるだけでよいです。
</p>
<p>
ただしこれは、自プロセスと子プロセスの間のデータのやりとりでpickleを使ったバイナリエンコード・デコードが必要で、オーバーヘッドが馬鹿になりません。よって <code>ProcessPoolExecutor</code> で効果があるのは、プロセス間のデータ転送量及び頻度が少ない場合に限られます。
</p>
<p>
<code>multiprocessing</code> は共有メモリやプロセス間のロック、キュー、プロキシーといったより高度な手段を提供してはいますがが、これらは非常に複雑だそうです。
</p>
<p>
こんなところでPythonの限界が見えてきてしまいました。。。(インタプリター言語に何を求めているのか、という話もありますが)
</p>
</div>
</div>
</div>
<div id="outline-container-orgf5571e4" class="outline-2">
<h2 id="orgf5571e4"><span class="section-number-2">2</span> Chapter 8: 堅牢性(robustness)と性能</h2>
<div class="outline-text-2" id="text-2">
</div>
<div id="outline-container-orgc49a200" class="outline-3">
<h3 id="orgc49a200"><span class="section-number-3">2.1</span> Item 65: try/except/else/finallyで各ブロックを有効に使え</h3>
<div class="outline-text-3" id="text-2-1">
<p>
<code>try/except/else/finally</code> ブロックを整理します。
</p>
<div class="org-src-container">
<pre class="src src-python">def some_func():
# 例えばファイルをオープンする処理
# ここでの例外はすぐに呼び元に上がる
try:
# 例外が上がる可能性のあるオペレーション
except ZeroDivisionError as e:
# 想定した例外が上がった場合
else:
# tryで例外が上がらなかった場合
# ここでの例外は呼び元に伝搬する
finally:
# (tryに来ていたら)関数がリターンする前に必ず実行される
# 例えばファイルのクローズ処理
</pre>
</div>
</div>
</div>
<div id="outline-container-org211bb0b" class="outline-3">
<h3 id="org211bb0b"><span class="section-number-3">2.2</span> Item 66: try/finally挙動を再利用するためにcontextlibとwithステートメントを考えよ</h3>
<div class="outline-text-3" id="text-2-2">
<p>
<code>@contextmanager</code> デコレーターで修飾した関数はコンテキストマネージャーとなり、 <code>with</code> ステートメントで使えるようになります。正式に <code>__enter__</code>, <code>__exit__</code> を準備するよりも簡単です。
</p>
<div class="org-src-container">
<pre class="src src-python">from contextlib import contextmanager
import logging
@contextmanager
def debug_logging(level):
logger = logging.getLogger()
old_level = logger.getEffectiveLevel()
logger.setLevel(level) # 一時的に指定ログレベルを設定
try:
yield
finally:
logger.setLevel(old_level) # ログレベルを戻す
</pre>
</div>
<p>
上記関数では(一時的に) <code>level</code> にデバッグレベルを変更します。
</p>
<div class="org-src-container">
<pre class="src src-python">def my_function():
logging.debug('Some debug data') # DEBUG
logging.error('Error log here') # ERROR
logging.debug('More debug data') # DEBUG
with debug_logging(logging.DEBUG): # DEBUGレベルのブロック
print('* Inside:')
my_function()
print('* After:')
my_function()
>>>
* Inside:
DEBUG:root:Some debug data # DEBUGレベルが表示されている
ERROR:root:Error log here
DEBUG:root:More debug data # DEBUGレベルが表示されている
* After:
ERROR:root:Error log here
</pre>
</div>
<p>
上記 <code>with</code> ブロックはデバッグレベルをDEBUGにします。出力結果から、実際にwithブロックでのみDEBUGレベルのメッセージ出力されていることがわかります。
</p>
<p>