マルチプロセッシング:同時HTTP非同期リクエストのCPU使用率を最適化する

UrbiJr

サイト/ URLのリスト(時間の経過とともに変化する可能性があります)をダウンロードする必要があり、現在multiprocessing.Manager().Queue()、そのリストを送信および更新するために使用しています。
各URL /タスクを毎秒チェックする必要があります。したがって、各タスクは基本的に終了しません(ユーザーの中断など、特定の条件が満たされるまで)。優れた非同期HTTPクライアントmultiprocessing.Process()組み合わせると問題が解決するasyncio思いました残念ながら、50以上のURLを送信した後でも、CPU使用率が非常に高くなっています。実行中-タスクはすべての要求をしていないときは、差分自分に気づくでしょう-と、彼らはされている場合-実行しています- 。mock_request()do_request()

各ケースを再現する例を次に示します(CTRL + Cを押して、いつでも正常に終了します)。

import asyncio, os, sys, time, httpx
import multiprocessing
import queue as Queue

class ExitHandler(object):
    def __init__(self, manager, queue, processes):
        self.manager = manager
        self.queue = queue
        self.processes = processes
        
    def set_exit_handler(self):
        if os.name == "nt":
            try:
                import win32api
                win32api.SetConsoleCtrlHandler(self.on_exit, True)
            except ImportError:
                version = ".".join(map(str, sys.version_info[:2]))
                raise Exception("pywin32 not installed for Python " + version)
        else:
            import signal
            signal.signal(signal.SIGINT, self.on_exit)
            #signal.signal(signal.CTRL_C_EVENT, func)
            signal.signal(signal.SIGTERM, self.on_exit)

    def on_exit(self, sig, func=None):
        print('[Main process]: exit triggered, terminating all workers')
        STOP_WAIT_SECS= 5 
        for _ in range(N_WORKERS):
            self.queue.put('END')
        
        try:
            end_time = time.time() + STOP_WAIT_SECS
            # wait up to STOP_WAIT_SECS for all processes to complete
            for proc in self.processes:
                join_secs = max(0.0, min(end_time - time.time(), STOP_WAIT_SECS))
                proc.join(join_secs)

            # clear the procs list and _terminate_ any procs that have not yet exited
            while self.processes and len(self.processes) > 0:
                proc = self.processes.pop()
                if proc.is_alive():
                    proc.terminate()
            
            self.manager.shutdown()

            # finally, kill this thread and any running
            os._exit(0)
        except Exception:
            pass

async def mock_request(url):

    # we won't do any request here, it's just an example of how much less CPU
    # each process consumes when not doing requests

    x = 0
    while True:
        try:
            x += 1
            print('Finished downloading {}'.format(url))
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

async def do_request(url):

    while True:
        try:
            # I use httpx (https://github.com/encode/httpx/) as async client for its simplicity
            # feel free to use your preferred library (e.g. aiohttp)
            async with httpx.AsyncClient() as s:
                await s.get(url)
                print('Finished downloading {}'.format(url))
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

def worker(queue):
    
    try:
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(request_worker(queue))
    except KeyboardInterrupt:
        pass

async def request_worker(queue):
    
    p = multiprocessing.current_process()
    loop = asyncio.get_event_loop()
    
    while True:
        try:
            task = await loop.run_in_executor(None, queue.get)
            
            if task == 'END':
                break
            
            elif task['action'] == 'DOWNLOAD':
                print('Worker {}: Received new task'.format(p.name))
                f = loop.create_task(do_request(task['url'])) # high CPU usage
                # f = loop.create_task(mock_request(task['url'])) # low (almost none) CPU usage

        except KeyboardInterrupt:
            pass
        except Queue.Empty:
            pass

    print('Task Worker {}: ending'.format(p.name))

def run_workers(queue, processes):

    print('Starting workers')

    for _ in range(N_WORKERS):
        processes.append(multiprocessing.Process(target=worker, args=(queue,)))

    task = {
        'action': 'DOWNLOAD',
        'url': 'https://google.com'
    }
    
    # this is just an example forcing the same URL * 100 times, while in reaility
    # it will be 1 different URL per task
    for _ in range(100):
        queue.put(task)

    for p in processes:
        p.start()

    for p in processes:
        p.join()
    
    return True

if __name__ == "__main__":
    processes = []
    N_WORKERS = 8 # processes to spawn
    manager = multiprocessing.Manager()
    q = manager.Queue() # main queue to send URLs to

    # just a useful clean exit handler (press CTRL+C to terminate)
    exit_handler = ExitHandler(manager, q, processes) 
    exit_handler.set_exit_handler()

    # start the workers
    run_workers(q, processes)

リクエストを同時に実行するときに、各プロセスが消費するCPUの数の例を次に示します。

CPU使用率

マルチプロセッシングを使用するかどうかに関係なく、CPU使用率を大幅に削減する(1秒あたりのリクエスト数を同じに保つ)ソリューションはすべて受け入れられます。私にとって唯一の必需品はasyncパターンです。

kewne

これは際立っています:

while True:
    try:
        async with httpx.AsyncClient() as s:

これによりリクエストごとに新しいクライアントが初期化され、実装を確認することで、SSLコンテキストがインポートおよび初期化されます。これらはIMOのコストのかかる操作であるため、ループ内で実行すると、CPUに多大なコストがかかる可能性があります。

代わりに、コードを次のように並べ替えることを検討してください

async with httpx.AsyncClient() as s:
  while True:
    try:

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

マルチプロセッシング:同時HTTP非同期リクエストのCPU使用率を最適化する

分類Dev

