>  > DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d

index

1S20 DIXCEL BMW F20 16/05~ BMW ブレーキローター HD 118d 1214947 (ディクセル) フロント F20

ver2.6から追加された multiprocessingモジュールを使って並列処理させる. threadingを使う方法もあるが,クラス継承しないと行けないとか ちょっと遅いとか上手くCPU使えてないとかちょっと不満があるので,multiprocessingを使って話をする. これはthreadingと似たようなAPIを提供するのだが,GILを効率的に回避する.(theadではなくsubprocessを使う)

何はともあれサンプルを示す.単純にカウントダウンするという面白みもなんともないもの. 1Gをカウントダウンする.この場合4スレッドに分散させるので250Mをカウントしていることになるが….
threading01.py

import threading
import time
def countDown(n):
 while n > 0:
 n -= 1
class TestThread(threading.Thread):
 def __init__(self, n):
 threading.Thread.__init__(self)
 self._n = n
 def run(self):
 countDown(self._n)
n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)
jobs = [TestThread(n1), TestThread(n2), TestThread(n3), TestThread(n4)]
start_time = time.time()
for j in jobs:
 j.start()
for j in jobs:
 j.join()
finish_time = time.time()
print(finish_time - start_time)

やることは単純で,threading.threadを継承するクラスを作ってrunメソッドをoverwrite(上書き)するだけ. で,あとはインスタンスを作ってstartとしてjoinする.

multiprocessing

次.multiprocessingを使った場合.こちらはクラスを継承する必要がなく,簡単にスレッドを実装できる. 上と同じでカウントダウンをさせましょうか.
multiprocessing01.py

from multiprocessing import Process
import time
def countDown(n):
 while n > 0:
 n -= 1
n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)
jobs = [
 Process(target=countDown, args=(n1,)),
 Process(target=countDown, args=(n2,)),
 Process(target=countDown, args=(n3,)),
 Process(target=countDown, args=(n4,)),
 ]
start_time = time.time()
for j in jobs:
 j.start()
for j in jobs:
 j.join()
finish_time = time.time()
print(finish_time - start_time)

こちらのほうが簡単だね.関数を作ってProcessに投げる.後はstartしてjoinするだけ. 実行すると気づくと思うが,multiprocessingはちゃんと子プロセスを作る. topやらで見ると確かにProcessの数だけpythonが動いている.
一方でthreadingの方はそうではない.プロセスが一つだけ動くだけ. これや実行速度やCPU占有時間にも現れている. threadingは一つプロセスの中で複数のスレッドを立てて実行しているのに対して, multiprocessingは作ったスレッド分だけ子プロセスを作ってそれを制御する. (GILが大きなネックになっている証拠?) という事で [ENDLESS] エンドレス ブレーキパッド CC35 type-E (N84M) フロント用 ポルシェ 911 (930) 3.0/3.3 ターボ 77/9~89/12,今回はmultiprocessingを中心に書く.

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW 単品【店頭受取対応商品】 F20 (99/8~01/2) 1S20 16/05~ 118d

もう少しmultiprocessingについて

一方のプロセスでデータを取って片方で処理する場合,データ取り終わって処理開始させるよりはデータは連続して取って処理は別に動かすというのが普通である. だがしかし,これが結構難しい. threadingは一つのプロセスだけで処理するから通信が楽であるが,独立したプロセスを作ってそれの通信と言うのは難しい. C++での実装の際にも問題になる(らしい)のだが,どうやって解決するか?
いくつかの方法が供給されている.

  1. shared memoryを使う
  2. server processを使う

1は一般的な共有メモリを使うという方法.型は ,arrayモジュールの書式に従う(ctypeと言うべきか). 大きく分けて2つあり,ArrayとValueである. 公式サンプル (ctypeについてはいつか…いつの日にか….cのsharedが使えるようにする,早い話)

2は上記ctypeでは記述が難しいlist,dictionary, Namespace, Lock, etcを扱える. サーバとクライアントの仕組みを使ってpythonオブジェクトプロキシでやり取りする。
multiproc_manager01.py

#!/usr/bin/env python
#coding:utf-8
from multiprocessing import Process, Manager
def func(l, d):
 l.append(1)
 d.update({2:"hello"})
 d[10] = "world"
