portage API now provides an asyncio event loop policy

In portage-2.3.30, portage’s Python API provides an asyncio event loop policy via a DefaultEventLoopPolicy class. For example, here’s a little program that uses portage’s DefaultEventLoopPolicy to do the same thing as emerge --regen, using an async_iter_completed function to implement the --jobs and --load-average options:

#!/usr/bin/env python

from __future__ import print_function

import argparse
import functools
import multiprocessing
import operator

import portage
from portage.util.futures.iter_completed import (
    async_iter_completed,
)
from portage.util.futures.unix_events import (
    DefaultEventLoopPolicy,
)


def handle_result(cpv, future):
    """
    Done-callback that prints the metadata obtained for one package.

    The values from future.result() are paired positionally with
    portage.auxdbkeys. The cpv is printed first, then every key whose
    value is truthy as a tab-indented "key: value" line in sorted key
    order, followed by one empty line.

    :param cpv: identifier printed as the heading of the entry
    :param future: completed future whose result() yields the values
    """
    values = future.result()
    metadata = dict(zip(portage.auxdbkeys, values))

    print(cpv)
    for key in sorted(metadata):
        value = metadata[key]
        if not value:
            # Empty values are left out of the report.
            continue
        print('\t{}: {}'.format(key, value))
    print()


def future_generator(repo_location, loop=None):
    """
    Lazily yield one metadata future per entry of the given repository.

    For every cp reported by portdb.cp_all(trees=[repo_location]) and
    every cpv reported by portdb.cp_list() for that cp, an
    async_aux_get() request for portage.auxdbkeys is started and
    handle_result is registered as its done-callback (bound to the cpv).

    :param repo_location: repository location passed to portdb as the
        tree to operate on
    :param loop: event loop forwarded unchanged to async_aux_get()
    """
    db = portage.portdb

    for package in db.cp_all(trees=[repo_location]):
        for cpv in db.cp_list(package, mytree=repo_location):
            # Bind the cpv now so the callback can label its output.
            report = functools.partial(handle_result, cpv)

            pending = db.async_aux_get(
                cpv,
                portage.auxdbkeys,
                mytree=repo_location,
                loop=loop,
            )
            pending.add_done_callback(report)

            yield pending


def main():
    """
    Command-line entry point.

    Options:
      --repo          repository name (default 'gentoo')
      --jobs          int, defaults to the CPU count
      --load-average  float, defaults to the CPU count

    The repository name is resolved to a location through
    portage.settings.repositories; an unknown name is reported with
    parser.error(), listing the available repository names. Every item
    produced by async_iter_completed() is then run to completion on an
    event loop obtained from DefaultEventLoopPolicy, and the loop is
    closed afterwards even if an error occurs.
    """
    cpu_count = multiprocessing.cpu_count()

    parser = argparse.ArgumentParser()
    parser.add_argument('--repo', action='store', default='gentoo')
    parser.add_argument(
        '--jobs', action='store', type=int, default=cpu_count)
    parser.add_argument(
        '--load-average', action='store', type=float, default=cpu_count)
    options = parser.parse_args()

    repositories = portage.settings.repositories
    try:
        repo_location = repositories.get_location_for_name(options.repo)
    except KeyError:
        available = ' '.join(sorted(repo.name for repo in repositories))
        # parser.error() prints the message and exits the program.
        parser.error('unknown repo: {}\navailable repos: {}'.format(
            options.repo, available))

    loop = DefaultEventLoopPolicy().get_event_loop()

    try:
        completed_batches = async_iter_completed(
            future_generator(repo_location, loop=loop),
            max_jobs=options.jobs,
            max_load=options.load_average,
            loop=loop,
        )
        for batch in completed_batches:
            loop.run_until_complete(batch)
    finally:
        loop.close()



# Invoke main() only when this file is executed as a script.
if __name__ == '__main__':
    main()

Adapting regular iterators to asynchronous iterators in python

For I/O-bound tasks, Python coroutines make a nice replacement for threads. Unfortunately, there’s no asynchronous API for reading files, as discussed in the “Best way to read/write files with AsyncIO” thread of the python-tulip mailing list.

Meanwhile, it is essential that a long-running coroutine contain some asynchronous calls, since otherwise it will run all the way to completion before any other event loop tasks are allowed to run. For a long-running coroutine that needs to call a conventional iterator (rather than an asynchronous iterator), I’ve found this converter class to be useful:

class AsyncIteratorExecutor:
    """
    Converts a regular iterator into an asynchronous
    iterator, by executing the iterator in a thread.

    Each item is obtained by calling next() on the wrapped iterator
    through loop.run_in_executor(), so the event loop is free to run
    other tasks while the (possibly blocking) iterator produces its
    next value.

    :param iterator: the regular iterator to wrap
    :param loop: event loop used to schedule the next() calls; when
        omitted, asyncio.get_event_loop() is used
    :param executor: executor handed to run_in_executor(); None selects
        the loop's default executor
    """

    # Dedicated end-of-iteration marker. A private object can never be
    # produced by the wrapped iterator, so every real item (including
    # this executor instance itself) is passed through unchanged.
    __exhausted = object()

    def __init__(self, iterator, loop=None, executor=None):
        self.__iterator = iterator
        self.__loop = loop or asyncio.get_event_loop()
        self.__executor = executor

    def __aiter__(self):
        return self

    async def __anext__(self):
        # next() is given a default so that exhaustion is reported by
        # value instead of by a StopIteration raised inside the executor.
        value = await self.__loop.run_in_executor(
            self.__executor, next, self.__iterator, self.__exhausted)
        if value is self.__exhausted:
            raise StopAsyncIteration
        return value

For example, it can be used to asynchronously read lines of a text file as follows:

async def cat_file_async(filename):
    """
    Print every line of a text file with trailing whitespace removed.

    The open file object is wrapped in AsyncIteratorExecutor so that
    each line is awaited rather than read directly in the coroutine.

    :param filename: path of the text file to print
    """
    with open(filename, 'rt') as text_file:
        lines = AsyncIteratorExecutor(text_file)
        async for raw_line in lines:
            stripped = raw_line.rstrip()
            print(stripped)

if __name__ == '__main__':
    # Run the coroutine to completion on the current event loop and
    # close the loop afterwards, whether or not the run succeeded.
    event_loop = asyncio.get_event_loop()
    try:
        printer = cat_file_async('/path/of/file.txt')
        event_loop.run_until_complete(printer)
    finally:
        event_loop.close()