{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction to Multiprocessing in Python\n",
    "Adapted from: \"LucidProgramming\" [1](https://github.com/vprusso/youtube_tutorials/tree/master/multiprocessing_and_threading)\n",
    "\n",
    "Multiprocessing docs: [2](https://docs.python.org/3.7/library/multiprocessing.html)\n",
    "\n",
    "**Note:** the examples below hand functions defined in notebook cells to child processes. That relies on the `fork` start method (the default on Linux); with the `spawn` start method the children must be able to import the target function, so it would have to live in an importable module."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import time\n",
    "from multiprocessing import (\n",
    "    Lock,\n",
    "    Pipe,\n",
    "    Pool,\n",
    "    Process,\n",
    "    Queue,\n",
    "    Value,\n",
    "    cpu_count,\n",
    "    current_process,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Two ways of asking for the number of CPUs in the machine.\n",
    "print(os.cpu_count())\n",
    "print(cpu_count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Processes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def example(t):\n",
    "    \"\"\"Sleep for ``t`` seconds, then print which process did the waiting.\"\"\"\n",
    "    # Process name assigned by Python's multiprocessing module\n",
    "    proc_name = current_process().name\n",
    "    # Process ID assigned by the operating system\n",
    "    proc_id = os.getpid()\n",
    "    time.sleep(t)\n",
    "    print(f\"Process {proc_name} ({proc_id}) waited {t} seconds\")\n",
    "\n",
    "\n",
    "def run_in_processes(target, arg_list):\n",
    "    \"\"\"Run ``target(arg)`` in its own process for every ``arg`` in ``arg_list``.\n",
    "\n",
    "    All processes are started before any of them is joined, so they run\n",
    "    concurrently; the call returns once every process has exited.\n",
    "    \"\"\"\n",
    "    proc_list = [Process(target=target, args=(arg,)) for arg in arg_list]\n",
    "    for proc in proc_list:\n",
    "        proc.start()\n",
    "    for proc in proc_list:\n",
    "        proc.join()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Five processes with different wait times, all running at once.\n",
    "run_in_processes(example, [5, 4, 3, 2, 1])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 100 processes that each wait 10 seconds -- typically far more processes\n",
    "# than the machine has CPUs.\n",
    "run_in_processes(example, [10] * 100)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pool\n",
    "YouTube Link: https://www.youtube.com/watch?v=u2jTn-Gj2Xw\n",
    "\n",
    "One can create a pool of processes which will carry out tasks submitted to it with the `Pool` class.\n",
    "\n",
    "A process pool object controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def sum_square(numbers):\n",
    "    \"\"\"Return the sum of ``i * i`` for every ``i`` in ``range(numbers)``.\n",
    "\n",
    "    A contrived CPU-intensive task. Despite the plural name, ``numbers`` is a\n",
    "    single integer upper bound.\n",
    "    \"\"\"\n",
    "    return sum(i * i for i in range(numbers))\n",
    "\n",
    "\n",
    "def sum_square_with_mp(numbers):\n",
    "    \"\"\"Apply ``sum_square`` to every item of ``numbers`` with a process pool.\n",
    "\n",
    "    Prints the elapsed wall-clock time and returns the list of results.\n",
    "    \"\"\"\n",
    "    start_time = time.perf_counter()\n",
    "    # The context manager shuts the pool down even if ``map`` raises.\n",
    "    with Pool() as pool:\n",
    "        result = pool.map(sum_square, numbers)\n",
    "    elapsed = time.perf_counter() - start_time\n",
    "\n",
    "    print(f\"Processing {len(numbers)} numbers took {elapsed} time using multiprocessing.\")\n",
    "    return result\n",
    "\n",
    "\n",
    "def sum_square_no_mp(numbers):\n",
    "    \"\"\"Apply ``sum_square`` to every item of ``numbers`` one after another.\n",
    "\n",
    "    Prints the elapsed wall-clock time and returns the list of results.\n",
    "    \"\"\"\n",
    "    start_time = time.perf_counter()\n",
    "    result = [sum_square(n) for n in numbers]\n",
    "    elapsed = time.perf_counter() - start_time\n",
    "\n",
    "    print(f\"Processing {len(numbers)} numbers took {elapsed} time using serial processing.\")\n",
    "    return result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pool_inputs = range(10000)\n",
    "mp_result = sum_square_with_mp(pool_inputs)\n",
    "serial_result = sum_square_no_mp(pool_inputs)\n",
    "\n",
    "# Both strategies must compute exactly the same values.\n",
    "assert mp_result == serial_result"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Locks\n",
    "YouTube Link: https://www.youtube.com/watch?v=iYJNmuD4McE\n",
    "\n",
    "A lock or mutex is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution.\n",
    "\n",
    "More on locks: https://en.wikipedia.org/wiki/Lock_(computer_science)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_500_no_mp(total):\n",
    "    \"\"\"Add 5 to ``total`` 100 times in the current process and return it.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        total += 5\n",
    "    return total\n",
    "\n",
    "\n",
    "def sub_500_no_mp(total):\n",
    "    \"\"\"Subtract 5 from ``total`` 100 times in the current process and return it.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        total -= 5\n",
    "    return total\n",
    "\n",
    "\n",
    "def add_500_no_lock(total):\n",
    "    \"\"\"Add 5 to the shared ``Value`` 100 times without extra synchronization.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        # ``+=`` is a separate read and write, so it is not atomic.\n",
    "        total.value += 5\n",
    "\n",
    "\n",
    "def sub_500_no_lock(total):\n",
    "    \"\"\"Subtract 5 from the shared ``Value`` 100 times without extra synchronization.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        # ``-=`` is a separate read and write, so it is not atomic.\n",
    "        total.value -= 5\n",
    "\n",
    "\n",
    "def add_500_lock(total, lock):\n",
    "    \"\"\"Add 5 to the shared ``Value`` 100 times, holding ``lock`` for each update.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        # ``with`` releases the lock even if the update raises.\n",
    "        with lock:\n",
    "            total.value += 5\n",
    "\n",
    "\n",
    "def sub_500_lock(total, lock):\n",
    "    \"\"\"Subtract 5 from the shared ``Value`` 100 times, holding ``lock`` for each update.\"\"\"\n",
    "    for _ in range(100):\n",
    "        time.sleep(0.01)\n",
    "        # ``with`` releases the lock even if the update raises.\n",
    "        with lock:\n",
    "            total.value -= 5\n",
    "\n",
    "\n",
    "def run_add_and_sub(add_target, sub_target, use_lock):\n",
    "    \"\"\"Run one adder and one subtractor process on a shared counter.\n",
    "\n",
    "    The counter starts at 500. When ``use_lock`` is true, a fresh ``Lock`` is\n",
    "    passed to both targets as their second argument. Returns the counter's\n",
    "    value after both processes have exited.\n",
    "    \"\"\"\n",
    "    total = Value('i', 500)\n",
    "    args = (total, Lock()) if use_lock else (total,)\n",
    "\n",
    "    add_proc = Process(target=add_target, args=args)\n",
    "    sub_proc = Process(target=sub_target, args=args)\n",
    "\n",
    "    add_proc.start()\n",
    "    sub_proc.start()\n",
    "\n",
    "    add_proc.join()\n",
    "    sub_proc.join()\n",
    "    return total.value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Serial baseline: +500 followed by -500 returns to the starting value.\n",
    "print(sub_500_no_mp(add_500_no_mp(500)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Without a lock the two processes can interleave their read-modify-write\n",
    "# updates, so the final value is not guaranteed to be 500.\n",
    "print(run_add_and_sub(add_500_no_lock, sub_500_no_lock, use_lock=False))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# With a lock around every update no increment or decrement is lost.\n",
    "print(run_add_and_sub(add_500_lock, sub_500_lock, use_lock=True))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Queues\n",
    "YouTube Link: https://www.youtube.com/watch?v=TQx3IfCVvQ0\n",
    "\n",
    "We show how to make use of the multiprocessing `Queue` class to communicate between different processes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def square(numbers, queue):\n",
    "    \"\"\"Put the square of every item of ``numbers`` on ``queue``.\"\"\"\n",
    "    for i in numbers:\n",
    "        queue.put(i * i)\n",
    "\n",
    "\n",
    "def cube(numbers, queue):\n",
    "    \"\"\"Put the cube of every item of ``numbers`` on ``queue``.\"\"\"\n",
    "    for i in numbers:\n",
    "        queue.put(i * i * i)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queue_inputs = range(5)\n",
    "\n",
    "queue = Queue()\n",
    "square_process = Process(target=square, args=(queue_inputs, queue))\n",
    "cube_process = Process(target=cube, args=(queue_inputs, queue))\n",
    "\n",
    "square_process.start()\n",
    "cube_process.start()\n",
    "\n",
    "# Drain the queue BEFORE joining: a process that has put items on a queue\n",
    "# waits for them to be flushed before it exits, and Queue.empty() is not\n",
    "# reliable across processes. Each worker puts exactly one item per input,\n",
    "# so the number of results to collect is known up front.\n",
    "for _ in range(2 * len(queue_inputs)):\n",
    "    print(queue.get())\n",
    "\n",
    "square_process.join()\n",
    "cube_process.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def f(conn):\n",
    "    \"\"\"Send a sample message through ``conn``, then close this end of the pipe.\"\"\"\n",
    "    conn.send([42, None, 'hello'])\n",
    "    conn.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pipe() returns the two connected ends; one is handed to the child process.\n",
    "child_conn, parent_conn = Pipe()\n",
    "pipe_proc = Process(target=f, args=(child_conn,))\n",
    "pipe_proc.start()\n",
    "print(parent_conn.recv())  # prints \"[42, None, 'hello']\"\n",
    "pipe_proc.join()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}