Асинхронное программирование: концепция Deferred 8

Posted by Андрей on Февраль 10, 2009

Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем такое может быть полезно? Рассмотрим несколько примеров.

Первый пример – сетевой сервер, веб-приложение. Чаще всего как таковых вычислений на процессоре такие приложения не выполняют. Большая часть времени (реального, не процессорного) тратится на ввод-вывод: чтение запроса от клиента, обращение к диску за данными, сетевые обращение к другим подсистемам (БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих операций ввода-вывода процессор простаивает, его можно загрузить обработкой запросов других клиентов. Возможны различные способы решить эту задачу: отдельный процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI), отдельный поток (нить) на каждое соединение или комбинированный вариант процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или нитей перекладывает мультиплексирование процессора между обрабатываемыми соединениями на ОС, при этом расходуется относительно много ресурсов (память, переключения контекста и т.п.), такой вариант не подходит для обработки большого количества одновременных соединений, но идеален для ситуации, когда объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и процессов можно добавить потенциальное использование всех доступных процессоров в многопроцессорной архитектуре.

Альтернативой является использование однопоточной модели с использованием примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не такой большой (новый сокет, какие-то структуры в памяти приложения). Однако программирование существенно усложняется, т.к. данные из сетевых сокетов поступают некоторыми «отрывками», причем за один цикл обработки данные поступают от разных соединений, находящихся в разных состояниях, часть соединений могут быть входящими от клиентов, часть – исходящими к внешним ресурсам (БД, другой сервер и т.п.). Для упрощения разработки используются различные концепции: callback, конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный ввод-вывод: nginx, lighttpd, HAProxy, pgBouncer, и т.д. Именно при такой однопоточной модели возникает необходимость в асинхронном программировании. Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение запроса – это сетевой ввод-вывод: соединение с сервером, отправка запроса, ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем функцию «выполнить запрос БД», то она сразу вернуть результат не сможет (иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит впоследствие получить результат запроса или, возможно, ошибку (нет соединения с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно сделать именно Deferred.

Второй пример связан с разработкой обычных десктопных приложений. Предположим, мы решили сделать аналог Miranda (QIP, MDC, …), то есть свой мессенджер. В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда пользователь выбирает это действие, он ожидает что контакт исчезнет на экране и что он действительно удалится из контакт-листа. На самом деле операция удаления из серверного контакт-листа опирается на сетевое взаимодействие с сервером, при этом пользовательский интерфейс не должен быть заблокирован на время выполнения этой операции, поэтому в любом случае после выполнения операции потребуется некоторое асинхронное взаимодействие с результатом операции. Можно использовать механизм сигналов-слотов, callback’ов или что-то еще, но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает Deferred, в котором обратно придет либо положительный результат (всё хорошо), либо исключение (точная ошибка, которую надо сообщить пользователю): в случае ошибки контакт надо восстановить контакт в контакт-листе.

Примеры можно приводить долго и много, теперь о том, что же такое Deferred. Deferred – это сердце framework’а асинхронного сетевого программирования Twisted в Python. Это простая и стройная концепция, которая позволяет перевести синхронное программирование в асинхронный код, не изобретая велосипед для каждой ситуации и обеспечивая высокое качества кода. Deferred – это просто возвращаемый результат функции, когда этот результат неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем сделать с Deferred? Мы можем «подвеситься» в цепочку обработчиков, которые будут вызваны, когда результат будет получен. При этом Deferred может нести не только положительный результат выполнения, но и исключения, сгенерированные функцией или обработчиками, есть возможность исключения обработать, перевыкинуть и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в терминах Deferred. Для эффективной разработки с Deferred оказываются полезными такие возможности языка программирования, как замыкания, лямбда-функци.

Приведем пример синхронного кода и его альтернативу в терминах Deferred:

try:
    # Скачать по HTTP некоторую страницу
    page = downloadPage(url)
    # Распечатать содержимое
    print page
except HTTPError, e:
    # Произошла ошибка
    print "An error occured: %s", e

В асинхронном варианте с Deferred он был бы записан следующим образом:

def printContents(contents):
    """
    Callback, при успешном получении текста страницы,
    распечатываем её содержимое.
    """
    print contents
 
def handleError(failure):
    """
    Errback (обработчик ошибок), просто распечатываем текст ошибки.
    """
 
    # Мы готовы обработать только HTTPError, остальные исключения
    # "проваливаются" ниже.
    failure.trap(HTTPError)
    # Распечатываем само исключение
    print "An error occured: %s", failure
 
# Теперь функция выполняется асинхронно и вместо непосредственного
# результата мы получаем Deferred
deferred = downloadPage(url)
# Навешиваем на Deferred-объект обработчики успешных результатов
# и ошибок (callback, errback).
deferred.addCallback(printContents)
deferred.addErrback(handleError)

На практике обычно мы возвращаем Deferred из функций, которые получают Deferred в процессе своей работы, навешиваем большое количество обработчиков, обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем наверх). В качестве более сложного примера приведем код в асинхронном варианте для примера атомарного счетчика из статьи про структуры данных в memcached, здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е. методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):

