Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

Кластеризация согласно консенсусу

Оригинал: Clustering by Consensus
Автор: Dustin J. Mitchell
Дата публикации: July 12, 2016
Перевод: Н.Ромоданов
Дата перевода: январь 2017 г.

Сеть

В любом сетевом протоколе должна быть возможность отправлять и получать сообщения, а также средство вызова функций в любой момент времени в будущем.

В классе Network предоставляется простая имитация сети, имеющая эти возможности, а также смоделирована потеря пакетов и задержка передачи сообщений.

Таймеры представляют собой обработчики, в которых используется модуль heapq языка Python, позволяющий эффективно выбирать следующие события. Настройка таймера включает в себя размещение объекта Timer в памяти типа куча. Поскольку удаление элементов из кучи неэффективно, отмененные таймеры остаются в куче, но помечаются как отмененные.

Передача сообщений использует функциональные возможности таймера для планирования более поздней доставки сообщения в каждый узел с использованием случайно смоделированной задержки. Мы снова используем метод functools.partial для настройки в узле назначения будущего вызова метода receive с соответствующими аргументами.

Запуск процесса моделирования представляет собой только выскакивание таймеров из кучи и выполнения их, если они не были отменены и если узел назначения по-прежнему активен.

class Timer(object):

    def __init__(self, expires, address, callback):
        self.expires = expires
        self.address = address
        self.callback = callback
        self.cancelled = False

    def __cmp__(self, other):
        return cmp(self.expires, other.expires)

    def cancel(self):
        self.cancelled = True

class Network(object):
    PROP_DELAY = 0.03
    PROP_JITTER = 0.02
    DROP_PROB = 0.05

    def __init__(self, seed):
        self.nodes = {}
        self.rnd = random.Random(seed)
        self.timers = []
        self.now = 1000.0

    def new_node(self, address=None):
        node = Node(self, address=address)
        self.nodes[node.address] = node
        return node

    def run(self):
        while self.timers:
            next_timer = self.timers[0]
            if next_timer.expires > self.now:
                self.now = next_timer.expires
            heapq.heappop(self.timers)
            if next_timer.cancelled:
                continue
            if not next_timer.address or next_timer.address in self.nodes:
                next_timer.callback()

    def stop(self):
        self.timers = []

    def set_timer(self, address, seconds, callback):
        timer = Timer(self.now + seconds, address, callback)
        heapq.heappush(self.timers, timer)
        return timer

    def send(self, sender, destinations, message):
        sender.logger.debug("sending %s to %s", message, destinations)
        # avoid aliasing by making a closure containing distinct deep copy of
        # message for each dest
        def sendto(dest, message):
            if dest == sender.address:
                # reliably deliver local messages with no delay
                self.set_timer(sender.address, 0,  
                               lambda: sender.receive(sender.address, message))
            elif self.rnd.uniform(0, 1.0) > self.DROP_PROB:
                delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER, 
                                                           self.PROP_JITTER)
                self.set_timer(dest, delay, 
                               functools.partial(self.nodes[dest].receive, 
                                                 sender.address, message))
        for dest in (d for d in destinations if d in self.nodes):
            sendto(dest, copy.deepcopy(message))

Компонентная модель позволяет без каких-либо изменений в других компонентах переключаться в реально существующую сеть и взаимодействовать между реальными серверами в реальной сети, но это не было добавлено к данной реализации. Тестирование и отладку можно осуществлять в имитационной сети и использовать библиотеку, работающую на реальном сетевом оборудовании.

Поддержка процесса отладки

При разработке такой сложной системы, как эта, ошибки быстро переходят из стадии тривиальных, например, простая NameError (ошибка наименования), к скрытым отказам, которые проявляются только после нескольких минут (моделируемой) операции протокола. Отслеживание подобной ошибки требует выполнение работы по движению обратном направлении от точки, где ошибка стала очевидной. Интерактивные отладчики здесь бесполезны, поскольку они могут двигаться во времени только вперед.

Наиболее важной механизмом отладки в кластере Cluster является детерминированный симулятор. В отличие от реальной сети, он при каждом запуске будет вести себя одинаково и выдавать те же самые числа-затравки для генератора случайных чисел. Это означает, что мы дл того, чтобы увидеть ту же самую проблему более подробно, можем добавить в код дополнительные проверки или отладочную печать и повторно запустить моделирование.

Конечно, большая часть подробностей работы заключена в сообщениях, которыми обмениваются узлы в кластере, поэтому они автоматически записываются в журнал полностью. В журнале записываются классы ролей, которые принимают или передают сообщения, а также имитационную отметку времени (timestamp), которая вставляется с помощью класса SimTimeLogger.

class SimTimeLogger(logging.LoggerAdapter):

    def process(self, msg, kwargs):
        return "T=%.3f %s" % (self.extra['network'].now, msg), kwargs

    def getChild(self, name):
        return self.__class__(self.logger.getChild(name),
                              {'network': self.extra['network']})

Протокол, обладающий внутренней устойчивостью, такой как наш, часто может работать в течение длительного времени уже после того, как сработает ошибка. Например, во время разработки ошибка обращения через алиасы к данным была причиной того, что в результате все роли replica использовали один и тот же словарь решений decisions. Это означало, что как только решение было принято на одном узле, на всех остальных узлах это выглядело, как если бы на них тоже уже было принято решение. Даже с этой серьезной ошибкой кластер до того, как заблокировался, получил правильные результаты для нескольких транзакций.

Для того, чтобы как можно раньше поймать ошибки такого рода, можно воспользоваться таким важным инструментом, как утверждения. В утверждениях можно пользоваться любыми инвариантами, относящимися к сфере разработки алгоритмов, но когда код не ведет себя не так, как мы ожидаем, утверждения о наших ожиданиях будут отличным способом увидеть, где мы ошиблись.

 	assert not self.decisions.get(self.slot, None), \
            "next slot to commit is already decided"
    if slot in self.decisions:
        assert self.decisions[slot] == proposal, \
            "slot %d already decided with %r!" % (slot, self.decisions[slot])

Правильные предположеня, которые мы делаем во время чтения кода, является частью искусства отладки. В этом коде, взятом из Replica.do_Decision, проблема заключалась в том, что решение Decision для следующего слота, которое должно быть запомнено, игнорировалось, поскольку оно уже было в self.decisions. Лежащее в основе этого действия предположение было нарушено, поскольку решение для следующего слота не было принято. Утверждение, вставленное в начале кода do_Decision, позволило идентифицировать проблему и быстро привело к ее устранению. Аналогичным образом были выявлены другие серьезные ошибки, когда для одного и того же слота были разрешены различные решения.

В процесс разработки протокола было добавлено много других утверждений, но для экономии места мы оставили в коде только некоторые из них.

Перейти к следующей части статьи.