Pythonマルチプロセッシングでの合計CPU使用率を制限する

分類Dev

プールとマルチプロセッシングを使用して、2つの関数を2つのリストに同時に適用する

分類Dev

スクリプトの実行時にマルチプロセッシングを使用する

分類Dev

シングルスレッドアプリケーションがマルチコアを占有し、CPU使用率を制限する理由

分類Dev

マルチプロセッシングで多数の非同期プロセスを開始する

分類Dev

FlatFileItemReaderと非同期プロセッサを使用してパフォーマンスを最適化する方法

分類Dev

LinuxでCPU使用率をチェックするスクリプト

分類Dev

プロセス名でフィルタリングし、CPU使用率をログに記録する

分類Dev

特定のプログラムのCPU使用率を監視して対応するバッチスクリプト

分類Dev

シェルスクリプトで時間とCPU使用率を交換する方法

分類Dev

マルチスレッドのないJavaプログラムで、gtimeコマンドがCPU使用率> 100%を返すのはなぜですか?

分類Dev

プロセスのCPU使用率を制限できるソリューションはありますか?

分類Dev

PyQtスクリプトでマルチプロセッシングを使用する場合のRuntimeErrorとIOError

分類Dev

Flash AS3(シェイプクラス)のCPU使用率と最適化

分類Dev

角度のあるhttpインターセプターで非同期スタイルでhttpリクエストをキャッシュする方法は?

分類Dev

CPU使用率、ディスク使用率、RAM使用率を出力するbashスクリプト

分類Dev

PythonスクリプトのCPU使用率をプロファイルする方法は?

分類Dev

Pythonスクリプト(または他のプログラミング言語スクリプト)を使用して、他のアプリケーションで使用されるリソース(RAM、CPU使用率など)を制限できますか?

分類Dev

SQLクエリを最適化して、クラスターのパフォーマンスを向上させ、ディスク使用率を削減します

分類Dev

プロセスをGPUからCPU使用率にシフトする方法

分類Dev

リクエストのマルチスレッドまたはマルチプロセッシングを実装する方法

分類Dev

特定のプロセスのCPU使用率を取得する

分類Dev

マルチプロセッシングまたはマルチスレッドアプリケーションでCPU時間を予約する

分類Dev

マルチプロセッシングを使用して、オブジェクトのリストにメソッドを並列に適用します

分類Dev

マルチプロセッシングモジュールを使用するときにCPU使用率を改善するにはどうすればよいですか?

分類Dev

プロセスごとのCPU使用率を監視する

分類Dev

マルチプロセッシングの使用方法。コールバックなしで非同期をプールしますか?

分類Dev

CPU使用率が低いPythonプロセスを強制終了するスクリプトが必要

Related 関連記事

  1. 1

    マルチプロセッシング:同時HTTP非同期リクエストのCPU使用率を最適化する

  2. 2

    Pythonマルチプロセッシングでの合計CPU使用率を制限する

  3. 3

    プールとマルチプロセッシングを使用して、2つの関数を2つのリストに同時に適用する

  4. 4

    スクリプトの実行時にマルチプロセッシングを使用する

  5. 5

    シングルスレッドアプリケーションがマルチコアを占有し、CPU使用率を制限する理由

  6. 6

    マルチプロセッシングで多数の非同期プロセスを開始する

  7. 7

    FlatFileItemReaderと非同期プロセッサを使用してパフォーマンスを最適化する方法

  8. 8

    LinuxでCPU使用率をチェックするスクリプト

  9. 9

    プロセス名でフィルタリングし、CPU使用率をログに記録する

  10. 10

    特定のプログラムのCPU使用率を監視して対応するバッチスクリプト

  11. 11

    シェルスクリプトで時間とCPU使用率を交換する方法

  12. 12

    マルチスレッドのないJavaプログラムで、gtimeコマンドがCPU使用率> 100%を返すのはなぜですか?

  13. 13

    プロセスのCPU使用率を制限できるソリューションはありますか?

  14. 14

    PyQtスクリプトでマルチプロセッシングを使用する場合のRuntimeErrorとIOError

  15. 15

    Flash AS3(シェイプクラス)のCPU使用率と最適化

  16. 16

    角度のあるhttpインターセプターで非同期スタイルでhttpリクエストをキャッシュする方法は?

  17. 17

    CPU使用率、ディスク使用率、RAM使用率を出力するbashスクリプト

  18. 18

    PythonスクリプトのCPU使用率をプロファイルする方法は?

  19. 19

    Pythonスクリプト(または他のプログラミング言語スクリプト)を使用して、他のアプリケーションで使用されるリソース(RAM、CPU使用率など)を制限できますか?

  20. 20

    SQLクエリを最適化して、クラスターのパフォーマンスを向上させ、ディスク使用率を削減します

  21. 21

    プロセスをGPUからCPU使用率にシフトする方法

  22. 22

    リクエストのマルチスレッドまたはマルチプロセッシングを実装する方法

  23. 23

    特定のプロセスのCPU使用率を取得する

  24. 24

    マルチプロセッシングまたはマルチスレッドアプリケーションでCPU時間を予約する

  25. 25

    マルチプロセッシングを使用して、オブジェクトのリストにメソッドを並列に適用します

  26. 26

    マルチプロセッシングモジュールを使用するときにCPU使用率を改善するにはどうすればよいですか?

  27. 27

    プロセスごとのCPU使用率を監視する

  28. 28

    マルチプロセッシングの使用方法。コールバックなしで非同期をプールしますか?

  29. 29

    CPU使用率が低いPythonプロセスを強制終了するスクリプトが必要

ホットタグ

アーカイブ