class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.
 
        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name
 
    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.
 
        @param value: инкремент
        @type value: C{int}
        @return: Deferred, результат операции
        """
        def tryAdd(failure):
            # Обрабатываем только KeyError, всё остальное "вывалится"
            # ниже
            failure.trap(KeyError)
 
            # Пытаемся создать ключ, если раз его еще нет
            d = self.mc.add(self.key, value, 0)
            # Если вдруг кто-то еще создаст ключ раньше нас,
            # мы это обработаем
            d.addErrback(tryIncr)
            # Возвращаем Deferred, он "вклеивается" в цепочку
            # Deferred, в контексте которого мы выполняемся
            return d
 
        def tryIncr(failure):
            # Всё аналогично функции tryAdd
            failure.trap(KeyError)
 
            d = self.mc.incr(self.key, value)
            d.addErrback(tryAdd)
            return d
 
        # Пытаемся выполнить инкремент, получаем Deferred
        d = self.mc.incr(self.key, value)
        # Обрабатываем ошибку
        d.addErrback(tryAdd)
        # Возвращаем Deferred вызывающему коду, он может тем самым:
        #  а) узнать, когда операция действительно завершится
        #  б) обработать необработанные нами ошибки (например, разрыв соединения)
        return d
 
    def value(self):
        """
        Получить значение счетчика.
 
        @return: текущее значение счетчика
        @rtype: C{int}
        @return: Deferred, значение счетчика
        """
        def handleKeyError(failure):
            # Обрабатываем только KeyError
            failure.trap(KeyError)
 
            # Ключа нет - возвращаем 0, он станет результатом
            # вышележащего Deferred
            return 0
 
        # Пытаемся получить значение ключа
        d = self.mc.get(self.key)
        # Будем обрабатывать ошибку отсутствия ключа
        d.addErrback(handleKeyError)
        # Возвращаем Deferred, наверх там можно будет повеситься
        # на его callback и получить искомое значение счетчика
        return d

На практике приведенный выше код можно написать «короче», объединяя часто используемые операции, например:

    return self.mc.get(self.key).addErrback(handleKeyError)

Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:

  • последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
  • вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
  • глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
  • блокам try..except соответствуют обработчики ошибок (errback), которые могут «пробрасывать» исключения дальше, любое исключение в callback переводит выполнение в errback;
  • для «параллельного» выполнения асинхронных операций есть DeferredList.

Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели ‘worker’, тогда нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток вычислений с помощью Deferred:

def doCalculation(a, b):
    """
    В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
    не затрагивающие основной поток.
    """
 
    return a/b
 
def printResult(result):
    print result
 
def handleDivisionByZero(failure):
    failure.trap(ZeroDivisionError)
 
    print "Ooops! Division by zero!"
 
deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
    lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))

В приведенном выше примере функция deferToThread переносит выполнение указанной функции в отдельную нить и возвращает Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено. Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero.

В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том, как же они работают. Если успел заинтересовать – читайте материалы ниже, а я обещаю написать еще.

Дополнительные материалы

Trackbacks

Use this link to trackback from your own site.

Comments

Leave a response

  1. incoob Ср, 11 Фев 2009 10:47:16 UTC

    в случае ошибки контакт надо восстановить контакт в контакт-листе.

    тут вроде опечатка

  2. Андрей Ср, 11 Фев 2009 12:09:05 UTC

    Вроде бы нет. Если мы решили удалить, начали удалять, а удалить не получилось – мы должны в списке контактов его восстановить.

  3. brn Ср, 11 Фев 2009 13:12:13 UTC

    Ну как нет, должно быть либо «в случае ошибки контакт надо восстановить в контакт-листе», либо «в случае ошибки надо восстановить контакт в контакт-листе»

  4. Андрей Ср, 11 Фев 2009 13:35:26 UTC

    Спасибо, не заметил!

  5. Кирилл Чт, 12 Фев 2009 00:13:11 UTC

    Сложновато все это для меня новичка.

  6. Андрей Чт, 12 Фев 2009 03:04:11 UTC

    Я надеюсь в последующих постах немного разобраться со сложностью и рассказать по-простому, что и как.

  7. Артем Вт, 27 Окт 2009 14:20:15 UTC

    Добрый день Андрей!

    Подскажите пожалуйста как работать с базой данный в twisted в частности интересует mysql?

  8. Андрей Вт, 27 Окт 2009 15:00:12 UTC

    Артем, один из вариантов – это ADBAPI. Через него доступен любой вариант DB-API Python. По сути обычный блокирующийся драйвер БД будет выполняться в отдельной нити.

    Документация тут: http://twistedmatrix.com/projects/core/documentation/howto/index.html

    См. раздел «Twisted RDBMS Support»

Comments