Post: Introduction to Achieving Concurrency in Python for Faster Network Automation

4 minute read

Throughout the years, I’ve utilized different methods to achieve concurrency in Python. In this article, I will give a few simple examples of how you can speed up your tasks for network automation (or anything really!).

Heads up! The examples use Python’s multithreading/multiprocessing APIs directly, though you may never need to. If you are curious, or need to implement your own concurrency code, the examples below are for you. If you’d rather use an automation framework, check out nornir. It is an alternative to Ansible where you can write all your automation tasks in pure Python. Engineers praise its ease of use and speed.

To make this article short, sweet, and to the point, I’m going to quickly go over a few basic concepts:

Concurrency

  • This means efficiently using the resources at hand. Tasks don’t necessarily have to run at the same time; rather, they can be paused and resumed while waiting for an I/O event. If you invoke several GET requests against an Arista switch, there will be some network latency involved, processing on the switch itself, etc. Using multithreading in Python will speed up these API calls when multiple switches are involved.

Parallelism

  • Doing a lot of things at the same time. Imagine an 8-core processor where each core can work on a different chunk of data to speed up the overall task. Parallelism is one way of achieving concurrency.

GIL

  • The GIL is a mutex (lock) in CPython that prevents more than one thread from executing Python bytecode at once. CPython’s memory management is not thread-safe, so the GIL prevents race conditions. The GIL is fine for single-threaded applications, but not for multi-threaded ones. That said, when you make multiple API calls using multithreading, the GIL does not have much effect: a thread releases the GIL while it waits on I/O, so Python can switch to another thread and do work in the meantime.
  • CPU-bound programs run more efficiently using multiprocessing
  • I/O-bound programs (database, file, network) run more efficiently using multithreading
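To see that I/O caveat in action, here is a minimal sketch (not from the original post) that times the same set of simulated I/O waits run sequentially versus in a thread pool. time.sleep() stands in for network latency because, like a real socket read, it releases the GIL:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    # time.sleep() releases the GIL, just like a real network read would
    time.sleep(seconds)
    return seconds


delays = [0.2, 0.2, 0.2, 0.2]

# sequential: roughly the sum of all the waits
start = time.perf_counter()
results = [fake_io(d) for d in delays]
sequential = time.perf_counter() - start

# threaded: the waits overlap, so roughly the longest single wait
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_io, delays))
threaded = time.perf_counter() - start

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

A CPU-bound version of fake_io (say, a tight computation loop) would show little or no speedup with threads, which is exactly the case where multiprocessing helps instead.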

When you run a multithreaded application, you are spawning multiple child threads from a main thread:

[Diagram: a main thread spawning multiple child threads]

If you are ever curious and want to monitor child threads spawned from a script you’ve created, you can run this command (or something similar) right after your program begins:

watch -n .5 'ps -eLf | grep multi-thread.py | grep -v grep'

And multiprocessing:

[Diagram: a main process spawning multiple child processes]

Python now has the concurrent.futures library, introduced in 3.2, which has since gained many features and changes. This is the new preferred way of writing multiprocessing/multithreading code. If you are familiar with JavaScript, the concepts should feel very familiar; for example, a Future may be thought of as a Promise. We will not be covering Python’s asyncio library in this post.

Alright, let’s get into the code! We will be using Paramiko to communicate via SSH with a few Cumulus switches set up in Vagrant. We will not cover the setup of my lab environment in this post. Run pip install paramiko if you do not already have this external library.

Example 1: Multithreading using Python’s multiprocessing.dummy module (Old Way). We use map() to run the show_system function against each argument in the provided list. map() will return the results in the order the arguments were received. close() stops the pool from accepting new work, and join() waits until the workers have completed their work.

from multiprocessing.dummy import Pool as Threadpool
import paramiko
import json


def show_system(hostname):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(
        hostname=hostname,
        username='cumulus',
        password='CumulusLinux!',
        look_for_keys=False,
        allow_agent=False,
        timeout=5
    )

    # show system information as JSON
    _, stdout, _ = client.exec_command('net show system json')
    output = stdout.read().decode('utf-8')
    client.close()

    return json.loads(output)


host_ips = ['192.168.1.26', '192.168.1.27', '192.168.1.28']

pool = Threadpool(16)

results = pool.map(show_system, host_ips)

pool.close()
pool.join()

Note: with multiprocessing, functions and their arguments need to be picklable. Python exchanges objects between processes via pickle, which converts Python objects to byte-strings and vice versa - it serializes/deserializes. Functions defined inside other functions can’t be pickled, things like generators don’t work, etc. (This doesn’t apply to multiprocessing.dummy, which uses threads within a single process.)
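As a quick illustration of that pickling constraint (a hypothetical example, not from the original post), a top-level function pickles fine, while a function defined inside another function does not:

```python
import pickle


def top_level(x):
    return x * 2


def make_nested():
    # defined inside another function, so it has no importable qualified name
    def nested(x):
        return x * 2
    return nested


# top-level functions pickle fine: they are stored by qualified name
pickle.dumps(top_level)

# nested functions can't be pickled, so they can't be shipped to worker processes
nested_pickles = True
try:
    pickle.dumps(make_nested())
except Exception:
    nested_pickles = False

print(f"nested function picklable: {nested_pickles}")
```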

Example 2: Multithreading using the Python Futures API (New Way)

from concurrent import futures


with futures.ThreadPoolExecutor(max_workers=16) as pool:
    results = [pool.submit(show_system, host_ip) for host_ip in host_ips]

    for future in futures.as_completed(results): # as_completed will return results as you get them
        data = future.result()
        print(data)

Additionally, you can guarantee that results are returned in the order the arguments were passed by using map():

with futures.ThreadPoolExecutor(max_workers=16) as pool:

    for result in pool.map(show_system, host_ips):  # map() yields results, not Future objects
        print(result)

Note: You can pass multiple arguments per call by providing pool.map() with more than one iterable
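For example, here is a sketch with a hypothetical two-argument task; map() pairs the iterables element by element, like zip():

```python
from concurrent.futures import ThreadPoolExecutor


def connect(host, port):
    # hypothetical task taking two arguments
    return f"{host}:{port}"


hosts = ['192.168.1.26', '192.168.1.27', '192.168.1.28']
ports = [22, 22, 830]

with ThreadPoolExecutor(max_workers=4) as pool:
    # calls connect('192.168.1.26', 22), connect('192.168.1.27', 22), ...
    endpoints = list(pool.map(connect, hosts, ports))

print(endpoints)  # ['192.168.1.26:22', '192.168.1.27:22', '192.168.1.28:830']
```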

Note: There is a default for the max_workers argument that is a formula that seemingly changes every new release of Python :-)

Example 3: Using wait() to block until all futures are fulfilled

pool = futures.ThreadPoolExecutor(max_workers=16)
f = [pool.submit(show_system, host_ip) for host_ip in host_ips]

print(futures.wait(f))

Example 4: Implement Multiprocessing using the Futures API

Normally, you might use something like a multiprocessing Manager to share a list or another data type between processes. Here, though, no sharing is needed: the counter is updated in the main process as each future completes

import time
import random

from concurrent import futures

# simulate some long-running work; a real CPU-bound job would do
# computation here rather than sleep
def do_something_interesting(i):
    time.sleep(1)
    return random.randint(0, 100000) + i

counter = 0
with futures.ProcessPoolExecutor(max_workers=8) as pool:
    submits = [pool.submit(do_something_interesting, x) for x in range(0, 4)]

    for i in futures.as_completed(submits):
        r = i.result() # you can use the timeout kwarg in result to wait a certain time before moving on
        counter += 1
        print(r)

print(counter)

As the futures complete, the main process updates the counter.

Pretty easy, right? That’s the beauty and power of Python. I plan on migrating my code to the Futures API, which provides an even easier way to achieve concurrency. Leave a comment below and let me know what you think!

More on the GIL: GIL

Concurrency Vs. Parallelism: concurrency-vs-parallelism
