Combining SSH and subprocess pipes in Python asyncio

When building tools for system administration, we often need to run other command-line programs, run commands on a remote machine, and transfer data between the local and remote machines over SSH. The straightforward approach is to write the tool as a shell script. But what if you, like me, hate shell syntax and are happier with Python? And what if you also want to challenge yourself with the new (somewhat) cool asyncio?

To make this concrete, suppose you want to implement a command like this:

ssh remote-server "pg_dump -O webdata | gzip" | gunzip | psql localdb

I often use this command to transfer data from PostgreSQL on a server to my local machine. What it does is:

  • Connect to remote-server via SSH.
  • Run pg_dump on that server to back up the webdata database.
  • The exported data is not saved to a file but fed to gzip, also on the server.
  • The compressed data output by gzip is sent back to the local machine. Instead of being saved to a file, it is passed directly to the local gunzip.
  • The data uncompressed by gunzip is piped to psql, which restores it into the localdb database.

A few side notes:

  • You don't see any username or password parameters because I like to connect to PostgreSQL via a Unix socket. That way, I don't need to generate a password or worry about protecting it. I also often install PostgreSQL on the same machine as the web application, so I don't have to expose PostgreSQL to the internet, which avoids attacks targeting it. Of course, this does not apply to big, complex websites where PostgreSQL has to run on a separate server.
  • The -O option of pg_dump strips the "owner" information. I need it because I use different database users on the server and on localhost.
  • gzip and gunzip are there to reduce the amount of data transferred over the network.

Now, back to the problem. For a one-shot task, I often just type the above command into the terminal. But when I have to handle more things, like finding the list of databases to transfer or doing other tasks in addition to transferring the database, I write a script. To reduce waiting time, I make it run asynchronously, using asyncio-based libraries.

In a traditional blocking Python script, we can use paramiko for SSH and the standard library's subprocess module for invoking other programs.

In asyncio style, we can use asyncssh for SSH and asyncio.create_subprocess_exec for the "subprocess" part. But the problem is: how do we make them work together, given that we have to implement the "pipe" in Python?

One more note: you could just pass the shell command above to Python's subprocess. But in that case, you are letting the shell do everything, and all Python does is invoke the shell. That is no fun and does not let us discover Python's strengths.
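For comparison, here is a minimal sketch of that "let the shell do everything" approach in asyncio. The function name run_shell_pipeline is made up for illustration, and the demo command is a harmless stand-in for the real ssh/psql pipeline.

```python
import asyncio
from typing import Tuple


async def run_shell_pipeline(command: str) -> Tuple[int, bytes]:
    """Run a whole shell pipeline in one go and capture its stdout."""
    # The shell parses the "|" characters and wires the processes together;
    # Python only starts the shell and collects the final output.
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout


if __name__ == '__main__':
    # Stand-in for: ssh remote-server "pg_dump -O webdata | gzip" | gunzip | psql localdb
    code, output = asyncio.run(run_shell_pipeline('printf hello | tr a-z A-Z'))
    print(code, output)
```

It works, but every interesting decision (which processes to start, how they are connected) is hidden inside the command string.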

OK. The code can look like this:

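import asyncio
import os

import asyncssh

# REMOTE_HOST and REMOTE_USER are assumed to be defined elsewhere in the script.


async def copy_postgres_db(name: str):
    async with asyncssh.connect(REMOTE_HOST, username=REMOTE_USER) as conn:
        print(f'Connected to {REMOTE_HOST}')
        # pipe1 carries gzip output (remote) to gunzip (local);
        # pipe2 carries gunzip output to psql.
        pipe1_read, pipe1_write = os.pipe()
        pipe2_read, pipe2_write = os.pipe()
        cmd_dump = f'pg_dump -O {name}'
        print(f'[{REMOTE_HOST}]: {cmd_dump}')
        # encoding=None makes asyncssh treat the streams as raw bytes.
        p1 = await conn.create_process(cmd_dump, encoding=None)
        cmd_gzip = 'gzip -c'
        print(f'[{REMOTE_HOST}]: {cmd_gzip}')
        p2 = await conn.create_process(cmd_gzip, stdin=p1.stdout, stdout=pipe1_write, encoding=None)
        await p2.wait()
        print('[local]: gunzip')
        # asyncio subprocesses only work with bytes, so no encoding argument is passed here.
        p3 = await asyncio.create_subprocess_exec('gunzip', stdin=pipe1_read, stdout=pipe2_write)
        # The child has its own copies of these ends; close ours so EOF can propagate.
        os.close(pipe1_read)
        os.close(pipe2_write)
        print(f'[local]: psql {name}')
        p4 = await asyncio.create_subprocess_exec('psql', name, stdin=pipe2_read)
        await p4.wait()
        await p3.wait()  # reap gunzip as well
        os.close(pipe2_read)
        return p4.returncode == 0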

Here, you can see that the ways to connect data streams between processes in traditional Python (the subprocess module), in asyncio (asyncio.create_subprocess_exec), and in asyncssh (asyncssh.SSHClientConnection.create_process) are pretty different.

In traditional, synchronous Python, you just pass subprocess.PIPE as the stdout parameter of the first process and hand its stdout to the next one. But in asyncio, the stdout of a process created with PIPE is a StreamReader, not a file object, so it cannot be passed as the stdin of another process. You must call the lower-level function os.pipe() to create the pipe and connect the two ends appropriately. When mixed with asyncssh, things start to get confusing.
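To make the difference concrete, here is a small sketch with two local processes only, no SSH involved. The function names sync_pipeline and async_pipeline are made up for illustration, and echo and tr stand in for the real commands.

```python
import asyncio
import os
import subprocess


def sync_pipeline() -> bytes:
    """Blocking version: echo hello | tr a-z A-Z, wired with subprocess.PIPE."""
    p1 = subprocess.Popen(['echo', 'hello'], stdout=subprocess.PIPE)
    # p1.stdout is a real file object, so it can be handed to p2 directly.
    p2 = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=p1.stdout,
                          stdout=subprocess.PIPE)
    p1.stdout.close()  # p2 now owns the read end
    out, _ = p2.communicate()
    p1.wait()
    return out


async def async_pipeline() -> bytes:
    """asyncio version: the same pipeline, wired with os.pipe()."""
    read_fd, write_fd = os.pipe()
    p1 = await asyncio.create_subprocess_exec('echo', 'hello', stdout=write_fd)
    os.close(write_fd)  # the child has its own copy of the write end
    p2 = await asyncio.create_subprocess_exec(
        'tr', 'a-z', 'A-Z', stdin=read_fd, stdout=asyncio.subprocess.PIPE)
    os.close(read_fd)   # the child has its own copy of the read end
    out, _ = await p2.communicate()
    await p1.wait()
    return out


if __name__ == '__main__':
    print(sync_pipeline())
    print(asyncio.run(async_pipeline()))
```

Closing our own copies of the pipe ends right after starting each child matters: as long as the parent keeps a write end open, the reader never sees end-of-file.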

First, when both processes are on the remote machine:

cmd_dump = f'pg_dump -O {name}'
p1 = await conn.create_process(cmd_dump, encoding=None)
cmd_gzip = 'gzip -c'
p2 = await conn.create_process(cmd_gzip, stdin=p1.stdout, stdout=pipe1_write, encoding=None)

For process 1, you don't need to pass anything for stdout. In fact, create_process takes stdin=PIPE, stdout=PIPE by default. But if we look into its source code, we see that PIPE is imported from asyncio.subprocess. This is confusing because we saw above that, to connect the data of processes created by asyncio, we have to use a pipe from os.pipe.

Second, look at how to connect a remote process to a local process:

cmd_gzip = 'gzip -c'
p2 = await conn.create_process(cmd_gzip, stdin=p1.stdout, stdout=pipe1_write, encoding=None)
await p2.wait()
await asyncio.create_subprocess_exec('gunzip', stdin=pipe1_read, stdout=pipe2_write)

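Well... we are back to os.pipe, and we even have to pass it to the remote process.

One key point to succeed at this game: do not forget to await process.wait(), and do not forget encoding=None in create_process. asyncssh decodes streams as UTF-8 text by default, and encoding=None tells it to pass the compressed data through as raw bytes.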
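Why binary mode matters can be checked without any SSH connection. The sketch below (is_valid_utf8 is a made-up helper) shows that gzip output is not valid UTF-8 text, so a text-mode stream cannot carry it.

```python
import gzip


def is_valid_utf8(data: bytes) -> bool:
    """Return True if data can be decoded as UTF-8 text."""
    try:
        data.decode('utf-8')
    except UnicodeDecodeError:
        return False
    return True


plain = b'SELECT 1;'
compressed = gzip.compress(plain)

# Plain SQL text decodes fine.
print(is_valid_utf8(plain))
# Every gzip stream starts with the bytes 0x1f 0x8b, and 0x8b cannot
# begin a UTF-8 character, so decoding fails.
print(is_valid_utf8(compressed))
```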

So, we have explored a bit more of asyncio. I hope that in the future the APIs of asyncio-based libraries will be more consistent.

Moreover, looking at the first shell command, we can see how sexy Linux is. SSH with public keys, Postgres over a Unix socket, and pipes all combine into a very neat command that does many things smoothly.