if __name__ == "__main__":
 manager = Manager()
 l = manager.list(range(2))
 d = manager.dict()
 p = Process(target=func, args=(l,d))
 p.start()
 p.join()
 print(l)
 print(d)
$ python multiproc01.py
[0, 1, 1]
{2:"hello", 10:"world"}

と通信が出来る.これ以上は骨が折れるので,簡単なサンプルを示す.
dstat-modoki.py

#!/usr/bin/env python
#coding:utf-8
import time
from multiprocessing import Process, Manager, Array
def getCPUInfo():
 
 with open("/proc/stat") as f:
 lines = [[float(elm) for elm in v.split()[1:]] 
 for v in f.readlines() if v[:3] == "cpu"]
 return lines
 
def getCPUUsage(interval=1.):
 s1 = getCPUInfo()
 time.sleep(interval)
 s2 = getCPUInfo()
 num_of_cpu = len(s1) - 1
 cpu_total = [map(lambda x,y:(y-x)/num_of_cpu/interval/100. , s1.pop(0), s2.pop(0))]
 return cpu_total + [[(s2[i][j] - s1[i][j])/interval/100. for j,eml in enumerate(v)] \
 for i,v in enumerate(s1)]
def getMEMInfo():
 
 with open("/proc/meminfo") as f:
 lines = [ v.strip().split(":") for v in f.readlines()]
 
 mem_dict = {}
 for l in lines:
 mem_dict[l[0]] = float(l[1].strip().split()[0])
 return mem_dict
def getMEMUsage():
 memory_usage = getMEMInfo()
 return ((memory_usage["MemFree"] + memory_usage["Buffers"] + memory_usage["Cached"]), \
 memory_usage["MemTotal"])
def getLoadAvgInfo():
 with open("/proc/loadavg") as f:
 return [float(v) for v in f.read().split()[:3]]
def getNETInfo():
 """
 Inter -| Receive | Transmit
 face | bytes packets errs drop fifo frame compressed multicast| bytes ....
 lo: 0 0 0 ...
 """
 with open("/proc/net/dev") as f:
 lines = [v.split(":") for v in f.readlines()[2:]]
 net_dict = {}
 for l in lines:
 net_dict[l[0].strip()] = [int(v) for v in l[1].split()]
 return net_dict
def getNetUsage(interval=1., total=False):
 """
 dictionary of list
 receive's bytes, packets, errs, drop, fifo, compressed, multicast, 
 transmit's bytes, packets, errs, drop, fifo, compressed, multicast
 total 16 elements
 """
 net_dict1 = getNETInfo()
 time.sleep(interval)
 net_dict2 = getNETInfo()
 
 net_usage = {}
 for k,v in net_dict1.items():
 net_usage[k] = [0.]*16
 for i, elm in enumerate(v):
 net_usage[k][i] = (net_dict2[k][i] - elm)/interval
 if total:
 return (net_usage, net_dict2)
 else:
 return net_usage
def getCPU(l, interval=1):
 l += getCPUUsage(interval)
def getMEM(arr,):
 arr[0], arr[1] = getMEMUsage()
def getNET(d, interval=1):
 d.update(**getNetUsage(interval))
def getCPUStr(arr, progbar=False, bar="|"):
 if progbar:
 return "{0:6.1%} [{1:10s}]".format(arr[0][0], bar*int(arr[0][0]*10))
 else:
 return "C:{0:6.1%}".format(arr[0][0])
def getMEMStr(arr):
 return "M:{0:4.1f}/{1:4.1f}GB".format((arr[1] - arr[0])/1024/1024, arr[1]/1024/1024)
def getNETStr(dic, key):
 return "D:{0:7.3f}MB U:{1:7.3f}MB".format(dic[key][0]/1024/1024, dic[key][8]/1024/1024)
if __name__ == '__main__':
 ## ネットワークデバイス名を指定.eth0とかeth1とか
 ## intervalで更新時間を表示できる
 net_dev = "eth1"
 interval = 1.
 manager = Manager()
 cpu_usage = manager.list()
 mem_usage = Array("d", (0., 0.))
 net_usage = manager.dict()
 jobs = [Process(target=getCPU, args=(cpu_usage, interval)),
 Process(target=getMEM, args=(mem_usage, )),
 Process(target=getNET, args=(net_usage, interval))]
 for j in jobs:
 j.start()
 for j in jobs:
 j.join()
 
 message_cpu = getCPUStr(cpu_usage, False)
 message_net = getNETStr(net_usage, net_dev)
 message_mem = getMEMStr(mem_usage)
 print("{0} {1} {2}".format(message_cpu, message_net, message_mem))

dstatの代わり.tmuxのために作った.
CPU使用量(/proc/stat),物理メモリ(/proc/meminfo),ネットワーク(/proc/net/dev),ロードアベレージ(/proc/loadavg)を並列して取得する.
(実際はロードアベレージ以外を取得する)
並行でやることもないようなものだが,サンプルとしては丁度よいか.

中味はただパースして数値に変換しているだけ.難しい処理は行なっていない. できるだけワンラインで書けるように,処理を細かい処理で構成するようにした. 各関数で処理した値は,Managerクラスを使って渡している. 何が何であるのかはっきりしているのであればArrayを使っても構わない.

Queue/Pipe

次にQueue,Pipeを使っての処理.Queueはいわゆる生産者/消費者モデルの簡単な例である. Pipeも似たものであるが,双方向の通信が可能.Queueは一方通行という感じか.
例のごとく公式のサンプルを張り付けると,

from multiprocessing import Process, Queue
def f(q):
 q.put([42, None, 'hello'])
if __name__ == '__main__':
 q = Queue()
 p = Process(target=f, args=(q,))
 p.start()
 print q.get() # prints "[42, None, 'hello']"
 p.join()

別プロセスを走らせる.そのプロセス内でQueueインスタンスのputメソッドを呼び出してデータ[42, None, 'hello']を"put"する. メインのプロセスでgetメソッドを使ってそれを取り出す.
時間のかかる処理やかかる時間がまちまちな処理を走らせデータをputする. queueに置かれたデータをもう一方のプロセスで処理する.例えばファイルやDBに書き出す様な. そういった用途に使える.

Pipeであるが,Queueの発展のようなもので相互でのやり取りが可能である.

from multiprocessing import Process, Pipe
def f(conn):
 conn.send([42, None, 'hello'])
 conn.close()
if __name__ == '__main__':
 parent_conn, child_conn = Pipe()
 p = Process(target=f, args=(child_conn,))
 p.start()
 print parent_conn.recv() # prints "[42, None, 'hello']"
 p.join()

multiproc_pipe01.py

# 上のサンプルが物足りないので通信
# threadingのLockも利用して表示のロックを図る
# これをしないとスレッドが別々に非同期に動いてしまう.
from multiprocessing import Pipe, Process, Lock
import time
import sys
def func(conn, lc):
 conn.send("< hello")="" lc.acquire()="" while="" true:="" if="" conn.poll():="" break="" else:="" sys.stdout.write(".")="" sys.stdout.flush()="" time.sleep(1.)="" sys.stdout.write("\n")="" print(conn.recv())="" lc.release()="" conn.close()="" if="" __name__="=" '__main__':="" lc="Lock()" parent_conn,="" child_conn="Pipe()" p="Process(target=func," args="(child_conn," lc))="" p.start()="" ##="" 画面上に表示させるまでスレッドをロック="" lc.acquire()="" print(parent_conn.recv())="" lc.release()="" #="" ロック解除="" time.sleep(3.)="" parent_conn.send("="">> hi! what's up?")
 p.join() 

Pipeでは子プロセスを殺す時,Queueよりは簡単に組める.
multiproc_pipe02.py

from multiprocessing import Pipe, Process
import time
import datetime
import random
import math
def genRandom(conn):
 while True:
 if conn.poll():
 if conn.recv() is None:
 conn.close()
 break
 else:
 time.sleep(random.randint(1,15)*0.01)
 conn.send(random.normalvariate(10, 0.6))
 print("connection closed")
 conn.close()
def cleanUp(arr):
 return filter(lambda x: x is not None, arr)
def getMean(arr):
 return 1.*sum(arr)/len(arr)
def getStd(arr):
 m = getMean(arr)
 return math.sqrt(sum(map(lambda x: (x - m)*(x - m), arr))/len(arr))
if __name__ == '__main__':
 ## create Pipe instance and Process instance
 ## Pipe returns two instance; parent and child
 ## child will run in subprocess
 ## parent will manage child
 parent_conn, child_conn = Pipe()
 p = Process(target=genRandom, args=(child_conn, ))
 p.daemon = True
 ## check current date
 start_time = datetime.datetime.now()
 p.start() # process start
 timeinterval = 10 # sec
 timeinterval = datetime.timedelta(seconds=timeinterval)
 sample_size = 1e2
 i = 0
 data_array = [None] * int(sample_size)
 while i < int(sample_size):="" print(p.is_alive())="" current_time="datetime.datetime.now()" if="" (current_time="" -="" start_time)=""> timeinterval: # elapsed setting value, break loop
 print("time out!")
 break
 else:
 res = parent_conn.recv()
 data_array[i] = res
 if i % 10 == 0 and i != 0:
 print("{0:%x %X}: {1:.2f}".format(datetime.datetime.now(), res))
 i += 1
 
 ## at first send "Signal" to child
 parent_conn.send(None)
 p.join() # join Process
 data_array = cleanUp(data_array)
 print("total event: {0}".format(i))
 print("mean: {0}, std: {1}".format(getMean(data_array), getStd(data_array)))
 print("done")

子プロセスはランダム秒待って乱数を親プロセスに送る. 親プロセスはひたすらそれを集める.
もし子プロセスが一定時間経過した,あるいは一定数のイベント集め終わったならば 子プロセスを終了し集めたデータの平均値と偏差を出力する.
子プロセスを親は直接殺さない. 終了のシグナルとしてNoneを子プロセスに送り ヘッドライト Projector Angel Eye Headlight LED Tail Light Foglights High Stop Ram 4.7L 07 08 プロジェクターエンジェルアイヘッドライトLEDテールライトフォグライトハイストップラム4.7L 07 08,子プロセスは読み取るべきデータがあり(pollメソッドで判断) かつそれがNoneで有るときにプロセスを終了する.
子プロセスが終了するとProcessインスタンスが勝手にterminateするのでjoin出来る. 後は後処理で 【プロミュー】送料無料【project mu】トヨタ アルテッツァ NS-C ブレーキパッド リア ACR30W/ 40W エスティマ (03/5~05/12),平均値・偏差を求めて出力する.

恐らく,server/clientモデルで作った方がプロセスの終了などを管理しやすくなると思うのだが… Pipeを使ってもまぁまぁなものが作れるので,それはそれで便利かと思われる.

append

例のエキスパートPythonプログラミングによると, multiprocessingはthreadingのラップではなくos.fork+subprocessの組み合わせのようなものらしい…. プロセスが作られた瞬間自動的にforkしてプロセスを走らせるとのこと. また, 共有メモリスペースを提供するmultiprocessing.Arrayとmultiprocessing.Valueクラスは極力使わないほうが良い (pp365)と書かれている. 並列化のボトルネックやコードの複雑化を引き起こすから ディクセル Premiumタイプ ブレーキパッド リア BMW ALPINA E34,だそうだ. もしデータをやりとりしたければ,ManagerのValue, dictやlist,あるいはctypeを使うべきだ,という事だろう.
(multiprocessingのValueではなく,managerクラスのValueやArrayを使うべきである.後者はProxyオブジェクトなので 安全に共有メモリにアクセスでき,必要であればロックを掛けることが出来る.)

実は最後のサンプルには落とし穴があった.処理に時間がかかる場合,L59のwhileループで詰まる. ここではサンプルサイズと経過時間をチェックしているのだが,elseの中つまり処理に時間を取られるとここで滞ってしまう. 従って目的の時間を大幅に過ぎてからループを抜ける可能性があるのだ.
そこでこれを回避するために,

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d

26日1時59分まで【エントリーポイント10倍】 送料無料 ダンロップ ルマン5 LE MANS V 165/60R15 165/60-15 H 4本 バルブ付 ハスラー フレアクロスオーバー キャスト ソリオ kei, [爪付ジャッキ]【送料無料】(株)今野製作所 イーグル 低床・レバー回転・安全弁付爪つきジャッキ 爪能力10t 爪ロングタイプ G-200TL 1台【456-0311】【代引不可商品】【北海道・沖縄送料別途】【smtb-KD】, Kumoi Motors コペン L880K マーチンフロントバンパー 塗装済み, カードでポイント最大34倍 3/21(木)20:00~3/26(火)1:59迄 ピレリ P ZERO ピーゼロ NERO ネロ GT サマータイヤ 225/50R17 WEDS ヴォルガ7 VOLGA7 取り寄せ ホイールセット 4本 17インチ 17 X 7 +38 5穴 114.3, K&H ケイアンドエイチ シート本体 ペアライド タック <セミオーダー> カラー:レーシングオレンジ ツアラー系 96- SWAGE-LINE スウェッジライン フロント ブレーキホースキット ホースの長さ:200mmロング ホースカラー:クリア NS400, レーシングスライダー 3点SET(II)R AGRAS(アグラス) FZ1・FZ1 FAZER, GRONDEMENT グロンドマン その他シートパーツ 国産シートカバー 張替タイプ カラー:エナメルホワイト/透明ダブルステッチ DJ-1 RR (AF19), ヘッドライト CCFL Angel Eye Headlamps LED Taillamps Glass Fog Roof Stop 99-02 Sierra 4.3L V6 CCFLエンジェルアイヘッドランプLEDタイルランプガラスフォグ屋根ストップ99-02 Sierra 4.3L V6, クッコ KUKKO 油圧式オートグリッププーラー 250mm 845-5-B-KU HD店 RALLY591スーパーライトキャリア [VANVAN200(~'03)]

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d:アクティブ ACTIVE ハイスロKIT TYPE-2 テフロンインナー 巻取Φ40 T-GLD 1069629 【★送料無料キャンペーン中】 【D2 レーシングスポーツ】GOLF (III) (Only for models with two-piece OE caliper) 93~97 4 X 100ストリートキャリパー (ダストカバー付) 6 POT 356X32 フロー ティング

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d.15インチタントL370系ENKEI オール オールテン マシニングガンメタリック 5.0Jx15SINCERA SN832i 165/55R15 トヨタ プロナード H12.4-H15.5 MCX20 フォグ LEDヘッドライト HB4 LinksAuto 2nd最新PLUS トップのメーカーチップ オールインワン一体型LED JA-X3 高輝度 色温度再設定可能 車検適合 2年保証 2本セット 新品 税込 送料無料

NGK(エヌジーケー) トヨタ カローラ2/コルサ/ターセル プラグ 【メール便】【送料無料】イリジウムIXプラグ 1979/6~1982/5 ターセル AL11 ■エンジン:2A-U ■排気量:1300 4本セット 【USA在庫あり】 ボルテックス Vortex クラッチレバー ショート ZX900、Z750 黒 577367 HD店

,もうひとつ処理専門のプロセスを作りデータを投げてやるのだ.
それがこれ.multiproc_pipe03.py (要pyROOT)

メインのスクリプトでは,単にプロセスの管理だけを行なっている. 一定時間経過したあるいは一定のイベント数に到達したかを判断し,それに応じてプロセスに司令を与える.

で,コレで何が出来るかというと仕事の分担が出来る. 要するにデータ収集とデータ処理を並列しておこなうとき楽なのだ. 処理に時間がかかる場合,1つのスレッドで仕事をさせる場合DAQにデッドタイムが生じてしまう. 結局そんなシビアな条件を課すんだったら 永井電子/ULTRA フルオートタイマー ハーネスセット レッド トヨタ セリカ E-ST185 3S-GTE 2000cc 1989年08月~1993年09月,

DIXCEL (ディクセル) フロント ブレーキローター HD 1214947 BMW F20 1S20 16/05~ 118d

,Cppで書いちゃうけどね….

{yahoojp} {quark.kj.yamagata-u.ac.jp}
{yahoojp}jpprem01-zenjp40-wl-zd-32704